设为首页 收藏本站
查看: 1314|回复: 0

[经验分享] Flume数据传输事务分析[转]

[复制链接]

尚未签到

发表于 2015-9-17 07:47:23 | 显示全部楼层 |阅读模式
  本文基于ThriftSource,MemoryChannel,HdfsSink三个组件,对Flume数据传输的事务进行分析,如果使用的是其他组件,Flume事务具体的处理方式将会不同。一般情况下,用MemoryChannel就好了,我们公司用的就是这个,FileChannel速度慢,虽然提供日志级别的数据恢复,但是一般情况下,不断电MemoryChannel是不会丢数据的。
  Flume提供事物操作,保证用户的数据的可靠性,主要体现在:


  • 数据在传输到下个节点时(通常是批量数据),如果接收节点出现异常,比如网络异常,则回滚这一批数据。因此有可能导致数据重发
  •   同个节点内,Source写入数据到Channel,数据在一个批次内的数据出现异常,则不写入到Channel。已接收到的部分数据直接抛弃,靠上一个节点重发数据。
    DSC0000.jpg


编程模型
  Flume在对Channel进行Put和Take操作的时候,必须要用事物包住,比如:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
//事物开始
txn.begin();
try {
Event eventToStage = EventBuilder.withBody("Hello Flume!",
Charset.forName("UTF-8"));
//往临时缓冲区Put数据
ch.put(eventToStage);
//或者ch.take()
//将这些数据提交到channel中
txn.commit();
} catch (Throwable t) {
txn.rollback();

if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}

Put事务流程
  Put事务可以分为以下阶段:


  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并。
  • doRollback:channel内存队列空间不足,抛弃数据
  我们从Source数据接收到写入Channel这个过程对Put事物进行分析。
DSC0001.jpg

  
ThriftSource会spawn多个Worker线程(ThriftSourceHandler)去处理数据,Worker处理数据的接口,我们只看batch批量处理这个接口:

    @Override
public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
List<Event> flumeEvents = Lists.newArrayList();
for(ThriftFlumeEvent event : events) {
flumeEvents.add(EventBuilder.withBody(event.getBody(),
event.getHeaders()));
}
//ChannelProcessor,在Source初始化的时候传进来.将数据写入对应的Channel
getChannelProcessor().processEventBatch(flumeEvents);
...
return Status.OK;
}

  事务逻辑都在processEventBatch这个方法里:

public void processEventBatch(List<Event> events) {
...
//预处理每行数据,有人用来做ETL嘛
events = interceptorChain.intercept(events);
...
//分类数据,划分不同的channel集合对应的数据
// Process required channels
Transaction tx = reqChannel.getTransaction();
...
//事务开始,tx即MemoryTransaction类实例
tx.begin();
List<Event> batch = reqChannelQueue.get(reqChannel);
for (Event event : batch) {
// 这个put操作实际调用的是transaction.doPut
reqChannel.put(event);
}
//提交,将数据写入Channel的队列中
tx.commit();
} catch (Throwable t) {
//回滚
tx.rollback();
...
}
}
...
}

  每个Worker线程都拥有一个Transaction实例,保存在Channel(BasicChannelSemantics)里的ThreadLocal变量currentTransaction.
  那么,事务到底做了什么?
DSC0002.jpg
  
实际上,Transaction实例包含两个双向阻塞队列LinkedBlockingDeque(感觉没必要用双向队列,每个线程写自己的putList,又不是多个线程?),分别为:


  • putList
  • takeList
  
对于Put事物操作,当然是只用到putList了。putList就是一个临时的缓冲区,数据会先put到putList,最后由commit方法会检查channel是否有足够的缓冲区,有则合并到channel的队列。
  
channel.put -> transaction.doPut:

    protected void doPut(Event event) throws InterruptedException {
//计算数据字节大小
int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
//写入临时缓冲区putList
if (!putList.offer(event)) {
throw new ChannelException(
"Put queue for MemoryTransaction of capacity " +
putList.size() + " full, consider committing more frequently, " +
"increasing capacity or increasing thread count");
}
putByteCounter += eventByteSize;
}

  transaction.commit:

@Override
protected void doCommit() throws InterruptedException {
//检查channel的队列剩余大小是否足够
...
int puts = putList.size();
...
synchronized(queueLock) {
if(puts > 0 ) {
while(!putList.isEmpty()) {
//写入到channel的队列
if(!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
//清除临时队列
putList.clear();
...
}
...
}

  如果在事务期间出现异常,比如channel剩余空间不足,则rollback:

@Override
protected void doRollback() {
...
//抛弃数据,没合并到channel的内存队列
putList.clear();
...
}

Take事务
  Take事务分为以下阶段:


  • doTake:先将数据取到临时缓冲区takeList
  • 将数据发送到下一个节点
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列。
DSC0003.jpg

  
Sink其实是由SinkRunner线程调用Sink.process方法来了处理数据的。我们从HdfsEventSink的process方法说起,Sink类都有个process方法,用来处理传输数据的逻辑。:

public Status process() throws EventDeliveryException {
...
Transaction transaction = channel.getTransaction();
...
//事务开始
transaction.begin();
...
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
//take数据到临时缓冲区,实际调用的是transaction.doTake
Event event = channel.take();
if (event == null) {
break;
}
...
//写数据到HDFS
bucketWriter.append(event);
...
// flush all pending buckets before committing the transaction
for (BucketWriter bucketWriter : writers) {
bucketWriter.flush();
}
//commit
transaction.commit();
...
} catch (IOException eIO) {
transaction.rollback();
...
} finally {
transaction.close();
}
}

  大致流程图:
DSC0004.jpg

  
接着看看channel.take,作用是将数据放到临时缓冲区,实际调用的是transaction.doTake:

protected Event doTake() throws InterruptedException {
...
//从channel内存队列取数据
synchronized(queueLock) {
event = queue.poll();
}
...
//将数据放到临时缓冲区
takeList.put(event);
...
return event;
}

  接着,HDFS写线程bucketWriter将take到的数据写到HDFS,如果批数据都写完了,则要commit了:

protected void doCommit() throws InterruptedException {
...
takeList.clear();
...
}

  很简单,其实就是清空takeList而已。如果bucketWriter在写数据到HDFS的时候出现异常,则要rollback:

protected void doRollback() {
int takes = takeList.size();
//检查内存队列空间大小,是否足够takeList写回去
synchronized(queueLock) {
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
"queue to rollback takes. This should never happen, please report");
while(!takeList.isEmpty()) {
queue.addFirst(takeList.removeLast());
}
...
}
...
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114592-1-1.html 上篇帖子: flume-ng tmp 下篇帖子: Flume学习——Flume中事务的定义
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表