设为首页 收藏本站
查看: 1010|回复: 0

[经验分享] Kafka 快速搭建指南

[复制链接]

尚未签到

发表于 2019-1-31 09:13:53 | 显示全部楼层 |阅读模式
  1.下载Kafka
wget tar zxvf kafka_2.11-0.10.0.0.tgz
cd kafka_2.11-0.10.0.0  2.启动服务端

  Kafka需要用到ZooKeepr,所以需要先启动一个ZooKeepr服务端,如果没有单独的ZooKeeper服务端,可以使用Kafka自带的脚本快速启动一个单节点ZooKeepr实例

  

# bin/zookeeper-server-start.sh config/zookeeper.properties  

  启动Kafka服务端实例
# bin/kafka-server-start.sh config/server.properties  

  

  3.创建一个Kafka Topic

  创建一个名为test的topic,这个topic只有一个分区和一个副本
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test  

  创建以后就可以查看了
# bin/kafka-topics.sh --list  --zookeeper localhost:2181  

  另外,除了手动创建topics,可以配置Brokers自动创建topics
  

  4.发送一些消息
  Kafka带有一个命令行工具kafka-console-producer.sh,可以从一个文件或者标准输入读入数据然后以消息的方式发送到Kafka集群。默认每行将被单独作为消息发送。
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test2
This is a message
This is another message  

  5.启动一个消费者
  Kafka也自带一个命令行工具用以消费Kafka集群的消息
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test2 --from-beginning  

  producer和consumer分别开两个Linu终端,producer端输入一些内容,就可以看到consumer端会实时显示producer输入的内容
  

  6.建立一个multi-broker集群
  以上我们建立运行了单个Kafka broker,但是对于Kafka,单个broker只是Kafka集群的的一个成员,下面我们将扩展Kafka集群到三个broker实例
#cp config/server.properties config/server1.properties
#cp config/server.properties config/server2.properties  

  主要修改几个参数
  broker.id
  listeners
  log.dirs
  

  broker.id是一个Kafka broker实例在集群中的唯一属于,每个实例需要不同。

bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &  

  现在创建一个具有三个复制成员的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic  

  查看集群中每个broker的状态

# bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic my-replicated-topic
Topic:my-replicated-topicPartitionCount:1ReplicationFactor:3Configs:
Topic: my-replicated-topicPartition: 0Leader: 0Replicas: 0,2,1Isr: 0,2,1  

  

  先列出所有分区的概要信息,一个分区显示一行
  Leader  负责给定分区的所有读写操作,每个集群节点会是所有分区中随机选取的分区的leader
  Replicas 列出当前分区的复制节点,不管这些节点是否是Leader甚至是否当前存活
  Isr    in-sync复制集的子集,列出当前存活并且赶上leader的集群节点
  

  向新的topic发送一些消息
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic  输入一些消息
  再开一个终端消费这些消息
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic  

  可以看到输入的消息很快就别消费掉了
  

  现在测试搭建的Kafka Cluster的容错能力
# bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic my-replicated-topic
Topic:my-replicated-topicPartitionCount:1ReplicationFactor:3Configs:
Topic: my-replicated-topicPartition: 0Leader: 0Replicas: 0,2,1Isr: 0,1,2  

  Leader:0 表示Broker 0 是leader,Broker 1和Broker 2是replica
  

  现在我们把Broker 0 给kill 掉
# kill -9 $(ps -ef|grep server.properties|grep -v grep|awk '{print $2}')  

  再查看集群节点状态
# bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic my-replicated-topic
Topic:my-replicated-topicPartitionCount:1ReplicationFactor:3Configs:
Topic: my-replicated-topicPartition: 0Leader: 2Replicas: 0,2,1Isr: 1,2  

  可以看到Leader已经切换到Broker 2了,Broker 0已经不在Isr这个子集中了,Replicas还是三个成员
  

  再次执行消费者
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic  

  可以看到之前生产者发送的消息依然可以被消费,虽然负责消息写入的leader已经挂掉了

  

  

  

  7.使用Kfaka Connect导入导出数据
  从Linux终端读入数据然后将数据输出到终端方便了解Kafka,接下来使用Kafka从其他数据来源导入数据,然后导出数据到其他系统中去。对于很多系统来说,可以使用Kafka Connect来导入导出数据来取代重新编写一些自定义代码。Kafka Connect是一个Kafka自带的工具用于导入导出数据。
  

  先向一个文件中写入一些数据
echo -e "foo\nbar" > test.txt  

  然后,启动两个Kafka Connector以standalone模式运行

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties  

  connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000  

  connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test  

  

  connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test  

  一旦启动Kafka Connect进程,source connector会从test.txt中读入数据,然后将这些数据推送给connect-test这个topic,sink connector会从这个topic读出数据并写入数据到test.sink.txt文件

# cat test.sink.txt
foo
bar  

  消息是存储在connect-test这个topic中的
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}  

  connector的终端先不要断开,继续向test.txt文件中追加几行,看看console-consumer是否会有刷新

# echo "Another line" >> test.txt# cat test.sink.txt
foo
bar
Another line  

  

  可以看到test.sink.txt这个文件也多了一行
  

  

  8.Use Kafka Streams to process data
  Kafka Streams是Kafka用于实时流数据处理和分析Kafka brokers中存储的数据的客户端库。

#echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt# cat file-input.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input# ./bin/kafka-run-class.sh  org.apache.kafka.streams.examples.wordcount.WordCountDemo  

# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
> --topic streams-wordcount-output \
> --from-beginning \
> --formatter kafka.tools.DefaultMessageFormatter \
> --property print.key=true \
> --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer  

  输出

all1
streams1
lead1
to1
kafka1
hello1
kafka2
streams2
join1
kafka3
summit1  

  

  

  

  

  

  

  参考文档:
  http://kafka.apache.org/documentation.html#quickstart
  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669873-1-1.html 上篇帖子: bireme数据源同步工具 下篇帖子: kafka参数配置详解
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表