顺德公农庄 posted on 2015-11-27 16:46:27

Spark + Flume

  Reposted from: http://blog.csdn.net/u010398018/article/details/36634059
  
Spark Streaming provides a utility for consuming events from a Flume stream:

class FlumeUtils

def createStream(ssc: StreamingContext, hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[SparkFlumeEvent]

Create an input stream from a Flume source.

The Spark Streaming code:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeEventPrint {
  def main(args: Array[String]) {
    /*
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }
    */

    //val Array(host, port) = args

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size.
    // "local[2]" rather than "local": the receiver occupies one thread,
    // so at least one more thread is needed to process the received batches.
    val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a Flume stream: an Avro server listening on localhost:33333
    val stream = FlumeUtils.createStream(ssc, "localhost", 33333, StorageLevel.MEMORY_AND_DISK)
    //stream.saveAsTextFiles("nothing")
    // Print out the count of events received from this server in each batch
    //stream.map{cnt => "Received " + cnt mkString " " + " flume events." }.saveAsTextFiles("nothing/")

    // Save the string form of each received AvroFlumeEvent as text
    stream.map(e => e.event).saveAsTextFiles("nothing/")

    ssc.start()
    ssc.awaitTermination(10000)   // run for 10 seconds, then shut down
    ssc.stop(true, true)          // also stop the SparkContext, gracefully
  }
}


This code writes the events sent from the Flume sink out to text files; the Flume sink delivers events to Spark according to the host and port in its configuration.
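For completeness: compiling this program needs the spark-streaming-flume integration artifact on the classpath in addition to Spark itself. A minimal build.sbt sketch; the version number is an assumption chosen to match the Spark 1.x era of the original post:

// build.sbt -- version is an assumption; align it with your cluster's Spark
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.1.0",
  "org.apache.spark" %% "spark-streaming-flume" % "1.1.0"
)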



Next we need a Flume agent to receive, buffer, and forward the data. Below is the Flume agent's configuration file:

a1.channels = c1
a1.sinks = k1
a1.sources = r1

a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 33333

a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100



For me, Flume's framework was not easy to understand at first, and it invites misunderstanding. (I originally thought the sink was what processes the data; it turns out the sink only forwards data events, and the actual processing happens in a separate external program. Strictly speaking, Flume only wraps and transports data; it applies no business logic to the content.)




Walking through my configuration file:
Source: an avro server (receives events from avro clients), host localhost, port 44444
Channel: buffers events in memory
Sink: an avro client (sends events to the avro server at localhost, port 33333); here that avro server is the Spark Streaming program.


Startup order: start the Spark Streaming program first, then start the Flume agent, and finally start a separate avro client to send data to the Flume agent's source.


Commands:
Start the Flume agent: bin/flume-ng agent --conf conf --conf-file conf/agent1.conf --name a1 -Dflume.root.logger=INFO,console



Start the avro client: bin/flume-ng avro-client --conf conf -H localhost -p 44444 -F /Users/heruilong/Documents/subway/gxy_9.xml -Dflume.root.logger=DEBUG,console (Flume ships with a built-in avro client implementation)
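Instead of shipping a file with the built-in avro client, events can also be sent programmatically through Flume's client SDK. A minimal sketch, assuming the flume-ng-sdk artifact is on the classpath; the host and port match the agent's source above:

import java.nio.charset.Charset
import org.apache.flume.api.RpcClientFactory
import org.apache.flume.event.EventBuilder

object AvroSender {
  def main(args: Array[String]) {
    // Connect to the agent's avro source configured at localhost:44444
    val client = RpcClientFactory.getDefaultInstance("localhost", 44444)
    try {
      // Each append() becomes one Flume event on the channel
      client.append(EventBuilder.withBody("hello flume", Charset.forName("UTF-8")))
    } finally {
      client.close()
    }
  }
}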


Resulting data (I sent an XML file); each event is printed in its JSON-style form:
{"headers": {}, "body": {"bytes": "<?xml version="1.0" encoding="GBK"?>"}}
{"headers": {}, "body": {"bytes": ""}}
{"headers": {}, "body": {"bytes": "<subwaycard>"}}
{"headers": {}, "body": {"bytes": "<card ID="10007510033252741">"}}
{"headers": {}, "body": {"bytes": "    <recha>"}}
{"headers": {}, "body": {"bytes": "      <recharge/>"}}
{"headers": {}, "body": {"bytes": "    </recha>"}}
{"headers": {}, "body": {"bytes": "    <cons>"}}
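What gets saved is the string form of each AvroFlumeEvent, with the payload wrapped as raw bytes. To save only the payload text, the body can be decoded instead; a sketch using the event field documented below (the UTF-8 charset is an assumption; note the sample file above declares GBK):

// Replace the map(...) line in the program above to save plain payload text
stream.map { e =>
  val buf = e.event.getBody            // java.nio.ByteBuffer holding the payload
  val bytes = new Array[Byte](buf.remaining())
  buf.get(bytes)                       // copy out; the buffer may not be array-backed
  new String(bytes, "UTF-8")           // charset assumption; adjust to the source data
}.saveAsTextFiles("lines/")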

Relevant Spark interface for reference:

class SparkFlumeEvent extends Externalizable

var event: AvroFlumeEvent
def readExternal(in: ObjectInput): Unit