jinying8869 发表于 2017-5-21 13:25:30

flume-Thrift-source

Thrift IDL
Flume Thrift IDL在client包里面,定义如下:

namespace java org.apache.flume.thrift

/**
 * Wire representation of a single Flume event: a string-to-string header map
 * plus an opaque binary body. Both fields are declared required.
 */
struct ThriftFlumeEvent {
// Event headers as string key/value pairs.
1: required map <string, string> headers,
// Raw event payload bytes.
2: required binary body,
}

/**
 * Result code returned by the ThriftSourceProtocol calls.
 * NOTE(review): only OK and FAILED are returned by the handler shown in this
 * post; the exact meaning of ERROR and UNKNOWN is not visible here - confirm
 * against the Flume sources before relying on them.
 */
enum Status {
// Returned when the event(s) were handed to the channel successfully.
OK,
// Returned when the channel rejected the event(s) (ChannelException).
FAILED,
ERROR,
UNKNOWN
}

/**
 * RPC interface exposed by the Flume Thrift source. Each call returns a
 * Status describing whether the delivery was accepted.
 */
service ThriftSourceProtocol {
// Deliver a single event.
Status append(1: ThriftFlumeEvent event),
// Deliver a list of events in one call.
Status appendBatch(1: list<ThriftFlumeEvent> events),
}

注意:event在C#里面是关键字,所以利用Thrift编译器生成C#客户端接口时,要把IDL中所有名为event的标识符改成其他名称,比如events。

Thrift Service
Flume的Source分两种:

- 实现PollableSource接口
通过PollableSourceRunner管理Source
- 实现EventDrivenSource接口
可以自己接收数据、发送到channel。比如ThriftSource
Flume Thrift Service的实现类在core包

/**
 * Excerpt of the Flume Thrift source: an event-driven source that exposes the
 * ThriftSourceProtocol service through a Thrift server. Parts of the class are
 * elided with "..." in this excerpt.
 */
public class ThriftSource extends AbstractSource implements Configurable,
EventDrivenSource {
// NOTE(review): presumably the configuration property names for the worker
// thread count, bind address and listen port - confirm against configure().
public static final String CONFIG_THREADS = "threads";
public static final String CONFIG_BIND = "bind";
public static final String CONFIG_PORT = "port";
private Integer port;
private String bindAddress;
private int maxThreads = 0;
private SourceCounter sourceCounter;
private TServer server;
private TServerTransport serverTransport;
// Executor on which server.serve() is submitted in start().
private ExecutorService servingExecutor;
/**
 * Configures the Thrift server arguments (protocol, transports, processor)
 * and submits the serving loop to servingExecutor.
 */
public void start() {
      // Create the worker thread pool (setup code elided in this excerpt).
      ...
      // Use the compact protocol over fast framed transports in both
      // directions; clients presumably have to be configured the same way.
      args.protocolFactory(new TCompactProtocol.Factory());
      args.inputTransportFactory(new TFastFramedTransport.Factory());
      args.outputTransportFactory(new TFastFramedTransport.Factory());

      // Register the ThriftSourceProtocol processor, backed by a
      // ThriftSourceHandler that holds the actual service implementation.
      args.processor(new ThriftSourceProtocol
      .Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
   /**
   * Start serving.
   */
    // Run server.serve() on servingExecutor instead of the calling thread.
    servingExecutor.submit(new Runnable() {
      @Override
      public void run() {
      server.serve();
      }
    });
    ...
}

Flume Thrift Service真正的实现类是内部类ThriftSourceHandler:

private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {

    @Override
    public Status append(ThriftFlumeEvent event) throws TException {
      Event flumeEvent = EventBuilder.withBody(event.getBody(),
      event.getHeaders());

      sourceCounter.incrementAppendReceivedCount();
      sourceCounter.incrementEventReceivedCount();

      try {
      //传给channel
      getChannelProcessor().processEvent(flumeEvent);
      } catch (ChannelException ex) {
      logger.warn("Thrift source " + getName() + " could not append events " +
          "to the channel.", ex);
      return Status.FAILED;
      }
      sourceCounter.incrementAppendAcceptedCount();
      sourceCounter.incrementEventAcceptedCount();
      return Status.OK;
    }

    @Override
    public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
      sourceCounter.incrementAppendBatchReceivedCount();
      sourceCounter.addToEventReceivedCount(events.size());

      List<Event> flumeEvents = Lists.newArrayList();
      for(ThriftFlumeEvent event : events) {
      flumeEvents.add(EventBuilder.withBody(event.getBody(),
          event.getHeaders()));
      }

      try {
      getChannelProcessor().processEventBatch(flumeEvents);
      } catch (ChannelException ex) {
      logger.warn("Thrift source %s could not append events to the " +
          "channel.", getName());
      return Status.FAILED;
      }

      sourceCounter.incrementAppendBatchAcceptedCount();
      sourceCounter.addToEventAcceptedCount(events.size());
      return Status.OK;
    }
}
页: [1]
查看完整版本: flume-Thrift-source