设为首页 收藏本站
查看: 1594|回复: 0

[经验分享] Flume(NG)架构设计要点及配置实践

[复制链接]

尚未签到

发表于 2015-11-27 17:21:15 | 显示全部楼层 |阅读模式
  
Flume NG是一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡。
架构设计要点
Flume的架构主要有一下几个核心概念:

  • Event:一个数据单元,带有一个可选的消息头
  • Flow:Event从源点到达目的点的迁移的抽象
  • Client:操作位于源点处的Event,将其发送到Flume Agent
  • Agent:一个独立的Flume进程,包含组件Source、Channel、Sink
  • Source:用来消费传递到该组件的Event
  • Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
  • Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)
Flume NG架构,如图所示:
DSC0000.jpg
外部系统产生日志,直接通过Flume的Agent的Source组件将事件(如日志行)发送到中间临时的channel组件,最后传递给Sink组件,HDFS Sink组件可以直接把数据存储到HDFS集群上。
一个最基本Flow的配置,格式如下:

01#list the sources, sinks and channels for the agent02<Agent>.sources= <Source1> <Source2>03<Agent>.sinks= <Sink1> <Sink2>04<Agent>.channels= <Channel1> <Channel2>05 06#set channel for source07<Agent>.sources.<Source1>.channels= <Channel1> <Channel2> ...08<Agent>.sources.<Source2>.channels= <Channel1> <Channel2> ...09 10#set channel for sink11<Agent>.sinks.<Sink1>.channel= <Channel1>12<Agent>.sinks.<Sink2>.channel= <Channel2>尖括号里面的,我们可以根据实际需求或业务来修改名称。下面详细说明:
表示配置一个Agent的名称,一个Agent肯定有一个名称。与是Agent的Source组件的名称,消费传递过来的Event。与是Agent的Channel组件的名称。与是Agent的Sink组件的名称,从Channel中消费(移除)Event。
上面配置内容中,第一组中配置Source、Sink、Channel,它们的&#20540;可以有1个或者多个;第二组中配置Source将把数据存储(Put)到哪一个Channel中,可以存储到1个或多个Channel中,同一个Source将数据存储到多个Channel中,实际上是Replication;第三组中配置Sink从哪一个Channel中取(Task)数据,一个Sink只能从一个Channel中取数据。
下面,根据官网文档,我们展示几种Flow Pipeline,各自适应于什么样的应用场景:


  • 多个Agent顺序连接
DSC0001.jpg
可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。


  • 多个Agent的数据汇聚到同一个Agent
DSC0002.jpg
这种情况应用的场景比较多,比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。


  • 多路(Multiplexing)Agent
DSC0003.jpg
这种模式,有两种方式,一种是用来复制(Replication),另一种是用来分流(Multiplexing)。Replication方式,可以将最前端的数据源复制多份,分别传递到多个channel中,每个channel接收到的数据都是相同的,配置&#26684;式,如下所示:

01#List the sources, sinks and channels for the agent02<Agent>.sources= <Source1>03<Agent>.sinks= <Sink1> <Sink2>04<Agent>.channels= <Channel1> <Channel2>05 06#set list of channels for source (separated by space)07<Agent>.sources.<Source1>.channels= <Channel1> <Channel2>08 09#set channel for sinks10<Agent>.sinks.<Sink1>.channel= <Channel1>11<Agent>.sinks.<Sink2>.channel= <Channel2>12 13<Agent>.sources.<Source1>.selector.type= replicating上面指定了selector的type的&#20540;为replication,其他的配置没有指定,使用的Replication方式,Source1会将数据分别存储到Channel1和Channel2,这两个channel里面存储的数据是相同的,然后数据被传递到Sink1和Sink2。
Multiplexing方式,selector可以根据header的&#20540;来确定数据传递到哪一个channel,配置&#26684;式,如下所示:

1#Mapping for multiplexing selector2<Agent>.sources.<Source1>.selector.type= multiplexing3<Agent>.sources.<Source1>.selector.header= <someHeader>4<Agent>.sources.<Source1>.selector.mapping.<Value1>= <Channel1>5<Agent>.sources.<Source1>.selector.mapping.<Value2>= <Channel1> <Channel2>6<Agent>.sources.<Source1>.selector.mapping.<Value3>= <Channel2>7#...8 9<Agent>.sources.<Source1>.selector.default= <Channel2>上面selector的type的&#20540;为multiplexing,同时配置selector的header信息,还配置了多个selector的mapping的&#20540;,即header的&#20540;:如果header的&#20540;为Value1、Value2,数据从Source1路由到Channel1;如果header的&#20540;为Value2、Value3,数据从Source1路由到Channel2。

  • 实现load balance功能
DSC0004.jpg
Load balancing Sink Processor能够实现load balance功能,上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上,示例配置,如下所示:

1a1.sinkgroups= g12a1.sinkgroups.g1.sinks= k1 k2 k33a1.sinkgroups.g1.processor.type= load_balance4a1.sinkgroups.g1.processor.priority.k1= 55a1.sinkgroups.g1.processor.priority.k2= 106a1.sinkgroups.g1.processor.priority.k3= 107a1.sinkgroups.g1.processor.maxpenalty= 10000

  • 实现failover能
Failover Sink Processor能够实现failover功能,具体流程类&#20284;load balance,但是内部处理机制与load balance完全不同:Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。如果一个Sink能够成功处理Event,则会加入到一个Pool中,否则会被移出Pool并计算失败次数,设置一个惩罚因子,示例配置如下所示:
1a1.sinkgroups= g12a1.sinkgroups.g1.sinks= k1 k2 k33a1.sinkgroups.g1.processor.type= failover4a1.sinkgroups.g1.processor.backoff= true5a1.sinkgroups.g1.processor.selector= random基本功能
我们看一下,Flume NG都支持哪些功能(目前最新版本是1.5.0.1),了解它的功能集合,能够让我们在应用中更好地选择使用哪一种方案。说明Flume NG的功能,实际还是围绕着Agent的三个组件Source、Channel、Sink来看它能够支持哪些技术或协议。我们不再对各个组件支持的协议详细配置进行说明,通过列表的方式分别对三个组件进行概要说明:

  • Flume Source
Source类型说明Avro Source支持Avro协议(实际上是Avro RPC),内置支持Thrift Source支持Thrift协议,内置支持Exec Source基于Unix的command在标准输出上生产数据JMS Source从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过Spooling Directory Source监控指定目录内数据变更Twitter 1% firehose Source通过API持续下载Twitter数据,试验性质Netcat Source监控某个端口,将流经端口的每一个文本行数据作为Event输入Sequence Generator Source序列生成器数据源,生产序列数据Syslog Sources读取syslog数据,产生Event,支持UDP和TCP两种协议HTTP Source基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式Legacy Sources兼容老的Flume OG中Source(0.9.x版本)

  • Flume Channel
Channel类型说明Memory ChannelEvent数据存储在内存中JDBC ChannelEvent数据存储在持久化存储中,当前Flume Channel内置支持DerbyFile ChannelEvent数据存储在磁盘文件中Spillable Memory ChannelEvent数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用)Pseudo Transaction Channel测试用途Custom Channel自定义Channel实现

  • Flume Sink
Sink类型说明HDFS Sink数据写入HDFSLogger Sink数据写入日志文件Avro Sink数据被转换成Avro Event,然后发送到配置的RPC端口上Thrift Sink数据被转换成Thrift Event,然后发送到配置的RPC端口上IRC Sink数据在IRC上进行回放File Roll Sink存储数据到本地文件系统Null Sink丢弃到所有数据HBase Sink数据写入HBase数据库Morphline Solr Sink数据发送到Solr搜索服务器(集群)ElasticSearch Sink数据发送到Elastic Search搜索服务器(集群)Kite Dataset Sink写数据到Kite Dataset,试验性质的Custom Sink自定义Sink实现另外还有Channel Selector、Sink Processor、Event Serializer、Interceptor等组件,可以参考官网提供的用户手册。
应用实践
安装Flume NG非常简单,我们使用最新的1.5.0.1版本,执行如下命令:
1cd/usr/local2wget http://mirror.bit.edu.cn/apache/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gz3tarxvzf apache-flume-1.5.0.1-bin.tar.gz4cdapache-flume-1.5.0.1-bin如果需要使用到Hadoop集群,保证Hadoop相关的环境变量都已经正确配置,并且Hadoop集群可用。下面,通过一些实际的配置实例,来了解Flume的使用。为了简单期间,channel我们使用Memory类型的channel。

  • Avro Source&#43;Memory Channel&#43;Logger Sink
