设为首页 收藏本站
查看: 1300|回复: 0

[经验分享] flume 1.4的介绍及使用示例

[复制链接]
发表于 2015-11-27 20:36:43 | 显示全部楼层 |阅读模式
flume 1.4的介绍及使用示例
  

  
本文将介绍关于flume 1.4的使用示例,如果还没有安装flume的话可以参考:http://blog.csdn.net/zhu_xun/article/details/16958385
在进行使用示例说明之前,先请大家先明确flume中的Source和Sink以及Channel的概念。
flume-ng是flume的新版本的意思,其中“ng”意为new generate(新一代),目前来说,flume 1.4(也就是flume-ng)是最新的版本。

一、flume介绍
flume是个日志收集系统,这个日志收集系统由一个或多个agent(代理)构成,每个agent由三部分构成:Source、Channel、Sink,如下图:
DSC0000.jpg

agent结构图


source为水源,是aent获取数据的入口;channel为管道,是数据(由resource获得)流动的通道,主要作用是用来传输和存储数据;sink为水槽,用来接收channel传入的数据并将数据输出到指定地方。大家可以把agent看作一个水管,source就是水管的入口,sink就是水管的出口,把数据当作水来看,数据流也就意味着水流。数据由source获得流经channel,最后传给sink。下图演示了一个完整的agent流程,由webserver获取数据,数据经channel流向sink,最后由sink将数据存储在hdfs里面。
DSC0001.jpg
上面说到了一个flume系统可以由一个或多个agent组成,多个agent只要做一些简单的配置就可以串在一起,比如将两个agent(foo、bar)串在一起工作,只要将bar的source(入口)接在foo的sink(出口)上就可以了。如下图: DSC0002.jpg
再看看下图,下图是将4个agent串在一起,agent1、agent2和agent3都是获取web服务器的数据,然后将各自获得到的数据统一地发送给agent4,最后由agent4将收集到的数据存储在hdfs里面。怎么样,agent的使用是不是很灵活,扩展性也很高,就行拼图一样,想怎么拼就怎么拼。 DSC0003.jpg
其实在1个agent里面,对source、channel、sink的个数是无限制的,可以有多个,只要他们能够正确匹配就行。请看下图,这个例子中source获取到的数据被分发给了3个channel,其中sink1将数据输出到hdfs里面,sink2将数据作为jms输出。 DSC0004.jpg


agent本质上是一个jvm进程,agent各组件间的工作是通过event事件来触发和协调的。使用agent时候,我们需要在agent的配置文件中设置好配置信息,source、channel、sink都有各自不同的配置选项。
flume常用组件的详细介绍如下表所示:

  • Flume Source
Source类型说明Avro Source支持Avro协议(实际上是Avro RPC),内置支持Thrift Source支持Thrift协议,内置支持Exec Source基于Unix的command在标准输出上生产数据JMS Source从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过Spooling Directory Source监控指定目录内数据变更Twitter 1% firehose Source通过API持续下载Twitter数据,试验性质Netcat Source监控某个端口,将流经端口的每一个文本行数据作为Event输入Sequence Generator Source序列生成器数据源,生产序列数据Syslog Sources读取syslog数据,产生Event,支持UDP和TCP两种协议HTTP Source基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式Legacy Sources兼容老的Flume OG中Source(0.9.x版本)

  • Flume Channel
Channel类型说明Memory ChannelEvent数据存储在内存中JDBC ChannelEvent数据存储在持久化存储中,当前Flume Channel内置支持DerbyFile ChannelEvent数据存储在磁盘文件中Spillable Memory ChannelEvent数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用)Pseudo Transaction Channel测试用途Custom Channel自定义Channel实现

  • Flume Sink
Sink类型说明HDFS Sink数据写入HDFSLogger Sink数据写入日志文件Avro Sink数据被转换成Avro Event,然后发送到配置的RPC端口上Thrift Sink数据被转换成Thrift Event,然后发送到配置的RPC端口上IRC Sink数据在IRC上进行回放File Roll Sink存储数据到本地文件系统Null Sink丢弃到所有数据HBase Sink数据写入HBase数据库Morphline Solr Sink数据发送到Solr搜索服务器(集群)ElasticSearch Sink数据发送到Elastic Search搜索服务器(集群)Kite Dataset Sink写数据到Kite Dataset,试验性质的Custom Sink自定义Sink实现另外还有Channel Selector、Sink Processor、Event Serializer、Interceptor等组件,可以参考官网提供的用户手册。

