设为首页 收藏本站
查看: 2065|回复: 0

[经验分享] flume1.4.0源码结构剖析

[复制链接]

尚未签到

发表于 2015-11-27 19:54:31 | 显示全部楼层 |阅读模式
flume基本思想:

DSC0000.jpg


       source负责收集数据,channel负责缓存数据,sink负责消费channel中的数据,具体使用方式这里不赘述

生命周期管理:

生命周期相关代码在flume-ng-core文件夹下的lifecycle子文件夹内
flume的所有组件(除了monitor service)都有生命周期的概念,主要作用是用来标记组件目前所属的状态。flume组件的生命周期有四个状态,分别是IDLE,START,STOP,ERROR,意义如下:

IDLE
组件已经构造完成
START
组件已经启动
STOP
组件已经停止
ERROR
组件发生了错误




       其中对象本身需要实现LifecycleAware接口【见代码】,flume对组件初始化完成后会调用LifecycleSupervisor.supervise()将该组件加入监控,LifecycleSupervisor类内部对每一个组件会启动一个定时线程MonitorRunnable,在定时线程中调用组件的start/stop函数对组件进行控制
       这里注意对组件stop函数的调用是在unsupervise里面进行的,而不是通过定时线程MonitorRunnable控制的


abstract public class AbstractSink implements Sink, LifecycleAware {
private LifecycleState lifecycleState;
public AbstractSink() {
lifecycleState = LifecycleState.IDLE;
}
@Override
public synchronized void start() {
......
lifecycleState = LifecycleState.START;
......
}
@Override
public synchronized void stop() {
......
lifecycleState = LifecycleState.STOP;
......
}
@Override
public synchronized LifecycleState getLifecycleState() {
return lifecycleState;
}
......
}
  

flume 启动流程:

        flume的main函数在flume-ng-node文件夹下Application类中
        实际上启动流程的主要目的有两个:
        1.通过配置文件构造组件,连接有关联的组件【hook】,将组件填入到MaterializedConfiguration类中
        2.从MaterializedConfiguration类中获取组件,将组件注册到LifecycleSupervisor中,通过LifecycleSupervisor启动



        可以看出MaterializedConfiguration的重要性,下面是其定义,可见其意义就是存储所有初始化好的组件并提供给程序调用:


public interface MaterializedConfiguration {
public void addSourceRunner(String name, SourceRunner sourceRunner);
public void addSinkRunner(String name, SinkRunner sinkRunner);
public void addChannel(String name, Channel channel);
public ImmutableMap<String, SourceRunner> getSourceRunners();
public ImmutableMap<String, SinkRunner> getSinkRunners();
public ImmutableMap<String, Channel> getChannels();
}
  

      main函数依次做了以下几件事
      1.解析命令行
      2.寻找命令行里的配置文件路径,解析配置文件,并放在一个File对象里
      3.调用AbstractConfigurationProvider实例,传入File配置文件构造对象,返回一个MaterializedConfiguration对象,该对象里存放所有的sink,sinkgroup,source,channel组件
      4.调用application.start(),该函数将所有组件加入生命周期管理并注册启动monitor服务
      5.向JVM注册一个函数,当进程结束时停止所有组件
      这里要注意第三步其实根据命令行中的no-reload-conf参数【是否关闭重载配置文件】选择调用AbstractConfigurationProvider的派生类PollingPropertiesFileConfigurationProvider或者PropertiesFileConfigurationProvider。
       其他几步都很直接,比较复杂的流程在调用AbstractConfigurationProvider,解析配置文件并初始化组件这一步。
       调用AbstractConfigurationProvider的目的是初始化所有组件,挂好相互关联的钩子并将组件封装到MaterializedConfiguration对象中返回给上层。
       AbstractConfigurationProvider类的功能比较好理解,关键在于flume支持配置重载,配置重载则channel组件涉及到是否复用的问题【重新生成channel可能会导致之前数据的丢失问题】。AbstractConfigurationProvider类的getConfiguration()函数主要处理了这一套逻辑
       getConfiguration()函数的执行基本流程如下:
      1.加载配置文件
      2.load channels
      3.load sources,这一块要传入2中load的channel,因为要挂channel和source之间的钩子【这一块实际上是将channel封装到ChannelProcessor中,将ChannelProcessor传入Source,关于ChannelProcessor的作用见下文Channel结构介绍】
      4.load sinks,这一块要传入2中load的channel,因为要挂channel和sink之间的钩子
      5.移除没有sink和source连接的channel【如果只有source/sink这里不会检测】
      6.将sink,source,channel封装在MaterializedConfiguration对象中并返回,注意source是封装在SourceRunner类中,sink是封装在SinkRunner中。这里的定义是Runner负责与操作系统打交道,而source/sink专注于维护自身逻辑。SinkRunner中还要注意一点是一个Sink只能在一个SinkGroup里面。



