
[Experience Sharing] Reading the Flume Source Code


Posted on 2015-9-17 07:26:35
  
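  Flume Architecture
  Flume consists mainly of three components: Source, Channel, and Sink. Together they form the path, or pipeline, that an Event travels through in Flume. Their roles are clear from Flume's own description: When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it’s consumed by a Flume sink.
  The source code has three corresponding interfaces: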



public interface Source extends LifecycleAware, NamedComponent {
  /**
   * Specifies which channel processor will handle this source's events.
   *
   * @param channelProcessor
   */
  public void setChannelProcessor(ChannelProcessor channelProcessor);

  /**
   * Returns the channel processor that will handle this source's events.
   */
  public ChannelProcessor getChannelProcessor();
}
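  Looking at the Source interface, Source does not define any method that deals with Event directly; it only has a getter and a setter for ChannelProcessor. A Source delivers its events by obtaining the ChannelProcessor assigned to it.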



public interface Sink extends LifecycleAware, NamedComponent {
  public void setChannel(Channel channel);

  public Channel getChannel();

  public Status process() throws EventDeliveryException;

  public static enum Status {
    READY, BACKOFF
  }
}
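  The Sink interface also has set and get methods related to Channel, but they refer to a Channel directly rather than to a ChannelProcessor as in Source. ChannelProcessor sits between the Source and its Channels and acts as an Event dispatcher, implementing the mapping (multiplexing), replicating, and optional mechanisms for a Source's events (I still need to read this part of the code more closely). A Sink refers to a Channel directly because every Sink consumes data from exactly one Channel and sends it to the destination.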



public class ChannelProcessor implements Configurable {
  private final ChannelSelector selector;
  private final InterceptorChain interceptorChain;

  public ChannelProcessor(ChannelSelector selector) {
    this.selector = selector;
    this.interceptorChain = new InterceptorChain();
  }

  public void initialize() {
    interceptorChain.initialize();
  }

  public void close() {
    interceptorChain.close();
  }

  /**
   * The Context of the associated Source is passed.
   * @param context
   */
  @Override
  public void configure(Context context) {
    configureInterceptors(context);
  }

  public ChannelSelector getSelector() {
    return selector;
  }

  public void processEventBatch(List<Event> events) {
    // ... (body omitted in this excerpt)
  }

  public void processEvent(Event event) {
    // ... (body omitted in this excerpt)
  }
}
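  Judging from this abridged ChannelProcessor code, its main job is to distribute events through the mapping, replicating, and optional mechanisms, and to apply the interceptor chain.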
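  To make the dispatcher idea concrete, here is a minimal sketch of the difference between replicating and header-based mapping. Every type in it (ToySelector, ToyReplicatingSelector, ToyMultiplexingSelector, SelectorSketch) is a hypothetical stand-in written for this post, not a Flume class, and the optional-channel handling is left out entirely.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are NOT Flume classes.
interface ToySelector {
    // Returns the names of the channels that must receive an event with these headers.
    List<String> requiredChannels(Map<String, String> headers);
}

// Replicating: every event goes to every configured channel.
class ToyReplicatingSelector implements ToySelector {
    private final List<String> channels;

    ToyReplicatingSelector(List<String> channels) {
        this.channels = channels;
    }

    @Override
    public List<String> requiredChannels(Map<String, String> headers) {
        return channels;
    }
}

// Mapping: the value of one header picks the channels, with a default fallback.
class ToyMultiplexingSelector implements ToySelector {
    private final String headerName;
    private final Map<String, List<String>> mapping;
    private final List<String> defaultChannels;

    ToyMultiplexingSelector(String headerName,
                            Map<String, List<String>> mapping,
                            List<String> defaultChannels) {
        this.headerName = headerName;
        this.mapping = mapping;
        this.defaultChannels = defaultChannels;
    }

    @Override
    public List<String> requiredChannels(Map<String, String> headers) {
        String key = headers.get(headerName);
        List<String> hit = (key == null) ? null : mapping.get(key);
        return (hit != null) ? hit : defaultChannels;
    }
}

class SelectorSketch {
    public static void main(String[] args) {
        ToySelector replicating = new ToyReplicatingSelector(Arrays.asList("c1", "c2"));

        Map<String, List<String>> mapping = new HashMap<>();
        mapping.put("error", Arrays.asList("c1"));
        ToySelector multiplexing =
            new ToyMultiplexingSelector("level", mapping, Arrays.asList("c2"));

        Map<String, String> errorHeaders = new HashMap<>();
        errorHeaders.put("level", "error");
        Map<String, String> noHeaders = new HashMap<>();

        System.out.println(replicating.requiredChannels(errorHeaders));  // [c1, c2]
        System.out.println(multiplexing.requiredChannels(errorHeaders)); // [c1]
        System.out.println(multiplexing.requiredChannels(noHeaders));    // [c2]
    }
}
```

  In this toy version the selector only returns channel names; a real dispatcher would then put the event into each selected channel inside a transaction.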
  Next, the source of the Channel interface:



public interface Channel extends LifecycleAware, NamedComponent {
  public void put(Event event) throws ChannelException;

  public Event take() throws ChannelException;

  public Transaction getTransaction();
}
  And the official description:
  A channel connects a Source to a Sink. The source acts as producer while the sink acts as a consumer of events. The channel itself is the buffer between the two.
  A channel exposes a Transaction interface that can be used by its clients to ensure atomic put and take semantics. This is necessary to guarantee single hop reliability between agents. For instance, a source will successfully produce an event if and only if that event can be committed to the source's associated channel. Similarly, a sink will consume an event if and only if its respective endpoint can accept the event. The extent of transaction support varies for different channel implementations ranging from strong to best-effort semantics.
  Flume uses a transactional approach to guarantee the reliable delivery of the events. The sources and sinks encapsulate in a transaction the storage/retrieval, respectively, of the events placed in or provided by a transaction provided by the channel. This ensures that the set of events are reliably passed from point to point in the flow. In the case of a multi-hop flow, the sink from the previous hop and the source from the next hop both have their transactions running to ensure that the data is safely stored in the channel of the next hop.
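  As the above shows, a Channel mainly stores Events, and the put and take operations on Events are controlled by a Transaction. Transaction is covered in more detail below.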
  
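  Transaction
  Transactions in Flume are used to guarantee reliable delivery of messages.
  Again, start with the usage example and description of Transaction from the official Javadoc: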



org.apache.flume.Transaction
Provides the transaction boundary while accessing a channel
A Transaction instance is used to encompass channel access via the following idiom:
Channel ch = ...
Transaction tx = ch.getTransaction();
try {
  tx.begin();
  ...
  // ch.put(event) or ch.take()
  ...
  tx.commit();
} catch (ChannelException ex) {
  tx.rollback();
  ...
} finally {
  tx.close();
}
Depending upon the implementation of the channel, the transaction semantics may be strong, or best-effort only.
Transactions must be thread safe. To provide a guarantee of thread safe access to Transactions, see BasicChannelSemantics and BasicTransactionSemantics.
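  When you use a Channel that extends BasicChannelSemantics, Flume forces a particular program structure on every Channel operation and forces the channel to implement specific methods. This lets the Channel itself cope with a failed put or take, and lets the channel's users react appropriately depending on whether the operation succeeded.
  A Channel is like a database, and the transaction class is like a database transaction. Flume uses transactions to make sure that Sources and Sinks access the Channel in a specific way, which keeps the Channel's state consistent. For example, when a transaction has to put a whole batch of events into the Channel, the operation must be atomic: either all of the events go in or none of them do.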
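  The all-or-nothing batch put can be sketched with a toy channel. This is only an illustration under my own assumptions: ToyChannel, its inner Tx class, and AtomicBatchSketch are hypothetical names, not Flume's MemoryChannel, and the "stage privately, publish on commit" design is just one simple way to get atomicity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical toy channel, NOT Flume's MemoryChannel: staged puts become
// visible only on commit, so a batch is stored completely or not at all.
class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int capacity;

    ToyChannel(int capacity) {
        this.capacity = capacity;
    }

    synchronized int size() {
        return queue.size();
    }

    Tx newTransaction() {
        return new Tx();
    }

    class Tx {
        private final List<String> staged = new ArrayList<>();

        // Events are only staged here; the shared queue is untouched.
        void put(String event) {
            staged.add(event);
        }

        // Publishes every staged event, or fails without publishing any.
        void commit() {
            synchronized (ToyChannel.this) {
                if (queue.size() + staged.size() > capacity) {
                    throw new IllegalStateException("channel full, nothing committed");
                }
                queue.addAll(staged);
            }
            staged.clear();
        }

        // Discards the staged events.
        void rollback() {
            staged.clear();
        }
    }
}

class AtomicBatchSketch {
    // Returns true if the whole batch was stored, false if none of it was.
    static boolean putBatch(ToyChannel ch, List<String> batch) {
        ToyChannel.Tx tx = ch.newTransaction();
        try {
            for (String event : batch) {
                tx.put(event);
            }
            tx.commit();
            return true;
        } catch (IllegalStateException ex) {
            tx.rollback();
            return false;
        }
    }

    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel(3);
        // Fits into the capacity of 3: both events are stored.
        System.out.println(putBatch(ch, Arrays.asList("a", "b")) + " size=" + ch.size()); // true size=2
        // Would exceed the capacity: neither "c" nor "d" is stored.
        System.out.println(putBatch(ch, Arrays.asList("c", "d")) + " size=" + ch.size()); // false size=2
    }
}
```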
  Class diagram of the MemoryChannel implementation:
  [Figure: DSC0000.jpg]
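  AbstractChannel mainly implements the basic interfaces NamedComponent, LifecycleAware, and Configurable; it has nothing to do with transactions yet.
  BasicChannelSemantics keeps a BasicTransactionSemantics object in a thread-local variable. Its implementation of the take and put methods from the Channel interface checks that the current thread has a usable Transaction instance and then delegates take and put to the same-named methods of that thread's transaction object.
  BasicTransactionSemantics makes sure that transaction operations succeed only when they are performed in the correct order, namely tx.begin => channel.take/put => tx.commit => tx.close. It guarantees only the ordering of operations on the Channel; subclasses implement doBegin, doTake, doPut, doCommit, doRollback, doClose, and so on.
  So BasicChannelSemantics and BasicTransactionSemantics together enforce the logic for operating on a Channel and provide the parent classes for Channel implementations. What each operation in a transaction actually means is left to the subclasses of BasicTransactionSemantics: the subclass decides what happens when a transaction begins, when it rolls back, when a message is taken, when a message is put, and so on.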
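  The split between "the base class checks the order" and "the subclass supplies the meaning" is a template-method pattern. Below is a simplified model of it. ToyTransaction, ToyQueueTransaction, and StateSketch are hypothetical names, and the checks are stricter and simpler than the real class (for instance, this toy allows close() only after commit or rollback, and it has no thread-ownership check).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simplified model of the ordering checks described above.
// It is NOT the actual BasicTransactionSemantics source.
abstract class ToyTransaction {
    enum State { NEW, OPEN, COMPLETED, CLOSED }

    private State state = State.NEW;

    State getState() {
        return state;
    }

    // Rejects any operation invoked in the wrong state.
    private void require(State expected, String op) {
        if (state != expected) {
            throw new IllegalStateException(op + " called when transaction is " + state);
        }
    }

    final void begin() { require(State.NEW, "begin()"); doBegin(); state = State.OPEN; }
    final void put(String event) { require(State.OPEN, "put()"); doPut(event); }
    final String take() { require(State.OPEN, "take()"); return doTake(); }
    final void commit() { require(State.OPEN, "commit()"); doCommit(); state = State.COMPLETED; }
    final void rollback() { require(State.OPEN, "rollback()"); state = State.COMPLETED; doRollback(); }
    final void close() { require(State.COMPLETED, "close()"); doClose(); state = State.CLOSED; }

    // Hooks: subclasses decide what each step actually does.
    protected void doBegin() {}
    protected abstract void doPut(String event);
    protected abstract String doTake();
    protected abstract void doCommit();
    protected abstract void doRollback();
    protected void doClose() {}
}

// A trivial subclass that keeps events in a private queue.
class ToyQueueTransaction extends ToyTransaction {
    private final Deque<String> events = new ArrayDeque<>();

    @Override protected void doPut(String event) { events.addLast(event); }
    @Override protected String doTake() { return events.pollFirst(); }
    @Override protected void doCommit() {}
    @Override protected void doRollback() { events.clear(); }
}

class StateSketch {
    public static void main(String[] args) {
        ToyTransaction tx = new ToyQueueTransaction();
        try {
            tx.put("too early"); // begin() has not been called yet
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // put() called when transaction is NEW
        }
        tx.begin();
        tx.put("event-1");
        System.out.println(tx.take());     // event-1
        tx.commit();
        tx.close();
        System.out.println(tx.getState()); // CLOSED
    }
}
```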
  
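  The BasicChannelSemantics class
  BasicChannelSemantics implements the basic Channel semantics, including the thread-local semantics of Transaction. Each thread holds its own Transaction object, which keeps transactions isolated from one another: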

private ThreadLocal<BasicTransactionSemantics> currentTransaction
    = new ThreadLocal<BasicTransactionSemantics>();
  A thread obtains its current transaction through the getTransaction method, which calls createTransaction(), an abstract method defined in BasicChannelSemantics, to obtain a BasicTransactionSemantics instance.



/**
 * <p>
 * Initializes the channel if it is not already, then checks to see
 * if there is an open transaction for this thread, creating a new
 * one via <code>createTransaction</code> if not.
 * @return the current <code>Transaction</code> object for the
 *     calling thread
 * </p>
 */
@Override
public Transaction getTransaction() {
  if (!initialized) {
    synchronized (this) {
      if (!initialized) {
        initialize();
        initialized = true;
      }
    }
  }

  BasicTransactionSemantics transaction = currentTransaction.get();
  if (transaction == null || transaction.getState().equals(
      BasicTransactionSemantics.State.CLOSED)) {
    transaction = createTransaction();
    currentTransaction.set(transaction);
  }
  return transaction;
}



@Override
public void put(Event event) throws ChannelException {
  BasicTransactionSemantics transaction = currentTransaction.get();
  Preconditions.checkState(transaction != null,
      "No transaction exists for this thread");
  transaction.put(event);
}

@Override
public Event take() throws ChannelException {
  BasicTransactionSemantics transaction = currentTransaction.get();
  Preconditions.checkState(transaction != null,
      "No transaction exists for this thread");
  return transaction.take();
}
  The actual storing and taking of Events is done by the BasicTransactionSemantics object that these methods delegate to.
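  The per-thread behavior of getTransaction can be seen in isolation with a small experiment. The sketch below copies only the "reuse unless missing or closed" shape of the method; ToyTxHolder, its Tx class, and ThreadLocalSketch are hypothetical names, and the closed flag is my simplification of the real state check.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical toy, NOT Flume code: one transaction object per thread,
// replaced only after it has been closed.
class ToyTxHolder {
    static final class Tx {
        boolean closed;
    }

    private final ThreadLocal<Tx> current = new ThreadLocal<>();

    Tx getTransaction() {
        Tx tx = current.get();
        if (tx == null || tx.closed) {
            tx = new Tx();
            current.set(tx);
        }
        return tx;
    }
}

class ThreadLocalSketch {
    // Returns the transaction that a freshly started thread gets from the holder.
    static ToyTxHolder.Tx fromOtherThread(ToyTxHolder holder) {
        AtomicReference<ToyTxHolder.Tx> seen = new AtomicReference<>();
        Thread t = new Thread(() -> seen.set(holder.getTransaction()));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        ToyTxHolder holder = new ToyTxHolder();
        ToyTxHolder.Tx mine = holder.getTransaction();

        System.out.println(mine == holder.getTransaction()); // true: same thread, same object
        System.out.println(mine == fromOtherThread(holder)); // false: other thread, own object

        mine.closed = true;
        System.out.println(mine == holder.getTransaction()); // false: closed, so a new one is created
    }
}
```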
  
  The BasicTransactionSemantics class
  Again, the official description first:


org.apache.flume.channel.BasicTransactionSemantics
  An implementation of basic Transaction semantics designed to work in concert with BasicChannelSemantics to simplify creation of robust Channel implementations. This class ensures that each transaction implementation method is called only while the transaction is in the correct state for that method, and only by the thread that created the transaction. Nested calls to begin() and close() are supported as long as they are balanced.
  Subclasses need only implement doPut, doTake, doCommit, and doRollback, and the developer can rest assured that those methods are called only after transaction state preconditions have been properly met. doBegin and doClose may also be implemented if there is work to be done at those points.
  All InterruptedException exceptions thrown from the implementations of the doXXX methods are automatically wrapped to become ChannelExceptions, but only after restoring the interrupted status of the thread so that any subsequent blocking method calls will themselves throw InterruptedException rather than blocking. The exception to this rule is doTake, which simply returns null instead of wrapping and propagating the InterruptedException, though it still first restores the interrupted status of the thread.
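  The main member functions are: begin, commit, close, doBegin, doCommit, doClose, doPut, doTake, doRollback, getState, toString, put, take, rollback
  Of these, put, take, begin, close, commit, and rollback all have a very similar structure: each first makes sure the Transaction is in the right state for the operation and then calls the corresponding doXXX method. If the current thread does not own the transaction, or the transaction is in the wrong state, an exception is thrown. If the doXXX method throws InterruptedException, the interrupted status of the current thread is restored with Thread.currentThread().interrupt(), and the caught InterruptedException is then wrapped in a ChannelException and thrown: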



@Override
public void rollback() {
  Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
      "rollback() called from different thread than getTransaction()!");
  Preconditions.checkState(state.equals(State.OPEN),
      "rollback() called when transaction is %s!", state);

  state = State.COMPLETED;
  try {
    doRollback();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new ChannelException(e.toString(), e);
  }
}
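  The "restore the flag, then wrap" step is easy to get wrong, so here is a tiny standalone demonstration of just that part. ToyChannelException, the Blocking interface, and InterruptSketch are hypothetical names made up for this sketch; ToyChannelException merely plays the role of an unchecked wrapper exception.

```java
// Hypothetical stand-in for an unchecked wrapper exception; NOT a Flume class.
class ToyChannelException extends RuntimeException {
    ToyChannelException(String message, Throwable cause) {
        super(message, cause);
    }
}

class InterruptSketch {
    // Any operation that may be interrupted while blocking.
    interface Blocking {
        void run() throws InterruptedException;
    }

    // Same shape as the catch block in rollback(): restore the interrupted
    // status first, then wrap the exception in an unchecked one.
    static void callWrapped(Blocking op) {
        try {
            op.run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ToyChannelException(e.toString(), e);
        }
    }

    public static void main(String[] args) {
        try {
            callWrapped(() -> { throw new InterruptedException("simulated"); });
        } catch (ToyChannelException ex) {
            boolean wrapped = ex.getCause() instanceof InterruptedException;
            // Thread.interrupted() reads AND clears the flag, which keeps the demo tidy.
            boolean flagRestored = Thread.interrupted();
            System.out.println("wrapped=" + wrapped + " flagRestored=" + flagRestored);
            // prints: wrapped=true flagRestored=true
        }
    }
}
```

  Because the flag is set again before the wrapper is thrown, any later blocking call on the same thread will itself throw InterruptedException instead of blocking, which is the behavior the official description promises.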
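  As mentioned earlier, BasicChannelSemantics implements the put and take methods defined by the Channel interface by delegating to the put and take methods of the BasicTransactionSemantics held by the current thread. As for those two methods in BasicTransactionSemantics, here is the take implementation: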



protected Event take() {
  Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
      "take() called from different thread than getTransaction()!");
  Preconditions.checkState(state.equals(State.OPEN),
      "take() called when transaction is %s!", state);

  try {
    return doTake();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return null;
  }
}
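  The code clearly shows the doTake and doPut methods that subclasses have to implement. So for a concrete Channel implementation, the message-handling logic lives in the Transaction object returned by its getTransaction() method. For details, read MemoryTransaction, the inner class of MemoryChannel.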
  
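  Startup process:
  This covers the startup mode that neither reloads the configuration file dynamically nor uses ZooKeeper; the main entry point is the handleConfiguration function in Application.java.
  PropertiesFileConfigurationProvider extends AbstractConfigurationProvider, which provides the getConfiguration method. That method returns a MaterializedConfiguration (in practice an instance of the implementing class SimpleMaterializedConfiguration), from which you get the map of channel instances and the maps of sink and source instances.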
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
  
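  The loadXxx methods in AbstractConfigurationProvider (loadChannels, loadSources, loadSinks) call the create method of the corresponding factory to instantiate each channel, sink, and source.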
  
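  Note that the sink and source maps use XxxRunner as their value type. Take sinks as an example: a sink instance does not implement a run method of its own for multithreading. Instead, loadSinks in AbstractConfigurationProvider creates all the sink instances, and loadSinkGroups then creates each SinkRunner instance (each one associated with a SinkProcessor) and associates each SinkProcessor with either a single sink or a sink group.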
  
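  So in startAllComponents in Application.java, sinks and sources are started by calling the start method of their runners directly. For a sink this is SinkRunner's start method, which launches a PollingRunner thread that keeps calling the SinkProcessor's process method.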
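  The runner idea (the sink has no thread of its own; a runner thread polls it) can be sketched as follows. This is a toy under my own assumptions, not the SinkRunner source: ToyPollingRunner, its Processor interface, and PollingSketch are hypothetical names, and the fixed 10 ms pause on BACKOFF is an arbitrary choice for the demo.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical toy, NOT Flume's SinkRunner: a runner thread polls a processor
// in a loop and pauses briefly whenever the processor reports BACKOFF.
class ToyPollingRunner implements Runnable {
    enum Status { READY, BACKOFF }

    interface Processor {
        Status process();
    }

    private final Processor processor;
    private final AtomicBoolean shouldStop = new AtomicBoolean(false);

    ToyPollingRunner(Processor processor) {
        this.processor = processor;
    }

    void stop() {
        shouldStop.set(true);
    }

    @Override
    public void run() {
        while (!shouldStop.get()) {
            try {
                if (processor.process() == Status.BACKOFF) {
                    Thread.sleep(10); // assumed fixed pause, chosen for this demo
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

class PollingSketch {
    // Runs a processor that is READY until its limit-th call, then asks the
    // runner to stop and reports BACKOFF. Returns how often process() ran.
    static int runUntil(int limit) {
        AtomicInteger calls = new AtomicInteger();
        AtomicReference<ToyPollingRunner> self = new AtomicReference<>();
        ToyPollingRunner runner = new ToyPollingRunner(() -> {
            if (calls.incrementAndGet() >= limit) {
                self.get().stop();
                return ToyPollingRunner.Status.BACKOFF;
            }
            return ToyPollingRunner.Status.READY;
        });
        self.set(runner);

        Thread thread = new Thread(runner, "toy-sink-runner");
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(runUntil(5)); // 5
    }
}
```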
  