二、flume使用示例1.通过avro-client客户端向agent发送数据,并打印在屏幕上(也就是输出至日志中):
注意:
a.本文中,以$FLUME代表flume的安装目录。
b.本文中的每个使用示例的配置文件名称皆为:source_sourceType-sink_sinkType.properties。
其中,sourceType表示source的类型,sinkType表示sink的类型。
比如,配置文件source_avro-sink_filerole.properties表示此例中的source类型为avro,sink的类型为filerole,即接收avro客户端传来的数据并将数据输出值本地文件。
c.本文中的channel的类型默认为memory,意为将数据存储至内存。


(1).在$FLUME目录下新建目录test,并在test目录下新建文件source_avro-sink_logger.properties,文件内容如下:

[java] viewplaincopy DSC0005.jpg DSC0006.jpg

  • <span style=&quot;font-family:Tahoma;font-size:14px;&quot;>#为agent的各个组件命名  
  • #本例中,agent的名称为&quot;a1&quot;   
  • a1.sources = r1  
  • a1.sinks = k1  
  • a1.channels = c1  
  •   
  • #source配置信息  
  • #r1的type为avro表示该source接收的数据协议为avro,且接收数据由avro客户端事件驱动  
  • #(也就是说resource要通过avro-cliet向其发送数据)  
  • a1.sources.r1.type = avro  
  • a1.sources.r1.bind = localhost  
  • a1.sources.r1.port = 44444  
  •   
  • #sink配置信息  
  • # type为logger意将数据输出至日志中(也就是打印在屏幕上)  
  • a1.sinks.k1.type = logger  
  •   
  • #channel配置信息  
  • #type为memory意将数据存储至内存中  
  • a1.channels.c1.type = memory  
  • a1.channels.c1.capacity = 1000  
  • a1.channels.c1.transactionCapacity = 100  
  •   
  • #将source和sink绑定至该channel上  
  • a1.sources.r1.channels = c1  
  • a1.sinks.k1.channel = c1</span>  


(2).生成测试源数据:
新建文件file01.txt,并向其中写入如下数据;
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9
hello world 10

(3)启动agent代理:

flume-ng agent -n a1 -f source_avro-sink_logger.properties

(4). 启动avro-client客户端向agent代理发送数据:
flume-ng avro-client -H localhost -p 44444 -F file01

注:启动avro-client客户端要重新开一个会话框
(5).这是可以看到aent的输出信息:
DSC0007.jpg

2.通过avro-client客户端向agent发送数据,并将数据输出至本地文件中:

(1).在$FLUME目录下新建目录test,并在test目录下新建文件source_avro-sink_filerole.properties,文件内容如下:

[java] viewplaincopy

  • <span style=&quot;font-family:Tahoma;font-size:14px;&quot;>#为agent的各个组件命名  
  • #本例中,agent的名称为&quot;a1&quot;  
  • a1.sources = r1  
  • a1.sinks = k1  
  • a1.channels = c1  
  •   
  • #source配置信息  
  • #r1的type为avro表示该source接收的数据协议为avro,且接收数据由avro客户端事件驱动  
  • #(也就是说resource要通过avro-cliet向其发送数据)  
  • a1.sources.r1.type = avroa1.sources.r1.bind = localhosta1.sources.r1.port = 44444  
  • #sink配置信息  
  • #&quot;file_roll&quot;表示将数据存入本地文件系统  
  • a1.sinks.k1.type = file_roll  
  • #指定数据存放目录  
  • a1.sinks.k1.sink.directory = $FLUME/test/result  
  • #设置滚动时间(即每隔一段你设置的时间,系统会生成一个新的文件存放数据  
  • #(如果不指定,系统会默认生成N个文件,将数据分别存入N个文件中),  
  • #为0时表示只有一个文件存放数据)  
  • a1.sinks.k1.sink.rollInterval = 0   
  •   
  • #channel配置信息  
  • #type为memory意将数据存储至内存中  
  • a1.channels.c1.type = memory  
  • a1.channels.c1.capacity = 1000  
  • a1.channels.c1.transactionCapacity = 100  
  •   
  • #将source和sink绑定至该channel上  
  • a1.sources.r1.channels= c1  
  • a1.sinks.k1.channel = c1</span>  
