[Experience Sharing] [Kafka IV] Kafka Pseudo-Distributed Installation

Posted 2017-5-23 17:51:01
  In http://bit1129.iteye.com/blog/2174791 we installed a single Kafka server; in Kafka, each server is called a broker. This post walks through a pseudo-distributed (single-machine, multi-broker) Kafka installation and verifies that it works.

 

1. Installation steps
  A pseudo-distributed Kafka install follows exactly the same idea as a pseudo-distributed ZooKeeper install, only slightly simpler (Kafka needs no myid file). The main work is giving each Kafka server its own server.properties; the three brokers use server.properties, server.1.properties and server.2.properties respectively:

cp server.properties server.1.properties
cp server.properties server.2.properties
  Edit server.1.properties and server.2.properties. Three properties need to change (shown here for server.1.properties):

broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1
 
  port is the port the Kafka server listens on.
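The two derived files can be generated with sed instead of editing by hand. A minimal sketch: the stand-in server.properties below contains only the three keys we change (the stock file ships with many more), and port 9094 for broker 2 is an assumption following the same pattern as broker 1.

```shell
# Scratch directory so the sketch is self-contained; in a real install you would
# run the sed loop in Kafka's config/ directory against the stock file.
dir=$(mktemp -d)
cd "$dir"

# Minimal stand-in for the stock server.properties (only the three keys we change).
cat > server.properties <<'EOF'
broker.id=0
port=9092
log.dirs=/tmp/kafka-logs
EOF

# Derive the two extra broker configs: bump broker.id, the port, and the log dir.
for i in 1 2; do
  sed -e "s/^broker.id=0/broker.id=$i/" \
      -e "s/^port=9092/port=909$((2 + i))/" \
      -e "s|^log.dirs=/tmp/kafka-logs|log.dirs=/tmp/kafka-logs-$i|" \
      server.properties > "server.$i.properties"
done

# Show the three changed keys in the derived files.
grep -E -H '^(broker.id|port|log.dirs)' server.[12].properties
```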
  Start the three Kafka brokers:

bin/kafka-server-start.sh server.properties
bin/kafka-server-start.sh server.1.properties
bin/kafka-server-start.sh server.2.properties

2. Common configuration parameters of the Kafka scripts

2.1 kafka-console-consumer.sh
  --from-beginning                      If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
  --topic <topic>                       The topic id to consume on.
  --zookeeper <urls>                    REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
  --group <gid>                         The group id to consume on. (default: console-consumer-37803)
  On the consumer side no broker list is needed: the consumer uses ZooKeeper and the topic name to locate every broker that holds messages for that topic.

2.2 kafka-console-producer.sh
  --topic <topic>                         REQUIRED: The topic id to produce  messages to.
  --broker-list <broker-list>        REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
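Putting the two scripts together, a round trip through the pseudo-cluster would look like the commands below. This is a command sketch, not run here; the topic name assumes the topic created in section 3.1, and port 9094 for the third broker is an assumption following the config pattern above.

```shell
# Produce a few messages (type lines on stdin, Ctrl-C to quit):
bin/kafka-console-producer.sh \
    --broker-list localhost:9092,localhost:9093,localhost:9094 \
    --topic topic_p10_r3

# In another terminal, consume from the beginning. Note the consumer is given
# the ZooKeeper address rather than a broker list -- it discovers the brokers
# from ZooKeeper:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
    --topic topic_p10_r3 --from-beginning
```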

 

2.3 kafka-topics.sh
  --create                                Create a new topic.
  --describe                              List details for the given topics.
  --list                                  List all available topics.
  --partitions <Integer: # of partitions> The number of partitions for the topic being created or altered (WARNING:   If partitions are increased for a  topic that has a key, the partition logic or ordering of the messages will be affected)
  --replication-factor <Integer: replication factor> The replication factor for each partition in the topic being created
  --zookeeper <urls>                    REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
  --topic <topic>                         The topic to be created, altered or described. Can also accept a regular expression, except with the --create option.

3. Testing the pseudo-cluster
  Before testing, let's list the points worth testing.
  The main one so far: each partition has a leader. What does a leader partition mean, and what is it for?

3.1 Creating a topic

./kafka-topics.sh --create  --topic  topic_p10_r3 --partitions 10 --replication-factor 3  --zookeeper localhost:2181
  This creates a topic with 10 partitions and a replication factor of 3. With three brokers and three replicas per partition, every broker ends up holding a copy of all 10 partitions.

3.2 Directories created on each broker
  After the topic is created, each broker creates one directory per partition under its log directory (e.g. /tmp/kafka-logs):
  topic_p10_r3-0
  topic_p10_r3-1
  ...
  topic_p10_r3-9
  Each of these directories contains two files: 00000000000000000000.index and 00000000000000000000.log
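The layout above can be counted mechanically: P partitions times R replicas means P x R partition directories across all brokers. A sketch that mocks the three log directories in a temp dir purely for illustration (the real ones are whatever Kafka created under /tmp/kafka-logs*):

```shell
# Mock the layout Kafka produces for topic_p10_r3 (10 partitions, replication
# factor 3): every one of the three brokers holds all 10 partition directories.
base=$(mktemp -d)
for logdir in kafka-logs kafka-logs-1 kafka-logs-2; do
  for p in 0 1 2 3 4 5 6 7 8 9; do
    mkdir -p "$base/$logdir/topic_p10_r3-$p"
    touch "$base/$logdir/topic_p10_r3-$p/00000000000000000000.index" \
          "$base/$logdir/topic_p10_r3-$p/00000000000000000000.log"
  done
done

# 10 partitions x 3 replicas = 30 partition directories in total.
find "$base" -type d -name 'topic_p10_r3-*' | wc -l
```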

3.3 Topic details

./kafka-topics.sh --describe --topic topic_p10_r3  --zookeeper localhost:2181
  The output is as follows:

Topic:topic_p10_r3  PartitionCount:10  ReplicationFactor:3  Configs:
    Topic: topic_p10_r3  Partition: 0  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: topic_p10_r3  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
    Topic: topic_p10_r3  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: topic_p10_r3  Partition: 3  Leader: 2  Replicas: 2,1,0  Isr: 2,1,0
    Topic: topic_p10_r3  Partition: 4  Leader: 0  Replicas: 0,2,1  Isr: 0,2,1
    Topic: topic_p10_r3  Partition: 5  Leader: 1  Replicas: 1,0,2  Isr: 1,0,2
    Topic: topic_p10_r3  Partition: 6  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: topic_p10_r3  Partition: 7  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
    Topic: topic_p10_r3  Partition: 8  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: topic_p10_r3  Partition: 9  Leader: 2  Replicas: 2,1,0  Isr: 2,1,0
  The meaning of the output: the first line gives a summary of all the partitions; each additional line gives information about one partition.


  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
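Since each partition line has a fixed field layout, the partitions led by a given broker can be pulled out with awk. A sketch over a captured copy of the describe output above, embedded as a here-document rather than re-running the command:

```shell
# Partition lines captured from `kafka-topics.sh --describe --topic topic_p10_r3`.
describe=$(cat <<'EOF'
Topic: topic_p10_r3 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: topic_p10_r3 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: topic_p10_r3 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: topic_p10_r3 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: topic_p10_r3 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: topic_p10_r3 Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: topic_p10_r3 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: topic_p10_r3 Partition: 7 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: topic_p10_r3 Partition: 8 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: topic_p10_r3 Partition: 9 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
EOF
)

# Fields: $4 is the partition id (after "Partition:"), $6 the leader id
# (after "Leader:"). Select the partitions whose leader is broker 2.
led_by_2=$(printf '%s\n' "$describe" | awk '$6 == 2 { print $4 }' | xargs)
echo "broker 2 leads partitions: $led_by_2"   # -> broker 2 leads partitions: 0 3 6 9
```

The same filter with a different broker id shows how the leader role is spread evenly across the three brokers.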
3.4 Question: if the replication factor is 1, does each partition exist exactly once in the cluster (i.e. on a single broker), so that the leader simply names the broker holding that partition?
  Create a topic with 10 partitions and a single replica:

./kafka-topics.sh --create  --topic  topic_p10_r1 --partitions 10 --replication-factor 1  --zookeeper localhost:2181
  The details:

[hadoop@hadoop bin]$ ./kafka-topics.sh --describe --topic topic_p10_r1  --zookeeper localhost:2181
Topic:topic_p10_r1  PartitionCount:10  ReplicationFactor:1  Configs:
    Topic: topic_p10_r1  Partition: 0  Leader: 1  Replicas: 1  Isr: 1
    Topic: topic_p10_r1  Partition: 1  Leader: 2  Replicas: 2  Isr: 2
    Topic: topic_p10_r1  Partition: 2  Leader: 0  Replicas: 0  Isr: 0
    Topic: topic_p10_r1  Partition: 3  Leader: 1  Replicas: 1  Isr: 1
    Topic: topic_p10_r1  Partition: 4  Leader: 2  Replicas: 2  Isr: 2
    Topic: topic_p10_r1  Partition: 5  Leader: 0  Replicas: 0  Isr: 0
    Topic: topic_p10_r1  Partition: 6  Leader: 1  Replicas: 1  Isr: 1
    Topic: topic_p10_r1  Partition: 7  Leader: 2  Replicas: 2  Isr: 2
    Topic: topic_p10_r1  Partition: 8  Leader: 0  Replicas: 0  Isr: 0
    Topic: topic_p10_r1  Partition: 9  Leader: 1  Replicas: 1  Isr: 1
  As expected: each partition has its own leader, and the leader's broker is the same broker that holds the single replica (the IDs match).
  So we can conclude:
  1. Each partition's replica set has one leader.
  2. The leader is the replica that handles reads and writes, and is responsible for copying the data to the other brokers.
  3. A topic's partitions are spread fairly evenly across the brokers.

3.5 When a broker dies: Kafka's fault tolerance
  We now have two topics: one with 10 partitions and 3 replicas, and one with 10 partitions and 1 replica. If a broker dies, how does each topic cope?
  jps shows three Kafka processes. Find the process for broker 2 with ps -ef|grep server.2.properties and kill it with kill -9. The moment it dies, the consoles of the surviving brokers start flooding with the same exception:

[2015-02-23 02:14:00,037] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2015-02-23 02:14:00,039] ERROR [ReplicaFetcherThread-0-2], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 4325; ClientId: ReplicaFetcherThread-0-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [topic_p10_r3,3] -> PartitionFetchInfo(0,1048576),[topic_p10_r3,9] -> PartitionFetchInfo(0,1048576),[topic_p10_r3,6] -> PartitionFetchInfo(0,1048576),[topic_p10_r3,0] -> PartitionFetchInfo(0,1048576) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2015-02-23 02:14:00,040] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
  Partitions 3, 9, 6 and 0 are exactly the topic_p10_r3 partitions whose leader was broker 2, so Kafka has to hand leadership over. Here is the state of topic_p10_r3 and topic_p10_r1 afterwards.
  topic_p10_r3 (every affected partition switched to another leader; Replicas still lists broker 2, but Isr no longer does):

[hadoop@hadoop bin]$ ./kafka-topics.sh --describe --topic topic_p10_r3  --zookeeper localhost:2181
Topic:topic_p10_r3  PartitionCount:10  ReplicationFactor:3  Configs:
    Topic: topic_p10_r3  Partition: 0  Leader: 0  Replicas: 2,0,1  Isr: 0,1
    Topic: topic_p10_r3  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1
    Topic: topic_p10_r3  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,0
    Topic: topic_p10_r3  Partition: 3  Leader: 1  Replicas: 2,1,0  Isr: 1,0
    Topic: topic_p10_r3  Partition: 4  Leader: 0  Replicas: 0,2,1  Isr: 0,1
    Topic: topic_p10_r3  Partition: 5  Leader: 1  Replicas: 1,0,2  Isr: 1,0
    Topic: topic_p10_r3  Partition: 6  Leader: 0  Replicas: 2,0,1  Isr: 0,1
    Topic: topic_p10_r3  Partition: 7  Leader: 0  Replicas: 0,1,2  Isr: 0,1
    Topic: topic_p10_r3  Partition: 8  Leader: 1  Replicas: 1,2,0  Isr: 1,0
    Topic: topic_p10_r3  Partition: 9  Leader: 1  Replicas: 2,1,0  Isr: 1,0
  topic_p10_r1: no failover is possible, and the leader is now -1:

[hadoop@hadoop bin]$ ./kafka-topics.sh --describe --topic topic_p10_r1  --zookeeper localhost:2181
Topic:topic_p10_r1  PartitionCount:10  ReplicationFactor:1  Configs:
    Topic: topic_p10_r1  Partition: 0  Leader: 1  Replicas: 1  Isr: 1
    Topic: topic_p10_r1  Partition: 1  Leader: -1  Replicas: 2  Isr:
    Topic: topic_p10_r1  Partition: 2  Leader: 0  Replicas: 0  Isr: 0
    Topic: topic_p10_r1  Partition: 3  Leader: 1  Replicas: 1  Isr: 1
    Topic: topic_p10_r1  Partition: 4  Leader: -1  Replicas: 2  Isr:
    Topic: topic_p10_r1  Partition: 5  Leader: 0  Replicas: 0  Isr: 0
    Topic: topic_p10_r1  Partition: 6  Leader: 1  Replicas: 1  Isr: 1
    Topic: topic_p10_r1  Partition: 7  Leader: -1  Replicas: 2  Isr:
    Topic: topic_p10_r1  Partition: 8  Leader: 0  Replicas: 0  Isr: 0
    Topic: topic_p10_r1  Partition: 9  Leader: 1  Replicas: 1  Isr: 1
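Offline partitions can be found the same way as before: with broker 2 dead, every partition whose single replica lived on broker 2 reports Leader: -1 and an empty Isr. A sketch over a captured copy of the output above:

```shell
# Partition lines from the topic_p10_r1 describe output after killing broker 2.
describe=$(cat <<'EOF'
Topic: topic_p10_r1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: topic_p10_r1 Partition: 1 Leader: -1 Replicas: 2 Isr:
Topic: topic_p10_r1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: topic_p10_r1 Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: topic_p10_r1 Partition: 4 Leader: -1 Replicas: 2 Isr:
Topic: topic_p10_r1 Partition: 5 Leader: 0 Replicas: 0 Isr: 0
Topic: topic_p10_r1 Partition: 6 Leader: 1 Replicas: 1 Isr: 1
Topic: topic_p10_r1 Partition: 7 Leader: -1 Replicas: 2 Isr:
Topic: topic_p10_r1 Partition: 8 Leader: 0 Replicas: 0 Isr: 0
Topic: topic_p10_r1 Partition: 9 Leader: 1 Replicas: 1 Isr: 1
EOF
)

# A leader of -1 means the partition is offline: no live replica can serve it.
offline=$(printf '%s\n' "$describe" | awk '$6 == -1 { print $4 }' | xargs)
echo "offline partitions: $offline"   # -> offline partitions: 1 4 7
```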
  Restarting broker 2 gives the results below. For topic_p10_r3 the leaders do not change: each partition keeps the leader it already has, and the returning broker rejoins only as a follower (note broker 2 reappears in the Isr lists). For topic_p10_r1, the partitions whose only replica lives on broker 2 get their leader back.

[hadoop@hadoop bin]$ ./kafka-topics.sh --describe --topic topic_p10_r3  --zookeeper localhost:2181
Topic:topic_p10_r3  PartitionCount:10  ReplicationFactor:3  Configs:
    Topic: topic_p10_r3  Partition: 0  Leader: 0  Replicas: 2,0,1  Isr: 0,1,2
    Topic: topic_p10_r3  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
    Topic: topic_p10_r3  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,0,2
    Topic: topic_p10_r3  Partition: 3  Leader: 1  Replicas: 2,1,0  Isr: 1,0,2
    Topic: topic_p10_r3  Partition: 4  Leader: 0  Replicas: 0,2,1  Isr: 0,1,2
    Topic: topic_p10_r3  Partition: 5  Leader: 1  Replicas: 1,0,2  Isr: 1,0,2
    Topic: topic_p10_r3  Partition: 6  Leader: 0  Replicas: 2,0,1  Isr: 0,1,2
    Topic: topic_p10_r3  Partition: 7  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
    Topic: topic_p10_r3  Partition: 8  Leader: 1  Replicas: 1,2,0  Isr: 1,0,2
    Topic: topic_p10_r3  Partition: 9  Leader: 1  Replicas: 2,1,0  Isr: 1,0,2
[hadoop@hadoop bin]$ ./kafka-topics.sh --describe --topic topic_p10_r1  --zookeeper localhost:2181
Topic:topic_p10_r1  PartitionCount:10  ReplicationFactor:1  Configs:
    Topic: topic_p10_r1  Partition: 0  Leader: 1  Replicas: 1  Isr: 1
    Topic: topic_p10_r1  Partition: 1  Leader: 2  Replicas: 2  Isr: 2
    Topic: topic_p10_r1  Partition: 2  Leader: 0  Replicas: 0  Isr: 0
    Topic: topic_p10_r1  Partition: 3  Leader: 1  Replicas: 1  Isr: 1
    Topic: topic_p10_r1  Partition: 4  Leader: 2  Replicas: 2  Isr: 2
    Topic: topic_p10_r1  Partition: 5  Leader: 0  Replicas: 0  Isr: 0
    Topic: topic_p10_r1  Partition: 6  Leader: 1  Replicas: 1  Isr: 1
    Topic: topic_p10_r1  Partition: 7  Leader: 2  Replicas: 2  Isr: 2
    Topic: topic_p10_r1  Partition: 8  Leader: 0  Replicas: 0  Isr: 0
    Topic: topic_p10_r1  Partition: 9  Leader: 1  Replicas: 1  Isr: 1

Source thread: https://www.yunweiku.com/thread-379937-1-1.html