注意:
      1.加载配置文件时调用了FlumeConfiguration类,该类内部有对配置文件进行合法性校验,校验的错误列表可以通过getConfigurationErrors()接口获取
      2.每个组件都有对应的Factor类(如ChannelFactor),通过调用Factor类的create()函数创建对象
      3.load_channel函数中channel可以通过加上@Disposable声明表示不复用,重新加载配置时生成一个新的channel
      4.细读load_channels,load_sources,load_sinks代码,可以发现加载组件时分了两种情况,has ComponentConfiguration object和do not hava a ComponentConfiguration object。这一块可以不用理会,细读配置文件逻辑好像全部都是后一种do not hava a ComponentConfiguration object



      以上步骤执行完,flume就可以提供服务了。


flume各组件的内部结构:

组件的结构代码在flume-ng-core文件夹中

channel:

       channel模块的作用是给数据提供中转的储藏地,由flume的架构图可以看出channel负责给source模块提供写数据接口,给sink模块提供读数据接口。实际实现中每一个source模块中都有一个ChannelProcessor,通过ChannelProcessor向Channel写入数据,而sink模块是直接通过Channel模块提供的接口读取。
      为什么Source写入要通过封装的Processor进行?这里有两个原因:1.source模块写入的数据可能在写入Channel前需要进行过滤处理;2.source模块可能要将不同的数据写到不同的Channel上实现数据分发的效果。ChannelProcessor主要就是为了解决这两个问题存在的。以下是ChannelProcessor的结构图:

DSC0001.jpg


       上图中:InterceptorChain模块主要解决问题1,source写入的数据可以经过InterceptorChain模块中一系列的过滤器依次处理,最终过滤出来的数据进入ChannelSelector模块,ChannelSelector解决了上面提到的问题2,提供了将不同数据分发到不同Channel的机制。
        Channel中基本的数据单元是Event,每个Event包含有一个map<String, String>类型的header,每一个Interceptor可以在header里面添加key,value形式的tag让后续流程处理,或者按照某些规则将过滤event。
        ChannelSelector目前有两种类型。一种是ReplicatingChannelSelector,消息发往所有的channel,另一种是MultiplexingChannelSelector,可以指定发往某些Channel
        其中Channel也有两种类型:一种是required,一种是optional.这两者的区别是required类型如果写入失败会throw Exception。optional类型的写入失败不处理。
       Channel中处理数据的最小单元是Transaction,一个Transaction由多个Event组成。Channel内部实现要保证一个Transaction要么全部成功,要么全部不成功。具体实现方法下面细说。



       下面是这几个类之间的关系图。图中Channel只画了MemoryChannel,Interceptor的具体实现类没有画。
DSC0002.jpg



       其中要注意的是:
       1.ChannelProcessor的初始化是在初始化Source模块的时候。
       2.每一个线程有自己的Transaction,这个具体实现是在BasicChannelSemantics类中利用TreadLocal机制。这么做的好处是维护transaction相对方便。
       3. Transaction的维护方法:MemoryChannel的机制是每一个Transaction有自己的存储空间,分为putlist和takelist,MemoryChannel有一个公共的queue。
       每次take时从queue中取出一个event并放置在本地的takelist内并返回该event,commit的时候清空takelist,rollback时将takelist中的event重新放回queue。
       每次put时将event放置在putlist中,commit时将putlist中内容写入queue中,rollback时将putlist清空。
      4.BasicChannelSemantics类主要负责的是维护(thread,transaction)这样的关系。而由于Transaction是接口,不能直接给BasicChannelSemantics进行操作。所以这里又封装了一个BasicTransactionSemantics类。这两个类实现相对简单,这里不细述。




上面介绍初始化的时候曾经说过source和sink封装在SourceRunner和SinkRunner中。Source和Sink这两部分主要各种对接外部系统的模块比较多,实际结构还是比较简单的

source:

       source的主要功能是收集外部数据,并通过ChannelProcessor写入Channel。Source分为两种类型,PollableSource和EventDrivenSource,分别对应不同的SourceRunner。其中PollableSource是指那些需要定时轮询的Source,比如定时扫描文件,读取新增内容。其他Source都是EventDrivenSource。
DSC0003.jpg



       1.注意BasicSourceSemantics类型维护了Source的生命周期。
       2.PollableSource类型的source具体的功能函数封装在process()中,PollableSourceRunner负责定期调source.process()将数据写入Channel.
       3.EventDrivenSource类型全部逻辑基本都在Source实现里面,通过start接口启动【start接口是LifecycleSupervisor负责调用的,上文有描述】。每一种source根据不同的外部系统有不同的处理逻辑,这里不赘述。