同时在$FLUME/test下新建目录result,用来存放agent写入的数据。
(2).生成测试源数据:
同上
(3)启动agent代理:
flume-ng agent -n a1 -f source_avro-sink_filerole.properties
(4). 启动avro-client客户端向agent代理发送数据:
flume-ng avro-client -H localhost -p 44444 -F file01
注:启动avro-client客户端要重新开一个会话框
(5)进入$FLUME/test/result目录:
  这是可以看到新生成的文件&quot;1386378213670-1&quot;,这是打开文件看到内容如下:


3.通过avro-client客户端向agent发送数据,并将数据输出至hdfs中:
(1).在$FLUME目录下新建目录test,并在test目录下新建文件source_avro-sink_hdfs.properties,文件内容如下:


[java] viewplaincopy

  • <span style=&quot;font-family:Tahoma;font-size:14px;&quot;>#为agent的各个组件命名  
  • #本例中,agent的名称为&quot;a1&quot;  
  • a1.sources = r1  
  • a1.sinks = k1  
  • a1.channels = c1  
  •   
  • #source配置信息  
  • #r1的type为avro表示该source接收的数据协议为avro,且接收数据由avro客户端事件驱动  
  • #(也就是说resource要通过avro-cliet向其发送数据)  
  • a1.sources.r1.type = avro  
  • a1.sources.r1.bind = localhost  
  • a1.sources.r1.port = 44444  
  •   
  • #加入时间戳拦截器,要不运行时会报异常  
  • a1.sources.r1.interceptors = i1  
  • a1.sources.r1.interceptors.i1.type = timestamp  
  •   
  • #sink配置信息  
  • #type为&quot;hdfs&quot;表示将数据存入分布式文件系统(hdfs)  
  • a1.sinks.k1.type = hdfs  
  • a1.sinks.k1.channel = c1  
  • a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S  
  • a1.sinks.k1.hdfs.filePrefix = events-  
  • #a1.sinks.k1.hdfs.round = true  
  • #a1.sinks.k1.hdfs.roundValue = 0  
  • #a1.sinks.k1.hdfs.roundUnit = minute  
  • #a1.sinks.k1.hdfs.srollSize = 4000000  
  • #a1.sinks.k1.hdfs.rollCount = 0   
  • a1.sinks.k1.hdfs.writeFormat = Text  
  • a1.sinks.k1.hdfs.fileType = DataStream  
  • #a1.sinks.k1.hdfs.batchSize = 10  
  •   
  • #channel配置信息  
  • #type为memory意将数据存储至内存中  
  • a1.channels.c1.type = memory  
  • a1.channels.c1.capacity = 1000  
  • a1.channels.c1.transactionCapacity = 100  
  •   
  • #将source和sink绑定至该channel上  
  • a1.sources.r1.channels = c1  
  • a1.sinks.k1.channel = c1</span>  
(2).生成测试源数据:
同上
(3).启动hadoop环境
启动hadoop:start-all.sh
删除output目录:hadoop fs -rmr output
为了防止hadoop的安全性问题,可以把hadoop的安全模式关闭掉:hadoop dfsadmin -safemode leave
(4)启动agent代理:
flume-ng agent -n a1 -f source_avro-sink_hdfs.properties
(5). 启动avro-client客户端向agent代理发送数据:
flume-ng avro-client -H localhost -p 44444 -F file01
注:启动avro-client客户端要重新开一个会话框
(6)进入hadoop的output目录:
hadoop fs -cat /flume/events/13-12-07/1220/58/events-.1386390061690
可以看到输出如下:


注:在进入hdfs目录下查看文件时,agent在hdfs里创建的目录和文件名和我上面输入的会不一样,自己注意辨别。
4.将两个agent串起来:
这个示例是将agent01和agent02串起来工作的,agent01获取数据后向agent02输入。
(1).在$FLUME目录下新建目录test,并在test目录下新建文件agent01.properties和agent02.properties,文件内容如下:
agent01.properties内容如下:


