设为首页 收藏本站
查看: 1487|回复: 0

[经验分享] Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

[复制链接]

尚未签到

发表于 2015-11-27 18:13:18 | 显示全部楼层 |阅读模式
  上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式。笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式。
  整体的结构图如下:
  1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现。这里的Thrift Service是由ThriftSourceProtocol定义


  2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到ThriftSource的Thrfit Service的服务端,完成应用层序日志的收集


DSC0000.jpg


  先来看下ThriftSourceProtocol定义的Thrfit服务。Thrift服务定义在flume-ng-sdk工程的flume.thrift中
  1. 定义了ThriftFlumeEvent数据结构,日志封装成Event来Flume NG中传递
  2. 定义了ThriftSourceProtocol服务,有两个接口,append和appendBatch


  namespace java org.apache.flume.thrift
struct ThriftFlumeEvent {
1: required map <string, string> headers,
2: required binary body,
}
enum Status {
OK,
FAILED,
ERROR,
UNKNOWN
}
service ThriftSourceProtocol {
Status append(1: ThriftFlumeEvent event),
Status appendBatch(1: list<ThriftFlumeEvent> events),
}


Thrift生成的中间文件是ThrfitSourceProtocol,由服务器端和客户端共享
  public class ThriftSourceProtocol {
public interface Iface {
public Status append(ThriftFlumeEvent event) throws org.apache.thrift.TException;
public Status appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException;
}
。。。。。。
}

ThrfitSource中的ThriftSourceHandler私有类实现了ThrfitSourceProtocol这个服务,append接口的实现逻辑如下
  1. 把ThriftFlumeEvent转化成SimpleEvent
  2. 修改计数器
  3. 把SimpleEvent交给ChannelProcessor来处理,传递到下游的Channel中去
  


  可以看到ThriftSouceHandler的实现逻辑和ExecRunnable的逻辑基本是一样的


  private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {
@Override
public Status append(ThriftFlumeEvent event) throws TException {
Event flumeEvent = EventBuilder.withBody(event.getBody(),
event.getHeaders());
sourceCounter.incrementAppendReceivedCount();
sourceCounter.incrementEventReceivedCount();
try {
getChannelProcessor().processEvent(flumeEvent);
} catch (ChannelException ex) {
logger.warn(&quot;Thrift source &quot; + getName() + &quot; could not append events &quot; +
&quot;to the channel.&quot;, ex);
return Status.FAILED;
}
sourceCounter.incrementAppendAcceptedCount();
sourceCounter.incrementEventAcceptedCount();
return Status.OK;
}
@Override
public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
sourceCounter.incrementAppendBatchReceivedCount();
sourceCounter.addToEventReceivedCount(events.size());
List<Event> flumeEvents = Lists.newArrayList();
for(ThriftFlumeEvent event : events) {
flumeEvents.add(EventBuilder.withBody(event.getBody(),
event.getHeaders()));
}
try {
getChannelProcessor().processEventBatch(flumeEvents);
} catch (ChannelException ex) {
logger.warn(&quot;Thrift source %s could not append events to the &quot; +
&quot;channel.&quot;, getName());
return Status.FAILED;
}
sourceCounter.incrementAppendBatchAcceptedCount();
sourceCounter.addToEventAcceptedCount(events.size());
return Status.OK;
}
}
}
  

有了Thrfit服务实现后,ThrfitSource定义了Thrfit Server。默认是TThreadedSelectorServer,当TThreadedSelectorServer ClassNotFound后,创建TThreadPoolServer,还是没找到的话,那么ThriftSource启动失败。
  关于Thrfit Server的更多介绍可以看这篇Thrift源码分析(七)-- TServer服务器分析
  

Thrift Server的创建和启动主要做了几件事情
  1. 创建ServerSocket,这里是TNonblockingServerSocket,非阻塞的ServerSocket
  2. 创建服务器参数类TNonblockingServer.AbstractNonblockingServerArgs,所有的服务器的属性设置都是在Args类里传递的
  3. TThreadedSelectorServer是一个Reactor模式的服务器实现,需要传递一个线程池。这里是Executors.newFixedThreadPool(maxThreads, threadFactory);
  4. 设置编解码协议,这里是TFastFramedTransport协议
  5. 设置Thrift服务的实现类Processor,这里是上面定义的ThrfitSourceHandler类
  6. 启动Thrift服务器,这里在单独的线程中启动了Thrift服务器。servingExecutor.submit(new Runnable() {public void run() {server.serve();}})
  在单独的线程启动Thrift服务器主要的目的是在原来的线程中可以处理一下Thrfit服务器停止后的清理工作。


  Class<?> serverClass = null;
Class<?> argsClass = null;
TServer.AbstractServerArgs args = null;
/*
* Use reflection to determine if TThreadedSelectServer is available. If
* it is not available, use TThreadPoolServer
*/
try {
serverClass = Class.forName(&quot;org.apache.thrift&quot; +
&quot;.server.TThreadedSelectorServer&quot;);
argsClass = Class.forName(&quot;org.apache.thrift&quot; +
&quot;.server.TThreadedSelectorServer$Args&quot;);
// Looks like TThreadedSelectorServer is available, so continue..
ExecutorService sourceService;
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
&quot;Flume Thrift IPC Thread %d&quot;).build();
if (maxThreads == 0) {
sourceService = Executors.newCachedThreadPool(threadFactory);
} else {
sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
}
serverTransport = new TNonblockingServerSocket(
new InetSocketAddress(bindAddress, port));
args = (TNonblockingServer.AbstractNonblockingServerArgs) argsClass
.getConstructor(TNonblockingServerTransport.class)
.newInstance(serverTransport);
Method m = argsClass.getDeclaredMethod(&quot;executorService&quot;,
ExecutorService.class);
m.invoke(args, sourceService);
}

try {
      args.protocolFactory(new TCompactProtocol.Factory());
      args.inputTransportFactory(new TFastFramedTransport.Factory());
      args.outputTransportFactory(new TFastFramedTransport.Factory());
      args.processor(new ThriftSourceProtocol
        .Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
     
      server = (TServer) serverClass.getConstructor(argsClass).newInstance
        (args);
    } catch (Throwable ex) {
      throw new FlumeException(&quot;Cannot start Thrift Source.&quot;, ex);
    }

    servingExecutor = Executors.newSingleThreadExecutor(new
      ThreadFactoryBuilder().setNameFormat(&quot;Flume Thrift Source I/O Boss&quot;)
      .build());
    /**
     * Start serving.
     */
    servingExecutor.submit(new Runnable() {
      @Override
      public void run() {
        server.serve();
      }
    });





总结一下,采用RPC的方式收集日志有几个步骤
  1. 定义RPC服务来收集日志
  2. 实现RPC服务,并提供客户端给应用程序。应用程序使用客户端来将日志封装成Event,通过RPC调用传递给RPC类型的Source
  3. RPC类型的Source启动RPC Server,提供RPC服务,将接收到的Event传递给下游的Channel
  


  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144308-1-1.html 上篇帖子: Flume 收集Nginx日志到Hdfs Tail-to-hdfs sink 下篇帖子: flume 案例
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表