设为首页 收藏本站
查看: 1349|回复: 0

[经验分享] flume-ng源码阅读memory-channel(原创)

[复制链接]

尚未签到

发表于 2015-9-16 13:32:25 | 显示全部楼层 |阅读模式
  org.apache.flume.channel.MemoryChannel类是Flume-NG的memory-channel。
private LinkedBlockingDeque<Event> queue;//mem-channel存放数据的地方

   private Semaphore queueRemaining;//queue存量信号量

  private Semaphore queueStored;//queue存量的信号量,保证channel里面有数据

  private Semaphore bytesRemaining;//剩余字节信号量,以100字节为一个单位,也是动态调整的
Flume-NG的组件(source、sink、channel)总是先通过configure(Context context)方法,获取配置文件中的配置信息,在这配置了mem的最大容量capacity、事务的event最大容量transCapacity、缓存百分比byteCapacityBufferPercentage、最大内存所有事件允许总字节数      byteCapacity。还有信号量的初始化:

synchronized(queueLock) {//初始化mem

        queue = new LinkedBlockingDeque<Event>(capacity);

        queueRemaining = new Semaphore(capacity);

        queueStored = new Semaphore(0);

}

以及:bytesRemaining = new Semaphore(byteCapacity);

queueStored这个比较特殊,初始为0表示开始时,queue没有数据,只要queue的大小有所调整时就需要调整这个信号量,增加就release,减少就tryAcquire。

当然在configure方法中可以看到如果配置文件修改后是如何动态修改的(flume默认每30s扫描加载一次配置文件)。

然后start()方法进行一些初始化操作。

resizeQueue(capacity)方法用来动态加载配置文件,调整mem容量的。

createTransaction()方法,返回MemoryTransaction实例。

estimateEventSize(Event event)方法,返回event.body的字节长度。


  该类有一个内部类MemoryTransaction是mem-channel从source取(put)数据、给(take)sink的操作类。其初始化时会创建两个LinkedBlockingDeque,一个是takeList用于sink的take;一个是putList用于source的put,两个队列的容量都是事务的event最大容量transCapacity。两个队列是用于事务回滚rollback和提交commit的。
  Source交给channel处理的一般是调用ChannelProcessor类的processEventBatch(List<Event> events)方法或者processEvent(Event event)方法;在sink端可以直接使用channel.take()方法获取其中的一条event数据。这俩方法在将event提交至channel时,都需要:
  一、获取channel列表。List<Channel> requiredChannels = selector.getRequiredChannels(event);
  二、通过channel获取Transaction。Transaction tx = reqChannel.getTransaction();
  三、tx.begin();
  四、reqChannel.put(event)(在sink中这是take(event)方法);
  五、tx.commit();
  六、tx.rollback();
  七、tx.close()。
  上面的三~七中的方法,最终调用的是MemoryTransaction的doBegin(未重写,默认空方法)、doPut、doCommit、doRollback、doClose(未重写,默认空方法)方法。
  其中doPut方法,先计算event的大小可以占用bytesRemaining的多大空间,然后在有限的时间内等待获取写入空间,获取之后写入putList暂存。
  doTake方法,先检查takeList的剩余容量;再检查是否有许可进行取操作(queueStored使得可以不用阻塞其它线程获取许可信息);然后同步的从queue中取一个event,再放入takeList,并返回此event。
  doCommit方法,不管在sink还是source端都会调用。首先检查queue队列中是否有足够空间放得下从source过来的数据,依据就是queueRemaining是否有remainingChange = takeList.size-putList.size个许可。然后是将putList中的所有数据提交到内存队列queue之中,并将putList和takeList清空。清空表明:运行到这步说明takeList中的数据无需再保留,putList中的数据可以放入queue中。由于在doTake中从queue取数据,所以queueStored在减,但在doCommit中会把putList中的数据放入queue所以需要增加queueStored:queueStored.release(puts);bytesRemaining在doPut中获得了一些许可会减少,在doCommit中由于takeList会清空所有会增加bytesRemaining:bytesRemaining.release(takeByteCounter);而queueRemaining在doPut和doTake中并未进行操作,而且doCommit方法在sink和source中都会调用,故而在此方法中修改takeList和putList的差值即可:queueRemaining.release(remainingChange)(在此有个细节,在doCommit的开始remainingChange如果小于0,说明剩余空间不足以放入整个putList,要么超时报错退出;要么获得足够的许可,如果是后者的话就不需要再调整queueRemaining因为是在现在的基础之上减,如果remainingChange大于0,说明去除takeList大小后不仅足以放入整个putList,而且还有剩余,queueRemaining需要释放remainingChange)。其他就是修改计数器。
  doRollback方法是在上面三、四、五出现异常的时候调用的,用于事务回滚。不管是在sink还是source中,都会调用。将takeList中的所有数据重新放回queue中:
while(!takeList.isEmpty()) {

queue.addFirst(takeList.removeLast());//回滚时,重新放回queue中。可能会重复(commit阶段出错,已经take的数据需要回滚,批量的情况)

  }
  然后清空putList:
  putList.clear(); //这个方法可能发生在put中,也可能发生在take中,所以需要同步清空。可能会丢数据(还在put的阶段,没到commit阶段,出错会导致回滚,导致已经put还未放入queue中的数据会丢失)
  由于putList清空了,所以bytesRemaining.release(putByteCounter);
  由于takeList又返回给了queue所以queue的量增加了:queueStored.release(takes)。
  
  在分层的分布式flume中,一旦汇总节点中断,而采集节点使用mem,则采集会大量的丢失数据,因为channel会因为put而快速的填满,填满之后再调用put会迸发异常,致使出现异常引起事务回滚,回滚会直接清空putList,使数据丢失,只留下channel中的数据(这些数据是一开始放入进去的后来的会丢失)。putList.offer会因为填满数据返回false,add方法如果队列满了则会爆异常。
  讲解并不一定完全正确,希望大伙踊跃交流。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114531-1-1.html 上篇帖子: Flume-NG启动过程源码分析(三)(原创) 下篇帖子: flume 参考文档 (四)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表