[java] viewplaincopy

  • <span style=&quot;font-family:Tahoma;font-size:14px;&quot;>#为agent的各个组件命名  
  • #本例中,agent的名称为&quot;a1&quot;  
  • a1.sources = r1  
  • a1.sinks = k1  
  • a1.channels = c1  
  •   
  • #source配置信息  
  • #r1的type为avro表示该source接收的数据协议为avro,且接收数据由avro客户端事件驱动  
  • #(也就是说resource要通过avro-cliet向其发送数据)  
  • a1.sources.r1.type = avro  
  • a1.sources.r1.bind = localhost  
  • a1.sources.r1.port = 44444  
  •   
  • # Describe the sink  
  • #a1.sinks.k1.type = logger  
  •   
  • #sink配置信息  
  • #k1的type为avro表示该sink将通过avro-client客户端以avro协议通过5555端口发送数据  
  • a1.sinks.k1.type = avro  
  • a1.sinks.k1.hostname = localhost  
  • a1.sinks.k1.port = 55555  
  •   
  • #channel配置信息  
  • #type为memory意将数据存储至内存中  
  • a1.channels.c1.type = memory  
  • a1.channels.c1.capacity = 1000  
  • a1.channels.c1.transactionCapacity = 100  
  •   
  • #将source和sink绑定至该channel上  
  • a1.sources.r1.channels = c1  
  • a1.sinks.k1.channel = c1</span>  
agent02.properties内容如下:
[java] viewplaincopy

  • <span style=&quot;font-family:Tahoma;font-size:14px;&quot;>#为agent的各个组件命名  
  • #本例中,agent的名称为&quot;a2&quot;  
  • a2.sources = r1  
  • a2.sinks = k1  
  • a2.channels = c1  
  •   
  •   
  • #source配置信息  
  • #r1的type为avro表示该source接收的数据协议为avro,且接收数据由avro客户端事件驱动  
  • #(也就是说resource要通过avro-cliet向其发送数据)  
  • a1.sources.r1.type = avro  
  • a1.sources.r1.bind = localhost  
  • a1.sources.r1.port = 55555   
  •   
  •   
  • #sink配置信息  
  • #type为&quot;file_roll&quot;表示将数据存入本地文件系统  
  • a1.sinks.k1.type = file_roll  
  • #指定数据存放目录  
  • a1.sinks.k1.sink.directory = $FLUME/test/log  
  • #设置滚动时间(即每隔一段你设置的时间,系统会生成一个新的文件存放数据  
  • #(如果不指定,系统会默认生成N个文件,将数据分别存入N个文件中),  
  • #为0时表示只有一个文件存放数据)  
  • #a2.sinks.k1.sink.rollInterval = 0   
  •   
  •   
  • #channel配置信息  
  • #type为memory意将数据存储至内存中  
  • a1.channels.c1.type = memory  
  • a1.channels.c1.capacity = 1000  
  • a1.channels.c1.transactionCapacity = 100  
  •   
  •   
  • #将source和sink绑定至该channel上  
  • a1.sources.r1.channels = c1  
  • a1.sinks.k1.channel = c1</span>  
(2).生成测试源数据:
同上
(3).启动agent代理:
  先启动agent02:flume-ng agent -n a2 -f agent02.properties
  再启动agent01:flume-ng agent -n a1 -f agent01.properties
注意:要注意启动顺序,先启动agent02,在启动agent01,要分别在不同的窗口启动
(4). 启动avro-client客户端向agent01代理发送数据:
flume-ng avro-client -H localhost -p 44444 -F file01
注:启动avro-client客户端要重新开一个会话框
(5)进入$FLUME/test/result目录:
  这是可以看到新生成的文件&quot;1386378213670-1&quot;,这是打开文件看到内容如下:


未完待续。。。。。。

关于flume-ng的详细介绍和使用说明,希望大家去参考它的官方网址:http://flume.apache.org/FlumeUserGuide.html,在这里我只是做一下简单的使用示例介绍。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144374-1-1.html 上篇帖子: flume 自定义 hbase sink 类 下篇帖子: flume搭建调试
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表