设为首页 收藏本站
查看: 1446|回复: 0

[经验分享] Kafka connect快速构建数据ETL通道

[复制链接]

尚未签到

发表于 2017-6-7 12:04:19 | 显示全部楼层 |阅读模式
  摘要: 作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处
  业余时间调研了一下Kafka connect的配置和使用,记录一些自己的理解和心得,欢迎指正.
  一.背景介绍
  Kafka connect是Confluent公司(当时开发出Apache Kafka的核心团队成员出来创立的新公司)开发的confluent platform的核心功能.
  大家都知道现在数据的ETL过程经常会选择kafka作为消息中间件应用在离线和实时的使用场景中,而kafka的数据上游和下游一直没有一个
  无缝衔接的pipeline来实现统一,比如会选择flume或者logstash采集数据到kafka,然后kafka又通过其他方式pull或者push数据到目标存储.
  而kafka connect旨在围绕kafka构建一个可伸缩的,可靠的数据流通道,通过kafka connect可以快速实现大量数据进出kafka从而和其
  他源数据源或者目标数据源进行交互构造一个低延迟的数据pipeline.给个图更直观点,大家感受下.
DSC0000.jpg

  二.Kafka-connect快速配置
  这里Confluent官方很贴心的提供了一个集成的镜像以便quickstart,如下链接
  https://s3-us-west-2.amazonaws.com/confluent-files/kafka_connect_blog.ova
  这是存储在Amazon S3上的,直接点击即可下载.这里我使用VMWare直接打开,刚开始会提示一个错误,不用管它直接点击重试即可
  系统加载的过程中会默认初始化虚拟机的网络配置,这里我建议提前设置好桥接网络,让该虚拟机使用桥接网络初始化.
  加载成功后,登录进入该Ubuntu系统,默认的用户名和密码都是:vagrant.
  然后ls查看vagrant用户目录,查看几个关键的脚本内容后,我分别介绍它们的功能
  1>setup.sh:自动下载mysql,mysql jdbc driver,配置好mysql以及做为hive的metastore
  2>start.sh:启动confluent platform,kafka,hadoop,hive相关服务
  3>clean_up.sh:和start.sh相反的,会关闭掉所有的服务,而且还会删除掉所有的数据(例如hdfs namenode和 datanode的数据,其实相当于fs format了)
  那么很明显,第一步肯定是执行setup.sh,这里执行后会报错如下
DSC0001.png

  这里无法下载相关的软件包,好吧,那么我们需要更新一下下载源的索引,执行如下命令
  sudo apt-get update
  更新完毕后再次执行setup.sh安装好mysql,hive等服务
  紧接着执行start.sh来启动上述服务,启动后应该有如下进程,这是一个伪分布式节点
DSC0002.png

  对了,虚拟机各个服务(例如hive,zookeeper等),配置文件和日志文件在路径/mnt/下,组件的安装位置位于/opt下
  三.Kafka connect快速使用
  配置完以后就可以准备使用kafka-connect来快速构建一个数据pipeline了,如下图所示
DSC0003.png

  整个过程是将数据以mysql作为数据源,将数据通过kafka connect快速ETL到hive中去.注意这里图中没画kafka
  但是实际上是包含在kafka connect里面的,话不多说,开始使用
  1>Mysql数据准备
  执行如下命令

$ mysql -u root
mysql> CREATE DATABASE demo;
mysql> USE demo;
mysql> CREATE TABLE users (
->   id serial NOT NULL PRIMARY KEY,
->   name varchar(100),
->   email varchar(200),
->   department varchar(200),
->   modified timestamp default CURRENT_TIMESTAMP NOT NULL,
->   INDEX `modified_index` (`modified`)
-> );
mysql> INSERT INTO users (name, email, department) VALUES ('alice', 'alice@abc.com', 'engineering');
mysql> INSERT INTO users (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
mysql> exit;  注意上面第一行, ,对,你没看错,这里虚拟机mysql的root默认密码就是mypassword,
  强迫症患者请自行更改.随后建库,建表,插入数据.
  2>关键概念准备
  这里我快速普及一下参考官方文档理解的一些关键概念.
  kafka connector:kafka connector是kafka connect的关键组成部分,它是一个逻辑上的job,用于在kafka和其他系统之间拷贝数据,比如
  从上游系统拷贝数据到kafka,或者从kafka拷贝数据到下游系统
  Tasks:每个kafka connector可以初始化一组task进行数据的拷贝
  Workers:逻辑上包含kafka connector和tasks用来调度执行具体任务的进程,具体执行时分为standalone模式和distributed模式
  见下图,这个是kafka上游的数据stream过来后,定义好对应的kafka connector后,分解为一组tasks然后push数据到kafka的不同topic
DSC0004.png

  3>利用Kafka-connect摄取数据
  主要是通过配置来实现从mysql摄取数据到kafka,然后按照topic来获取数据写入hdfs,命令如下


connect-standalone /mnt/etc/connect-avro-standalone.properties \
/mnt/etc/mysql.properties /mnt/etc/hdfs.properties &  注意上面这些properties文件是虚拟机已经事先配置好的,可以直接执行实现数据的摄取
  当前使用的kafka connect的standalone模式,当然还有distributed模式后续可以尝试
  上面的那条命令的格式是这样:
connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]
  主要解释一下connect-standalone后面的参数
  worker.properties:就是上面提到过的worker进程的配置文件,可以定义kafka cluster的相关信息以及数据序列化的格式.
  随后的一些参数就是kafka connector的配置参数了,比如上面的mysql.properties定义了一个kafka jdbc connector,用来同步mysql数据到kafka
  最后一个hdfs.properties是kafka hdfs connector的配置文件,用来消费kafka topic数据push到hdfs.
  那么执行这条命令后就可以将mysql的数据通过kafka connect快速ETL到hdfs了.
  最后可以通过hive创建外表映射hdfs上的数据文件,然后在hive中查看对应数据,如下


$ hive
hive> SHOW TABLES;
OK
test_jdbc_users
hive> SELECT * FROM test_jdbc_users;
OK
1 alice alice@abc.com engineering 1450305345000
2 bob   bob@abc.com   sales       1450305346000  四.Kafka connect使用总结
  1>Kafka connect的使用其实就是配置不同的kafka connectors,这里大家可以把kafka作为中间组件,然后可以类比flume理解,kafka上游的
  connector其实就是fllume的source从上游数据源sink到kafka,kafka的下游connector其实就是flume的source是kafka,sink到下游系统.
  2>Kafka connect的数据pipeline要打通,它要求数据遵守confluent自己的一套通用的schema机制,细心的同学会发现上面jps后会有个进程名
  SchemaRegistryMain,这里官方默认使用Avro格式进出Kafka,所以要留意worker.properties文件的配置信息.
  3>我在使用中没有发现Flume 相关的connector,因此很好奇它应该是没有实现上游flume conector的属性配置。问题应该出在Flume的数据是基
  于event的,而和上面2中所说的schema定义格式没有很好的兼容.
  4>kafka connect的distributed模式应该更实用,随后会尝试,以及confluent所支持的实时处理流kafka streams.
  参考资料:http://docs.confluent.io/2.0.0/platform.html

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-384801-1-1.html 上篇帖子: 玩玩kafka1 单机安装 下篇帖子: .net windows Kafka 安装与使用入门(入门笔记)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表