o2geao posted on 2015-11-27 16:57:28

How the Flume-ng ThriftSource Works


Thrift IDL

The Flume Thrift IDL lives in the client package and is defined as follows:

namespace java org.apache.flume.thrift

struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}

enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}

service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}
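Note: event is a reserved keyword in C#, so before generating a C# client with the Thrift compiler, rename every identifier called event in the IDL to something else, for example events. The rename is safe on the wire, because Thrift identifies fields by their numeric IDs (the 1: and 2: above) and not by their names.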
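To make the shapes in the IDL concrete, here is a minimal hand-written Java sketch of what the struct, the enum, and the service look like from the caller's side. None of these classes are the ones the Thrift compiler generates: SketchEvent, SketchStatus, SketchProtocol, InMemoryProtocol and IdlShapeDemo are hypothetical names used only for illustration, and the in-memory service simply stores what it receives.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hand-written stand-ins for the shapes the IDL describes; the classes that
// the Thrift compiler really generates are much larger.
enum SketchStatus { OK, FAILED, ERROR, UNKNOWN }

final class SketchEvent {
    final Map<String, String> headers; // field 1: required map<string, string>
    final byte[] body;                 // field 2: required binary

    SketchEvent(Map<String, String> headers, byte[] body) {
        this.headers = headers;
        this.body = body;
    }
}

// Mirrors the two methods of service ThriftSourceProtocol.
interface SketchProtocol {
    SketchStatus append(SketchEvent event);
    SketchStatus appendBatch(List<SketchEvent> events);
}

// Hypothetical in-memory implementation, used only to show the call shapes.
final class InMemoryProtocol implements SketchProtocol {
    final List<SketchEvent> received = new ArrayList<>();

    @Override
    public SketchStatus append(SketchEvent event) {
        received.add(event);
        return SketchStatus.OK;
    }

    @Override
    public SketchStatus appendBatch(List<SketchEvent> events) {
        received.addAll(events);
        return SketchStatus.OK;
    }
}

final class IdlShapeDemo {
    // Sends one single event and one batch of two; returns how many arrived.
    static int sendThree() {
        InMemoryProtocol service = new InMemoryProtocol();
        SketchEvent one = new SketchEvent(
            Collections.singletonMap("host", "web01"),
            "hello".getBytes(StandardCharsets.UTF_8));
        service.append(one);
        service.appendBatch(Arrays.asList(one, one));
        return service.received.size();
    }
}
```

A real client would talk to the generated ThriftSourceProtocol client over a network transport instead of an in-memory object; the call shapes (one event, or a list of events, in and a Status out) stay the same.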


Thrift Service

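Flume sources come in two kinds:

- Sources that implement the PollableSource interface
  These are driven by a PollableSourceRunner, which calls the source's process() method in a loop.
- Sources that implement the EventDrivenSource interface
  These receive data on their own and push it to the channel themselves. ThriftSource is one example.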
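The difference between the two kinds is who owns the loop. The sketch below contrasts them with deliberately simplified, hypothetical interfaces (SketchPollableSource, SketchEventDrivenSource, PollStatus and SourceKindsDemo are not Flume types, and Flume's real interfaces also carry lifecycle and configuration methods). The channel is modelled as a plain Consumer<String>.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for the READY/BACKOFF result of a pollable source.
enum PollStatus { READY, BACKOFF }

// Pull model: an external runner calls process() again and again.
interface SketchPollableSource {
    PollStatus process(Consumer<String> channel);
}

// Push model: started once, then delivers events whenever they arrive.
interface SketchEventDrivenSource {
    void start(Consumer<String> channel);
}

final class SourceKindsDemo {
    // The runner owns the loop and stops polling on BACKOFF
    // (a real runner would sleep and retry instead of stopping).
    static List<String> runPollable(List<String> pending, int rounds) {
        List<String> channel = new ArrayList<>();
        Iterator<String> it = pending.iterator();
        SketchPollableSource source = sink -> {
            if (!it.hasNext()) {
                return PollStatus.BACKOFF;
            }
            sink.accept(it.next());
            return PollStatus.READY;
        };
        for (int i = 0; i < rounds; i++) {
            if (source.process(channel::add) == PollStatus.BACKOFF) {
                break;
            }
        }
        return channel;
    }

    // The source owns delivery: each incoming request is pushed straight
    // to the channel, as an RPC handler would do.
    static List<String> runEventDriven(String... incoming) {
        List<String> channel = new ArrayList<>();
        SketchEventDrivenSource source = sink -> {
            for (String request : incoming) {
                sink.accept(request);
            }
        };
        source.start(channel::add);
        return channel;
    }
}
```

ThriftSource follows the second shape: nothing polls it, and events reach the channel from inside the RPC handler methods.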

The class that implements the Flume Thrift service is in the core package:

public class ThriftSource extends AbstractSource implements Configurable,
    EventDrivenSource {
  public static final String CONFIG_THREADS = "threads";
  public static final String CONFIG_BIND = "bind";
  public static final String CONFIG_PORT = "port";
  private Integer port;
  private String bindAddress;
  private int maxThreads = 0;
  private SourceCounter sourceCounter;
  private TServer server;
  private TServerTransport serverTransport;
  private ExecutorService servingExecutor;

  public void start() {
    // Create the worker thread pool
    ...
    args.protocolFactory(new TCompactProtocol.Factory());
    args.inputTransportFactory(new TFastFramedTransport.Factory());
    args.outputTransportFactory(new TFastFramedTransport.Factory());
    // ThriftSourceProtocol.Processor dispatches each incoming call to
    // ThriftSourceHandler, which holds the actual service logic
    args.processor(new ThriftSourceProtocol
        .Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
    /**
     * Start serving.
     */
    servingExecutor.submit(new Runnable() {
      @Override
      public void run() {
        server.serve();
      }
    });
    ...
  }
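start() hands server.serve() to servingExecutor because serve() blocks until the server is stopped; running it inline would keep start() from ever returning. The sketch below shows that pattern in isolation. SketchBlockingServer is a hypothetical stand-in for a blocking server, not a Thrift class, and ServeOnExecutorDemo is likewise invented for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a blocking server: serve() does not return
// until stop() has been called.
final class SketchBlockingServer {
    private final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch stopped = new CountDownLatch(1);

    void serve() {
        started.countDown();
        try {
            stopped.await(); // block the way an accept loop would
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void stop() {
        stopped.countDown();
    }

    // Waits up to the given time for serve() to have been entered.
    boolean awaitServing(long millis) {
        try {
            return started.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

final class ServeOnExecutorDemo {
    // Returns true if the caller regained control while the server was
    // serving, and the serving thread finished after stop().
    static boolean startAndStop() {
        SketchBlockingServer server = new SketchBlockingServer();
        ExecutorService servingExecutor = Executors.newSingleThreadExecutor();

        servingExecutor.submit(server::serve); // returns at once
        boolean serving = server.awaitServing(2000);

        server.stop();                         // unblocks serve()
        servingExecutor.shutdown();
        try {
            return serving
                && servingExecutor.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A single-thread executor is enough for this purpose, since its only job is to host the blocking serve() call; request handling runs on the server's own worker threads.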
The actual implementation of the Flume Thrift service is the inner class ThriftSourceHandler:

private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {
  @Override
  public Status append(ThriftFlumeEvent event) throws TException {
    Event flumeEvent = EventBuilder.withBody(event.getBody(),
        event.getHeaders());
    sourceCounter.incrementAppendReceivedCount();
    sourceCounter.incrementEventReceivedCount();
    try {
      // Hand the event to the channel
      getChannelProcessor().processEvent(flumeEvent);
    } catch (ChannelException ex) {
      logger.warn("Thrift source " + getName() + " could not append events " +
          "to the channel.", ex);
      return Status.FAILED;
    }
    sourceCounter.incrementAppendAcceptedCount();
    sourceCounter.incrementEventAcceptedCount();
    return Status.OK;
  }

  @Override
  public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
    sourceCounter.incrementAppendBatchReceivedCount();
    sourceCounter.addToEventReceivedCount(events.size());
    List<Event> flumeEvents = Lists.newArrayList();
    for (ThriftFlumeEvent event : events) {
      flumeEvents.add(EventBuilder.withBody(event.getBody(),
          event.getHeaders()));
    }
    try {
      getChannelProcessor().processEventBatch(flumeEvents);
    } catch (ChannelException ex) {
      // Note: if logger is an SLF4J logger, "%s" is not a placeholder ("{}" is),
      // so the source name is not substituted here; ex is also not logged.
      logger.warn("Thrift source %s could not append events to the " +
          "channel.", getName());
      return Status.FAILED;
    }
    sourceCounter.incrementAppendBatchAcceptedCount();
    sourceCounter.addToEventAcceptedCount(events.size());
    return Status.OK;
  }
}
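Both handler methods follow the same order: bump the "received" counters, try the channel, and bump the "accepted" counters only if the channel did not throw. The sketch below isolates that flow so the effect on the counters is easy to see. Everything in it is a hypothetical stand-in: SketchCounters replaces SourceCounter, a Consumer that throws IllegalStateException replaces the channel processor and ChannelException, and events are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the received/accepted event counters.
final class SketchCounters {
    long received;
    long accepted;
}

final class SketchHandler {
    enum Result { OK, FAILED }

    private final SketchCounters counters;
    // Stand-in channel: throws IllegalStateException when it rejects a batch.
    private final Consumer<List<String>> channel;

    SketchHandler(SketchCounters counters, Consumer<List<String>> channel) {
        this.counters = counters;
        this.channel = channel;
    }

    Result appendBatch(List<String> events) {
        counters.received += events.size();  // counted before the channel is tried
        try {
            channel.accept(events);
        } catch (IllegalStateException ex) {
            return Result.FAILED;            // accepted counter stays untouched
        }
        counters.accepted += events.size();  // counted only after success
        return Result.OK;
    }
}

final class HandlerFlowDemo {
    // Sends two batches to a channel with room for three events and
    // returns {received, accepted}.
    static long[] run() {
        SketchCounters counters = new SketchCounters();
        List<String> store = new ArrayList<>();
        SketchHandler handler = new SketchHandler(counters, batch -> {
            if (store.size() + batch.size() > 3) {
                throw new IllegalStateException("channel full");
            }
            store.addAll(batch);
        });
        handler.appendBatch(Arrays.asList("a", "b")); // fits, returns OK
        handler.appendBatch(Arrays.asList("c", "d")); // would exceed 3, returns FAILED
        return new long[] { counters.received, counters.accepted };
    }
}
```

With this ordering, the gap between the received and accepted counts equals the number of events the channel refused, and a FAILED status tells the caller that the whole call was not stored.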