设为首页 收藏本站
查看: 2379|回复: 0

[经验分享] C++ Thrift Client 与 Flume Thrift Source 对接

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-9-17 06:29:25 | 显示全部楼层 |阅读模式
  项目需要C++代码与flume对接,进而将日志写入HDFS。
flume原生为java代码,原先的解决方案是通过JNI调用flume java方法。
但是由于一来对jni的调用效率的担心,二来C++调用JNI需要照顾local reference和GC的问题,被搞得头痛了。
一怒之下,重写代码了,使用C++与远端的JAVA Flume对接。
  在协议的选择上,AVRO C++虽然也有apache的开源项目,但是目前只支持读写文件,而不能使用RPC。
故使用了thrift与远端Flume thrift source通信。
以下是一些实现的具体方法:
  
  Flume thrift 协议准备:
  1. 安装thrift,安装flume。
2. 下载flume的source包,解压。
3. 在解压目录flume-ng-sdk/src/main/thrift下,存在文件flume.thrift。
    这是flume的thrift协议规则文件,执行命令:thrift -r --gen cpp ./flume.thrift
    在./gen-cpp/目录下会生成一些.h .cpp文件,之后C++代码中需要调用其中的方法进行RPC通信。
  注意:如果是在windows/visual studio下开发,由于ERROR与关键词冲突,需要对flume.thrift修改。
      enum Status {
    OK,
    FAILED,
    FERROR, //原先是ERROR,修改为FERROR。
    UNKNOWN
    }
      如果是linux下工作,则无需改动任何代码。
  
  C++ 代码实现:
  1. include一些头文件,以及使用thrift的名字空间



1 #include <thrift/protocol/TBinaryProtocol.h>
2 #include <thrift/protocol/TCompactProtocol.h>
3 #include <thrift/transport/TSocket.h>
4 #include <thrift/transport/TTransportUtils.h>
5
6 using namespace std;
7 using namespace apache::thrift;
8 using namespace apache::thrift::protocol;
9 using namespace apache::thrift::transport;
  2. 创建thrift的socket, transport, protocol, client.



1 class ThriftClient{
2     private:
3     /* Thrift protocol needings... */
4     boost::shared_ptr<TTransport> socket;
5     boost::shared_ptr<TTransport> transport;
6     boost::shared_ptr<TProtocol> protocol;
7     ThriftSourceProtocolClient* pClient;
8 }


1     ThriftClient::ThriftClient(std::string inIpAddress, std::string inPort):
2         socket(new TSocket(inIpAddress.c_str(), atoi(inPort.c_str()))),
3         transport(new TFramedTransport(socket)),
4         protocol(new TCompactProtocol(transport))
5     {
6         pClient = new ThriftSourceProtocolClient(protocol);
7     }
  
  3. 与远端flume thrift source通信:



1       bool ThriftClient::sendEvent(const Event* event)
2       {
3           //build the head
4           std::map<std::string, std::string>  headers;
5           std::ostringstream timeBuffer;
6           timeBuffer << event->timestamp <<"000";
7           headers.insert(std::make_pair("timestamp", timeBuffer.str()));
8           headers.insert(std::make_pair("appId", appId));
9           //build the body
10           std::ostringstream  osBody;
11           osBody << *event;
12           std::string sBody = osBody.str();
13  
14           ThriftFlumeEvent tfEvent;
15           tfEvent.__set_body(sBody);
16           tfEvent.__set_headers(headers);
17           if(!transport->isOpen())
18           {
19               transport->open();
20           }
21           Status::type res=pClient->append(tfEvent);
22           if(res == Status::OK)
23           {
24               return true;
25           }
26           else
27           {
28               printf("WARNING: send event via thrift failed, return code:%d\n",res);
29               return false;
30           }
31       }
  
  其他注意点:
  1. append方法用来发送一条event:



Status::type ThriftSourceProtocolClient::append(const ThriftFlumeEvent& event)
  同时还有一个方法appendBatch用来一次发送多个event:



Status::type ThriftSourceProtocolClient::appendBatch(const std::vector<ThriftFlumeEvent> & events)
  
  2. 无论是append还是appendBatch方法,都是阻塞方法。
  3. 可能可以通过send_append,send_appendBatch来发送无需确认成功的event(未测试)。



void ThriftSourceProtocolClient::send_append(const ThriftFlumeEvent& event)


void ThriftSourceProtocolClient::send_appendBatch(const std::vector<ThriftFlumeEvent> & events)
  4. Thrift只提供协议RPC功能,并没有提供flume的channel功能,以及多个source的情况下的load balance功能。这些都需要自己实现。
  
  关于性能:
  测试环境: vmware+ubuntu,i3-4150 CPU, 配置1G内存,双核CPU。
  在本机配置两个flume thrift source(load balance),网络回环。
  在每条event大约50字符的情况下,可以达到16000条每秒的吞吐量,此时CPU被耗尽(两个flume thrift source大约占用30%CPU)。
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114551-1-1.html 上篇帖子: Flume-NG源码阅读之SinkGroups和SinkRunner 下篇帖子: flume与Mosquitto的集成
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表