使用apache-flume-1.5.0.1自带的例子,使用Avro Source接收外部数据源,Logger作为sink,即通过Avro RPC调用,将数据缓存在channel中,然后通过Logger打印出调用发送的数据。
配置Agent,修改配置文件conf/flume-conf.properties,内容如下:

01#Define a memory channel called ch1 on agent102agent1.channels.ch1.type= memory03 04#Define an Avro source called avro-source1 on agent1 and tell it05#to bind to 0.0.0.0:41414. Connect it to channel ch1.06agent1.sources.avro-source1.channels= ch107agent1.sources.avro-source1.type= avro08agent1.sources.avro-source1.bind= 0.0.0.009agent1.sources.avro-source1.port= 4141410 11#Define a logger sink that simply logs all events it receives12#and connect it to the other end of the same channel.13agent1.sinks.log-sink1.channel= ch114agent1.sinks.log-sink1.type= logger15 16#Finally, now that we've defined all of our components, tell17#agent1 which ones we want to activate.18agent1.channels= ch119agent1.channels.ch1.capacity= 100020agent1.sources= avro-source121agent1.sinks= log-sink1首先,启动Agent进程:
1bin/flume-ngagent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1然后,启动Avro Client,发送数据:
1bin/flume-ngavro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log-Dflume.root.logger=DEBUG,console

  • Avro Source&#43;Memory Channel&#43;HDFS Sink
配置Agent,修改配置文件conf/flume-conf-hdfs.properties,内容如下:
01#Define a source, channel, sink02agent1.sources= avro-source103agent1.channels= ch104agent1.sinks= hdfs-sink05 06#Configure channel07agent1.channels.ch1.type= memory08agent1.channels.ch1.capacity= 100000009agent1.channels.ch1.transactionCapacity= 50000010 11#Define an Avro source called avro-source1 on agent1 and tell it12#to bind to 0.0.0.0:41414. Connect it to channel ch1.13agent1.sources.avro-source1.channels= ch114agent1.sources.avro-source1.type= avro15agent1.sources.avro-source1.bind= 0.0.0.016agent1.sources.avro-source1.port= 4141417 18#Define a logger sink that simply logs all events it receives19#and connect it to the other end of the same channel.20agent1.sinks.hdfs-sink1.channel= ch121agent1.sinks.hdfs-sink1.type= hdfs22agent1.sinks.hdfs-sink1.hdfs.path= hdfs://h1:8020/data/flume/23agent1.sinks.hdfs-sink1.hdfs.filePrefix= sync_file24agent1.sinks.hdfs-sink1.hdfs.fileSuffix= .log25agent1.sinks.hdfs-sink1.hdfs.rollSize= 104857626agent1.sinks.hdfs-sink1.rollInterval= 027agent1.sinks.hdfs-sink1.hdfs.rollCount= 028agent1.sinks.hdfs-sink1.hdfs.batchSize= 150029agent1.sinks.hdfs-sink1.hdfs.round= true30agent1.sinks.hdfs-sink1.hdfs.roundUnit= minute31agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize= 2532agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp= true33agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas= 134agent1.sinks.hdfs-sink1.fileType= SequenceFile35agent1.sinks.hdfs-sink1.writeFormat= TEXT首先,启动Agent:
1bin/flume-ngagent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1然后,启动Avro Client,发送数据:
1bin/flume-ngavro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log-Dflume.root.logger=DEBUG,console可以查看同步到HDFS上的数据:
1hdfsdfs -ls /data/flume结果示例,如下所示:
1-rw-r--r--  3 shirdrn supergroup    1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log2-rw-r--r--  3 shirdrn supergroup    1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log3-rw-r--r--  3 shirdrn supergroup     259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log

  • Spooling Directory Source&#43;Memory Channel&#43;HDFS Sink
