|
项目需要C++代码与flume对接,进而将日志写入HDFS。
flume原生为java代码,原先的解决方案是通过JNI调用flume java方法。
但是由于一来对jni的调用效率的担心,二来C++调用JNI需要照顾local reference和GC的问题,被搞得头痛了。
一怒之下,重写代码了,使用C++与远端的JAVA Flume对接。
在协议的选择上,AVRO C++虽然也有apache的开源项目,但是目前只支持读写文件,而不能使用RPC。
故使用了thrift与远端Flume thrift source通信。
以下是一些实现的具体方法:
Flume thrift 协议准备:
1. 安装thrift,安装flume。
2. 下载flume的source包,解压。
3. 在解压目录flume-ng-sdk/src/main/thrift下,存在文件flume.thrift。
这是flume的thrift协议规则文件,执行命令:thrift -r --gen cpp ./flume.thrift
在./gen-cpp/目录下会生成一些.h .cpp文件,之后C++代码中需要调用其中的方法进行RPC通信。
注意:如果是在windows/visual studio下开发,由于ERROR与关键词冲突,需要对flume.thrift修改。
enum Status {
OK,
FAILED,
FERROR, //原先是ERROR,修改为FERROR。
UNKNOWN
}
如果是linux下工作,则无需改动任何代码。
C++ 代码实现:
1. include一些头文件,以及使用thrift的名字空间
1 #include <thrift/protocol/TBinaryProtocol.h>
2 #include <thrift/protocol/TCompactProtocol.h>
3 #include <thrift/transport/TSocket.h>
4 #include <thrift/transport/TTransportUtils.h>
5
6 using namespace std;
7 using namespace apache::thrift;
8 using namespace apache::thrift::protocol;
9 using namespace apache::thrift::transport;
2. 创建thrift的socket, transport, protocol, client.
1 class ThriftClient{
2 private:
3 /* Thrift protocol needings... */
4 boost::shared_ptr<TTransport> socket;
5 boost::shared_ptr<TTransport> transport;
6 boost::shared_ptr<TProtocol> protocol;
7 ThriftSourceProtocolClient* pClient;
8 }
1 ThriftClient::ThriftClient(std::string inIpAddress, std::string inPort):
2 socket(new TSocket(inIpAddress.c_str(), atoi(inPort.c_str()))),
3 transport(new TFramedTransport(socket)),
4 protocol(new TCompactProtocol(transport))
5 {
6 pClient = new ThriftSourceProtocolClient(protocol);
7 }
3. 与远端flume thrift source通信:
1 bool ThriftClient::sendEvent(const Event* event)
2 {
3 //build the head
4 std::map<std::string, std::string> headers;
5 std::ostringstream timeBuffer;
6 timeBuffer << event->timestamp <<"000";
7 headers.insert(std::make_pair("timestamp", timeBuffer.str()));
8 headers.insert(std::make_pair("appId", appId));
9 //build the body
10 std::ostringstream osBody;
11 osBody << *event;
12 std::string sBody = osBody.str();
13
14 ThriftFlumeEvent tfEvent;
15 tfEvent.__set_body(sBody);
16 tfEvent.__set_headers(headers);
17 if(!transport->isOpen())
18 {
19 transport->open();
20 }
21 Status::type res=pClient->append(tfEvent);
22 if(res == Status::OK)
23 {
24 return true;
25 }
26 else
27 {
28 printf("WARNING: send event via thrift failed, return code:%d\n",res);
29 return false;
30 }
31 }
其他注意点:
1. append方法用来发送一条event:
Status::type ThriftSourceProtocolClient::append(const ThriftFlumeEvent& event)
同时还有一个方法appendBatch用来一次发送多个event:
Status::type ThriftSourceProtocolClient::appendBatch(const std::vector<ThriftFlumeEvent> & events)
2. 无论是append还是appendBatch方法,都是阻塞方法。
3. 可能可以通过send_append,send_appendBatch来发送无需确认成功的event(未测试)。
void ThriftSourceProtocolClient::send_append(const ThriftFlumeEvent& event)
void ThriftSourceProtocolClient::send_appendBatch(const std::vector<ThriftFlumeEvent> & events)
4. Thrift只提供协议RPC功能,并没有提供flume的channel功能,以及多个source的情况下的load balance功能。这些都需要自己实现。
关于性能:
测试环境: vmware+ubuntu,i3-4150 CPU, 配置1G内存,双核CPU。
在本机配置两个flume thrift source(load balance),网络回环。
在每条event大约50字符的情况下,可以达到16000条每秒的吞吐量,此时CPU被耗尽(两个flume thrift source大约占用30%CPU)。
|
|
|