设为首页 收藏本站
查看: 1374|回复: 0

[经验分享] flume sink运行过程简单分析

[复制链接]

尚未签到

发表于 2015-9-17 07:49:50 | 显示全部楼层 |阅读模式
  没有运行,直接看源码得到sink简单运行过程
  SinkRunner负责运行sink程序
  内部类
  PollingRunner implements Runnable
  {
  private SinkProcessor policy;
  }
  负责运行sink
  run方法
  
  while (!shouldStop.get()) {
          try {
              if (policy.process().equals(Sink.Status.BACKOFF)) {
                  counterGroup.incrementAndGet("runner.backoffs");
  Thread.sleep(Math.min(counterGroup.incrementAndGet("runner.backoffs.consecutive")* backoffSleepIncrement, maxBackoffSleep));
              } else {
                  counterGroup.set("runner.backoffs.consecutive", 0L);
              }
          } catch (InterruptedException e) {
                logger.debug("Interrupted while processing an event. Exiting.");
                counterGroup.incrementAndGet("runner.interruptions");
          } catch (Exception e) {
                logger.error("Unable to deliver event. Exception follows.", e);
                if (e instanceof EventDeliveryException) {
                    counterGroup.incrementAndGet("runner.deliveryErrors");
                } else {
                    counterGroup.incrementAndGet("runner.errors");
                }
            try {
                  Thread.sleep(maxBackoffSleep);
              } catch (InterruptedException ex) {
                  Thread.currentThread().interrupt();
              }
            }
        }
  

  
  policy 对应具体的sink处理器,这里以FailoverSinkProcessor举例子
  这里面,针对FailoverSinkProcessor可以参照 http://blog.iyunv.com/simonchi/article/details/42520193讲解,这里大致说下便可
  
  configure方法
  liveSinks = new TreeMap<Integer, Sink>();
    failedSinks = new PriorityQueue<FailedSink>();
  从配置文件中定义的sinks中遍历每一个sink,获得其优先级,然后放到liveSinks中,无论sink是否可用。
  最后,activeSink = liveSinks.get(liveSinks.lastKey());,从liveSinks按照key排序,获得最后一个key(优先级,最大)对应的sink初始化 activeSink
  
  
  policy.process().equals(Sink.Status.BACKOFF))执行的是FailoverSinkProcessor的process()方法
  process()方法
  首先一个while循环,遍历所有的failedSinks ,拿出每一个failed的sink,如果拿出来的failed sink能够访问了,则把他付给activeSink ,并return sink.process()的状态。在轮询的过程中,如果failed sink还是不能到达,则重新放入到failedSinks 中并刷新时间,否则,如果能够联通,但是状态不是READY,也放入到failedSinks 中且不刷新。
  
  之后,是对activeSink进行while循环,调用activeSink中的每一个sink.proccess().调用成功,则return状态。否则,出现异常,将当前active的sink移动到failedSinks 中,同时获得下一个active的sink从activeSink中。继续while判断
  
  函数的最后是一个异常,即没有任何一个sink可用。
  
  sink.process()是啥?是从channel中拿出数据的。
  这里以NullSink为例
  根据事务和batchsize从chanel中拿出数据来,并写入到相应的位置
  public Status process() throws EventDeliveryException {
        Status status = Status.READY;
  Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;
        long eventCounter = counterGroup.get("events.success");
  try {
              transaction.begin();
              int i = 0;
              for (i = 0; i < batchSize; i++) {
                  event = channel.take();
                  if (++eventCounter % logEveryNEvents == 0) {
                      logger.info("Null sink {} successful processed {} events.", getName(), eventCounter);
                  }
                  if(event == null) {
                        status = Status.BACKOFF;
                        break;
                  }
              }
              transaction.commit();
              counterGroup.addAndGet("events.success", (long) Math.min(batchSize, i));
              counterGroup.incrementAndGet("transaction.success");
         } catch (Exception ex) {
              transaction.rollback();
              counterGroup.incrementAndGet("transaction.failed");
              logger.error("Failed to deliver event. Exception follows.", ex);
              throw new EventDeliveryException("Failed to deliver event: " + event, ex);
        } finally {
              transaction.close();
        }
  return status;
    }
  

  
  SinkProcessor之LoadBalancingSinkProcessor
  同样也有configure、process方法,只不过内部逻辑不同,要实现loadbalance功能。
  configure()方法主要是根据用户的设置,初始化selector,SELECTOR_NAME_ROUND_ROBIN, SELECTOR_NAME_RANDOM或者使用定义的CONFIG_SELECTOR类(SinkSelector子类)
  



if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
selector = new RoundRobinSinkSelector(shouldBackOff);
} else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
selector = new RandomOrderSinkSelector(shouldBackOff);
} else {
try {
@SuppressWarnings("unchecked")
Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
Class.forName(selectorTypeName);
selector = klass.newInstance();
} catch (Exception ex) {
throw new FlumeException("Unable to instantiate sink selector: "
+ selectorTypeName, ex);
}
}
  具体这两个selector怎么搞的,这里不讲,可以参考: http://blog.iyunv.com/simonchi/article/details/42644651
  
  process()方法就是使用selector获得一个sink,调用其process方法,成功则返回status
  



@Override
public Status process() throws EventDeliveryException {
Status status = null;
Iterator<Sink> sinkIterator = selector.createSinkIterator();
while (sinkIterator.hasNext()) {
Sink sink = sinkIterator.next();
try {
status = sink.process();
break;
} catch (Exception ex) {
selector.informSinkFailed(sink);
LOGGER.warn("Sink failed to consume event. "
+ "Attempting next sink if available.", ex);
}
}
if (status == null) {
throw new EventDeliveryException("All configured sinks have failed");
}
return status;
}
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114596-1-1.html 上篇帖子: Flume 1.4.0 User Guide 下篇帖子: 通过flume-ng收集日志
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表