配置Agent,修改配置文件flume-conf-spool.properties,内容如下:
01#Define source, channel, sink02agent1.sources= spool-source103agent1.channels= ch104agent1.sinks= hdfs-sink105 06#Configure channel07agent1.channels.ch1.type= memory08agent1.channels.ch1.capacity= 100000009agent1.channels.ch1.transactionCapacity= 50000010 11#Define and configure an Spool directory source12agent1.sources.spool-source1.channels= ch113agent1.sources.spool-source1.type= spooldir14agent1.sources.spool-source1.spoolDir= /home/shirdrn/data/15agent1.sources.spool-source1.ignorePattern= event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?16agent1.sources.spool-source1.batchSize= 5017agent1.sources.spool-source1.inputCharset= UTF-818 19#Define and configure a hdfs sink20agent1.sinks.hdfs-sink1.channel= ch121agent1.sinks.hdfs-sink1.type= hdfs22agent1.sinks.hdfs-sink1.hdfs.path= hdfs://h1:8020/data/flume/23agent1.sinks.hdfs-sink1.hdfs.filePrefix= event_%y-%m-%d_%H_%M_%S24agent1.sinks.hdfs-sink1.hdfs.fileSuffix= .log25agent1.sinks.hdfs-sink1.hdfs.rollSize= 104857626agent1.sinks.hdfs-sink1.hdfs.rollCount= 027agent1.sinks.hdfs-sink1.hdfs.batchSize= 150028agent1.sinks.hdfs-sink1.hdfs.round= true29agent1.sinks.hdfs-sink1.hdfs.roundUnit= minute30agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize= 2531agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp= true32agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas= 133agent1.sinks.hdfs-sink1.fileType= SequenceFile34agent1.sinks.hdfs-sink1.writeFormat= TEXT35agent1.sinks.hdfs-sink1.rollInterval= 0启动Agent进程,执行如下命令:
1bin/flume-ngagent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1可以查看HDFS上同步过来的数据:
1hdfsdfs -ls /data/flume结果示例,如下所示:
01-rw-r--r--  3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log02-rw-r--r--  3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log03-rw-r--r--  3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log04-rw-r--r--  3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log05-rw-r--r--  3 shirdrn supergroup       1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log06-rw-r--r--  3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log07-rw-r--r--  3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log08-rw-r--r--  3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log09-rw-r--r--  3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log10-rw-r--r--  3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log

  • Exec Source&#43;Memory Channel&#43;File Roll Sink
配置Agent,修改配置文件flume-conf-file.properties,内容如下:
01#Define source, channel, sink02agent1.sources= tail-source103agent1.channels= ch104agent1.sinks= file-sink105 06#Configure channel07agent1.channels.ch1.type= memory08agent1.channels.ch1.capacity= 100000009agent1.channels.ch1.transactionCapacity= 50000010 11#Define and configure an Exec source12agent1.sources.tail-source1.channels= ch113agent1.sources.tail-source1.type= exec14agent1.sources.tail-source1.command= tail -F /home/shirdrn/data/event.log15agent1.sources.tail-source1.shell= /bin/sh -c16agent1.sources.tail-source1.batchSize= 5017 18#Define and configure a File roll sink19#and connect it to the other end of the same channel.20agent1.sinks.file-sink1.channel= ch121agent1.sinks.file-sink1.type= file_roll22agent1.sinks.file-sink1.batchSize= 10023agent1.sinks.file-sink1.serializer= TEXT24agent1.sinks.file-sink1.sink.directory= /home/shirdrn/sink_data启动Agent进程,执行如下命令:
1bin/flume-ngagent -c ./conf/ -f conf/flume-conf-file.properties-Dflume.root.logger=INFO,console -n agent1可以查看File Roll Sink对应的本地文件系统目录/home/shirdrn/sink_data下,示例如下所示:
1-rw-rw-r--1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-12-rw-rw-r--1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-23-rw-rw-r--1 shirdrn shirdrn        0 Sep 17 11:37 1410924990039-34-rw-rw-r--1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-45-rw-rw-r--1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5有关Flume NG更多配置及其说明,请参考官方用户手册,非常详细。
以上文章转自:http://shiyanjun.cn/archives/915.html  

  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144274-1-1.html 上篇帖子: Flume Channel Selectors使用 下篇帖子: Flume NG configuration sample
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表