sink&sinkgroup:

        sink/sinkgroup的主要作用是消费Channel中的数据,其基本思想是定期轮询Sink,从Channel中拉取一定量数据做处理。
        一个Sink只能对应一个下游,但是一个下游不可靠。所以flume里面有一个机制将多个sink封装为一个SinkGroup,每次消费时通过SinkSelector选择不同Sink实现容错/负载均衡方案
        如果SinkSelector需要对Sink做筛选,那么它需要知道每一个Sink的状态。每一个sink都要自己维护一个status变量,有两个&#20540;【READY, BACKOFF】,定义在interface Sink中,READY的意思是该sink可以从Channel中取数据,BACKOFF的意义是该sink目前不能从Channel中取数据。这个status就是给SinkSelector做决策使用的。
Sink的结构图如下:
DSC0004.jpg


       SinkRunner的逻辑跟PollableSource一致。每隔一段时间调用SinkProcessor的process()函数
       SinkProcessor里面封装了Sink和SinkSelector。如果是单独sink,直接调用这个sink的process(),如果是一组sink,通过SinkSelector选择出一个sink,再调用这个sink的process().
      目前flume里面实现的SinkProcessor有三种,DefaultSinkProcessor/FailoverSinkProcessor/LoadBalancingSinkProcessor
       1.DefaultSinkProcessor:提供给单独Sink使用。直接调用sink的process()函数



       2.FailoverSinkProcessor:提供给SinkGroup使用。这个Processor给每个sink都设置了一个priority【通过配置文件】,其思想是每次都会【尽量】调用priority最大的sink。

            这个Processor将sinks分为两组,一组是Queue类型的failedSinks队列,保存上一次sink.process()调用出错的sink,并且每一个调用出错的sink都有一个冷却时间,冷却时间之内的sink不会调用,另一组是SortedMap<Integer, Sink> liveSinks,保存正常的sink,其中Integer指该sink的priority
        每次选择sink的流程如下:a.从failedSinks里面取出冷却最久的sink,调用process,如果成功则将该sink重新加入liveSinks,如果失败则刷新这个sink的冷却时间并将其重新push回failedSinks。然后试图在failedSinks寻找下一个sink
        b.如果failedSinks为空或者failedSinks内的sink全部不可用,则调用liveSinks里面权重最大的sink,如果该sink调用失败则塞入failedSinks



        3.LoadBalancingSinkProcessor:基本思想是将Channel中的数据平均分配到该Processor的所有sink里。
        这个Processor下面有两个Selector,分别是RandomOrderSinkSelector和RoundRobinSinkSelector。前者提供的功能是返回一个随机排序的sink数组,后者则是返回一个轮询调度形式的数组,比如这一次返回【1,2,3】,下一次就是【2,3,1】,下下一次就是【3,1,2】。
        LoadBalancingSinkProcessor调用用户配置的Seletor返回一个数组,然后依次调用这个数组中的sink.process(),直到某一次调用成功,则返回,如果全部失败会返回错误。
        这个Processor有一个比较重要的配置【backoff】。这个配置决定了某个sink调用失败后的行为。如果backoff为true,则该sink失败后就会冷却一段时间。这段时间调用Selector返回的数组不会包含该sink。
        Sink主要是实现从Channel众消费数据的逻辑,这里不一一描述。要注意的是SinkGroup中一个Sink如果卡住这个SinkGroup会卡住。

监控模块:

        在启动流程章节里提到,在启动所有组件后会启动minotor监控进程。监控主要分为两部分,收集数据和上报数据。
       收集数据的逻辑是夹杂在具体Channel,Sink,Source等的实现逻辑里面的。其中sink,source,channel等结构监控数据的数据结构在flume-ng-core文件夹下面的instrumententation里面。分别是SinkCounter,SourceCounter和ChannelCounter等。如果要自己实现这些组件,需要调用这几个类的接口上报监控信息。flume-ng-core下面还有一个CounterGroup,这个统计了一些Runner的事件。这个接口应该是老接口。
       每一个Count都会在MBean里面注册。上报的数据存储在这个MBean对象里。
       flume内置三种上报监控的方式,分别是zabbix,ganglia和http。其中http方式是在flume侧起一个server,由外部主动来拉取监控信息。
       没看到flume有一段时间内的统计信息。所有统计信息都是统计整个组件生命周期的。
DSC0005.jpg


       以上就是Flume源码剖析的全部内容。Flume本身提供了非常多的组件之外还有非常好的可扩展性。就整体架构而言还是非常清晰的。除了配置文件结构比较复杂,其他代码的可读性还是相当高的

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144350-1-1.html 上篇帖子: 日志收集之flume-ng源码分析 下篇帖子: flume-ng收集windows日志笔记
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表