candy 发表于 2015-8-1 12:39:47

Data Collection with Apache Flume(二)

        今天继续讨论几个agent的配置。
        第一个agent是从终端捕获特定命令执行的输出结果,并将文件输出到特定目录。先看一下配置的代码:      



agent2.sources = execsource      //指定为从命令获取输出的source
agent2.sinks = filesink          //输出到文件的sink
agent2.channels = filechannel   //输出到文件的channel
agent2.sources.execsource.type = exec//类型
agent2.sources.execsource.command = cat /home/leung/message   //指定命令
agent2.sinks.filesink.type = FILE_ROLL
agent2.sinks.filesink.sink.directory = /home/leung/flume/files   //输出目录
agent2.sinks.filesink.sink.rollInterval = 0
agent2.channels.filechannel.type = file
agent2.channels.filechannel.checkpointDir = /home/leung/flume/fc/checkpoint //检查点
agent2.channels.filechannel.dataDirs = /home/leung/flume/fc/data//channel的数据目录
agent2.sources.execsource.channels = filechannel
agent2.sinks.filesink.channel = filechannel

     OK,启动agent2,然后查看结果。
        
        结果如下图。可以看到,执行 cat /home/leung/message命令之后,输出的结果与files目录中的文件内容是一致的,证明已经成功写入文件。
        
        下一个agent是从网络端口 获取数据然后写到Hadoop集群的HDFS中。先看看配置代码:



agent4.sources = netsource
agent4.sinks = hdfssink //HDFS sink
agent4.channels = memorychannel
agent4.sources.netsource.type = netcat
agent4.sources.netsource.bind = localhost
agent4.sources.netsource.port = 3000

agent4.sinks.hdfssink.type = hdfs
agent4.sinks.hdfssink.hdfs.path = /flume//写出到HDFS上的文件目录,不需要提前创建
agent4.sinks.hdfssink.hdfs.filePrefix = log//指定写出文件的文件名前缀
agent4.sinks.hdfssink.hdfs.rollInterval = 0
agent4.sinks.hdfssink.hdfs.rollCount = 3
agent4.sinks.hdfssink.hdfs.fileType = DataStream
agent4.channels.memorychannel.type = memory
agent4.channels.memorychannel.capacity = 1000
agent4.channels.memorychannel.transactionCapacity = 100
agent4.sources.netsource.channels = memorychannel
agent4.sinks.hdfssink.channel = memorychannel

      下面启动agent4以及查看一下结果。
        
         下面查看一下结果。发现在HDFS中已经新建了一个flume文件夹,并且已经写入了指定的内容。
        
        接着我们为文件夹名加一个时间戳。详细看如下配置代码。



agent5.sources = netsource
agent5.sinks = hdfssink
agent5.channels = memorychannel
agent5.sources.netsource.type = netcat
agent5.sources.netsource.bind = localhost
agent5.sources.netsource.port = 3000
agent5.sources.netsource.interceptors = ts
agent5.sources.netsource.interceptors.ts.type = org.apache.flume.interceptor.TimestampInterceptor$Builder //引用这个类方法添加时间戳

agent5.sinks.hdfssink.type = hdfs
agent5.sinks.hdfssink.hdfs.path = /flume-%Y-%m-%d //定义文件夹名格式
agent5.sinks.hdfssink.hdfs.filePrefix = log-
agent5.sinks.hdfssink.hdfs.rollInterval = 0
agent5.sinks.hdfssink.hdfs.rollCount = 3
agent5.sinks.hdfssink.hdfs.fileType = DataStream
agent5.channels.memorychannel.type = memory
agent5.channels.memorychannel.capacity = 1000
agent5.channels.memorychannel.transactionCapacity = 100
agent5.sources.netsource.channels = memorychannel
agent5.sinks.hdfssink.channel = memorychannel

    OK,下面启动agent5。
     
     下面查看一下结果。可以看到文件夹的名字被如期地加上了日期。
     
     
     OK,先到这里,还有两个稍微复杂一点点的agent下次再讨论。本人水平有限,请各位不吝指正!谢谢!
页: [1]
查看完整版本: Data Collection with Apache Flume(二)