设为首页 收藏本站
查看: 2282|回复: 0

[经验分享] Apache Kafka系列(五) Kafka Connect及FileConnector示例

[复制链接]

尚未签到

发表于 2017-12-24 09:41:44 | 显示全部楼层 |阅读模式

  • Apache Kafka系列(一) 起步
  • Apache Kafka系列(二) 命令行工具(CLI)
  • Apache Kafka系列(三) Java API使用
  • Apache Kafka系列(四) 多线程Consumer方案
  • Apache Kafka系列(五) Kafka Connect及FileConnector示例
一. Kafka Connect简介
  Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为何集成其他系统和解耦应用,经常使用Producer来发送消息到Broker,并使用Consumer来消费Broker中的消息。Kafka Connect是到0.9版本才提供的并极大的简化了其他系统与Kafka的集成。Kafka Connect运用用户快速定义并实现各种Connector(File,Jdbc,Hdfs等),这些功能让大批量数据导入/导出Kafka很方便。
DSC0000.png

  如图中所示,左侧的Sources负责从其他异构系统中读取数据并导入到Kafka中;右侧的Sinks是把Kafka中的数据写入到其他的系统中。

二. 各种Kafka Connector
  Kafka Connector很多,包括开源和商业版本的。如下列表中是常用的开源Connector

Connectors
References
Jdbc
Source, Sink
Elastic Search
Sink1, Sink2, Sink3
Cassandra
Source1, Source 2, Sink1, Sink2
MongoDB
Source
HBase
Sink
Syslog
Source
MQTT (Source)
Source
Twitter (Source)
Source, Sink
S3
Sink1, Sink2  商业版的可以通过Confluent.io获得

三. 示例

3.1 FileConnector Demo
  本例演示如何使用Kafka Connect把Source(test.txt)转为流数据再写入到Destination(test.sink.txt)中。如下图所示:
DSC0001.png

  本例使用到了两个Connector:


  • FileStreamSource:从test.txt中读取并发布到Broker中
  • FileStreamSink:从Broker中读取数据并写入到test.sink.txt文件中
  其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties
  

name=local-file-source  
connector.class
=FileStreamSource  
tasks.max
=1  
file=test.txt
  
topic=connect-test
  

  其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties
  

name=local-file-sink  
connector.class
=FileStreamSink  
tasks.max
=1  
file=test.sink.txt
  
topics=connect-test
  

  Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties
  

bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter  
value.converter
=org.apache.kafka.connect.json.JsonConverter  
key.converter.schemas.enable
=true  
value.converter.schemas.enable
=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter  
internal.value.converter
=org.apache.kafka.connect.json.JsonConverter  
internal.key.converter.schemas.enable
=false  
internal.value.converter.schemas.enable
=false  
offset.storage.
file.filename=/tmp/connect.offsets  
offset.flush.interval.ms
=10000  


3.2 运行Demo
  需要熟悉Kafka的一些命令行,参考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)
  3.2.1 启动Kafka Broker
  

[iyunv@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/  
[iyunv@localhost kafka_2.
11-0.11.0.0]# ls  
bin  config  libs  LICENSE  logs  NOTICE  site
-docs  
[iyunv@localhost kafka_2.
11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[iyunv@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &

  

  3.2.2 启动Source Connector和Sink Connector
  

[iyunv@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties   

  3.3.3 打开console-consumer
  

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test  

  3.3.4 写入到test.txt文件中,并观察3.3.3中的变化
  

[iyunv@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt  
[iyunv@Server4 kafka_2.
12-0.11.0.0]# echo 'second line' >> test.txt  

3.3.3中打开的窗口输出如下  
{
"schema":{"type":"string","optional":false},"payload":"firest line"}  
{
"schema":{"type":"string","optional":false},"payload":"second line"}  

  3.3.5 查看test.sink.txt
  

[iyunv@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt  
firest line
  
second line
  


四. 结论
  本例仅仅演示了Kafka自带的File Connector,后续文章会完成JndiConnector,HdfsConnector,并且会使用CDC(Changed Data Capture)集成Kafka来完成一个ETL的例子
  PS:
  相比编译过Kafka-Manager都知道各种坑,经过了3个小时的努力,我终于把Kafka-Manager编译通过并打包了,并且新增了Kafka0.11.0版本支持。
  附下载地址: 链接: https://pan.baidu.com/s/1miiMsAk 密码: 866q

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-427458-1-1.html 上篇帖子: 【译】调优Apache Kafka集群 下篇帖子: Apache Kafka系列(一) 起步
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表