设为首页 收藏本站
查看: 1380|回复: 0

[经验分享] Flume的Avro Sink和Avro Source研究之一: Avro Source

[复制链接]

尚未签到

发表于 2015-9-17 07:00:03 | 显示全部楼层 |阅读模式
问题 : Avro Source提供了怎么样RPC服务,是怎么提供的?

问题 1.1 Flume Source是如何启动一个Netty Server来提供RPC服务。
  由GitHub上avro-rpc-quickstart知道可以通过下面这种方式启动一个NettyServer,来提供特定的RPC。那么Flume Source 是通过这种方法来提供的RPC服务吗?



  server = new NettyServer(new SpecificResponder(Mail.class, new MailImpl()), new InetSocketAddress(65111));

  
  AvroSource中创建NettyServer的源码为:
  



    Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory();
ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
socketChannelFactory, pipelineFactory, null);

  看来AvroSource也是直接用Avro提供的NettyServer类来建立了一个NettyServe,不过它使用了另一个构造函数,指定了ChannelFactory和ChannelPipelineFactory.
   那么AvroSource使用的是怎么样的一个ChannelFactory呢?
    initSocketChannelFactory()方法的实现为:



  private NioServerSocketChannelFactory initSocketChannelFactory() {
NioServerSocketChannelFactory socketChannelFactory;
if (maxThreads <= 0) {
socketChannelFactory = new NioServerSocketChannelFactory
(Executors .newCachedThreadPool(), Executors.newCachedThreadPool());
} else {
socketChannelFactory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newFixedThreadPool(maxThreads));
}
return socketChannelFactory;
}

  看来之所以要指定ChannelFactory,是为了根据AvroSource的"threads”这个参数,来决定可以使用worker thread的最大个数。这个数字决定了最多有多少个线程来处理RPC请求。
  参见NioServerChannelFactory的说明
  



A ServerSocketChannelFactory which creates a server-side NIO-based ServerSocketChannel. It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently.
How threads work
There are two types of threads in a NioServerSocketChannelFactory; one is boss thread and the other is worker thread.
Boss threads
Each bound ServerSocketChannel has its own boss thread. For example, if you opened two server ports such as 80 and 443, you will have two boss threads. A boss thread accepts incoming connections until the port is unbound. Once a connection is accepted successfully, the boss thread passes the accepted Channel to one of the worker threads that the NioServerSocketChannelFactory manages.
Worker threads
One NioServerSocketChannelFactory can have one or more worker threads. A worker thread performs non-blocking read and write for one or more Channels in a non-blocking mode.

   ChannelPipelineFactory是干嘛的呢?为什么也要特化一个?
    ChannelPipleline类的说明为:

  A list of ChannelHandlers which handles or intercepts ChannelEvents of a Channel. ChannelPipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over how an event is handled and how the ChannelHandlers in the pipeline interact with each other.

  
  看来这东西提供了一种更高级的拦截器组合。那就来看看AvroSource是用了怎么样的ChannelPiplelineFactory
  



  private ChannelPipelineFactory initChannelPipelineFactory() {
ChannelPipelineFactory pipelineFactory;
boolean enableCompression = compressionType.equalsIgnoreCase("deflate");
if (enableCompression || enableSsl) {
pipelineFactory = new SSLCompressionChannelPipelineFactory(
enableCompression, enableSsl, keystore,
keystorePassword, keystoreType);
} else {
pipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline();
}
};
}
return pipelineFactory;
}

  看来如果开启了压缩或者使用了ssl,就使用SSLCompressionChannelPiplelineFactory,这类是AvroSource一个私有的静态内部类。否则就使用Channels.pipleline()新建一个,这个pipleline貌似啥都不做?
  

问题 1.2这样Server是起来了,可是Server提供了什么样的RPC服务呢?
  关键在这一句。
  



Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);

  查下Avro的API,得知道SpecificResponder的两个参数是protocol和protocol的实现。看起来AvroSource这个类实现了AvroSourceProtocol。Yes, AvroSource的声明为
  



public class AvroSource extends AbstractSource implements EventDrivenSource,Configurable, AvroSourceProtocol

  那就看看AvroSourceProtocol是怎么样定义的吧。它定义在flume-ng-sdk工程的src/main/avro目录下,由flume.avdl定义。avdl是使用Avro IDL定义的协议。放在那个特定的目录下,是avro-maven-plugin的约定。
  这个avdl是这样的

  
  @namespace("org.apache.flume.source.avro")
  protocol AvroSourceProtocol {
  enum Status {
      OK, FAILED, UNKNOWN
  }
  record AvroFlumeEvent {
      map<string> headers;
      bytes body;
  }
  Status append( AvroFlumeEvent event );
  Status appendBatch( array<AvroFlumeEvent> events );
  }

  
  它定义了一个枚举,用作append和appendBatch的返回值。表示Source端对传输来的消息处理的结果,有OK FAILED UNKNOWN三种状态。
  定义了 AvroFlumeEvent这样一个record类型,符合Flume对Event的定义,header是一系列K-V对,即一个Map, body是byte数组。
  定义了两个方法,append单条AvroFlumeEvent,以及append一批AvroFlumeEvent.
  由此avdl,Avro生成了三个java文件,包括:一个枚举Status,一个类AvroFlumeEvent,一个接口AvroSourceProtocol。其中AvroSource类实现了AvroSourceProtocol接口,对外提供了append和appendBatch这两个远程方法调用。
  append方法实现为:
  



  @Override
public Status append(AvroFlumeEvent avroEvent) {
logger.debug("Avro source {}: Received avro event: {}", getName(),
avroEvent);
sourceCounter.incrementAppendReceivedCount();
sourceCounter.incrementEventReceivedCount();
Event event = EventBuilder.withBody(avroEvent.getBody().array(),
toStringMap(avroEvent.getHeaders()));
try {
getChannelProcessor().processEvent(event);
} catch (ChannelException ex) {
logger.warn("Avro source " + getName() + ": Unable to process event. " +
"Exception follows.", ex);
return Status.FAILED;
}
sourceCounter.incrementAppendAcceptedCount();
sourceCounter.incrementEventAcceptedCount();
return Status.OK;
}

  这个方法就是用获取的AvroFlumeEvent对象,经过转换构建一个Event对象。这个转换只是将不对等的数据类型进行了转换,arvoEvent.getBody()返回的是ByteBuffer,而avroEvent.getHeaders()返回的是Map<CharSequence,CharSequence>。
  构建完Event后,把这个消息传递给这个Source对应的ChannelProcessor来处理。
  appendBatch方法和append方法的实现很相似。
  
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114554-1-1.html 上篇帖子: Flume Spooldir 源的一些问题 下篇帖子: flume-agent实例
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表