设为首页 收藏本站
查看: 1301|回复: 0

[经验分享] Kafka设计解析(三):Kafka High Availability (下)

[复制链接]

尚未签到

发表于 2019-1-31 09:44:40 | 显示全部楼层 |阅读模式
  Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。InfoQ一直在紧密关注Kafka的应用以及发展,“Kafka剖析”专栏将会从架构设计、实现、应用场景、性能等方面深度解析Kafka。
  本文在上篇文章基础上,更加深入讲解了Kafka的HA机制,主要阐述了HA相关各种场景,如Broker failover、Controller failover、Topic创建/删除、Broker启动、Follower从Leader fetch数据等详细处理过程。同时介绍了Kafka提供的与Replication相关的工具,如重新分配Partition等。
  Broker Failover过程
  Controller对Broker failure的处理过程
  1.Controller在ZooKeeper的/brokers/ids节点上注册Watch。一旦有Broker宕机(本文用宕机代表任何让Kafka认为其Broker die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在ZooKeeper对应的Znode会自动被删除,ZooKeeper会fire Controller注册的Watch,Controller即可获取最新的幸存的Broker列表。
  2.Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition。
  3.对set_p中的每一个Partition:
  3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR。
  3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。
  3.3 将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有Controller版本在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1。
  4.直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。
  Broker failover顺序图如下所示。
  LeaderAndIsrRequest结构如下
  LeaderAndIsrResponse结构如下
  创建/删除Topic
  1.Controller在ZooKeeper的/brokers/topics节点上注册Watch,一旦某个Topic被创建或删除,则Controller会通过Watch得到新创建/删除的Topic的Partition/Replica分配。
  2.对于删除Topic操作,Topic工具会将该Topic名字存于/admin/delete_topics。若delete.topic.enable为true,则Controller注册在/admin/delete_topics上的Watch被fire,Controller通过回调向对应的Broker发送StopReplicaRequest,若为false则Controller不会在/admin/delete_topics上注册Watch,也就不会对该事件作出反应。
  3.对于创建Topic操作,Controller从/brokers/ids读取当前所有可用的Broker列表,对于set_p中的每一个Partition:
  3.1 从分配给该Partition的所有Replica(称为AR)中任选一个可用的Broker作为新的Leader,并将AR设置为新的ISR(因为该Topic是新创建的,所以AR中所有的Replica都没有数据,可认为它们都是同步的,也即都在ISR中,任意一个Replica都可作为Leader)
  3.2 将新的Leader和ISR写入/brokers/topics/[topic]/partitions/[partition]
  4.直接通过RPC向相关的Broker发送LeaderAndISRRequest。
  创建Topic顺序图如下所示。
  Broker响应请求流程
  Broker通过kafka.network.SocketServer及相关模块接受各种请求并作出响应。整个网络通信模块基于Java NIO开发,并采用Reactor模式,其中包含1个Acceptor负责接受客户请求,N个Processor负责读写数据,M个Handler处理业务逻辑。
  Acceptor的主要职责是监听并接受客户端(请求发起方,包括但不限于Producer,Consumer,Controller,Admin Tool)的连接请求,并建立和客户端的数据传输通道,然后为该客户端指定一个Processor,至此它对该客户端该次请求的任务就结束了,它可以去响应下一个客户端的连接请求了。其核心代码如下。
  Processor主要负责从客户端读取数据并将响应返回给客户端,它本身并不处理具体的业务逻辑,并且其内部维护了一个队列来保存分配给它的所有SocketChannel。Processor的run方法会循环从队列中取出新的SocketChannel并将其SelectionKey.OP_READ注册到selector上,然后循环处理已就绪的读(请求)和写(响应)。Processor读取完数据后,将其封装成Request对象并将其交给RequestChannel。
  RequestChannel是Processor和KafkaRequestHandler交换数据的地方,它包含一个队列requestQueue用来存放Processor加入的Request,KafkaRequestHandler会从里面取出Request来处理;同时它还包含一个respondQueue,用来存放KafkaRequestHandler处理完Request后返还给客户端的Response。
  Processor会通过processNewResponses方法依次将requestChannel中responseQueue保存的Response取出,并将对应的SelectionKey.OP_WRITE事件注册到selector上。当selector的select方法返回时,对检测到的可写通道,调用write方法将Response返回给客户端。
  KafkaRequestHandler循环从RequestChannel中取Request并交给kafka.server.KafkaApis处理具体的业务逻辑。
  LeaderAndIsrRequest响应过程
  对于收到的LeaderAndIsrRequest,Broker主要通过ReplicaManager的becomeLeaderOrFollower处理,流程如下:
  1.若请求中controllerEpoch小于当前最新的controllerEpoch,则直接返回ErrorMapping.StaleControllerEpochCode。
  2.对于请求中partitionStateInfos中的每一个元素,即((topic, partitionId), partitionStateInfo):
  2.1 若partitionStateInfo中的leader epoch大于当前ReplicManager中存储的(topic, partitionId)对应的partition的leader epoch,则:

  2.1.1 若当前brokerid(或者说replica>  2.1.2 否则说明该Broker不在该Partition分配的Replica list中,将该信息记录于log中
  2.2 否则将相应的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中

  3.筛选出partitionState中Leader与当前Broker>  4.若partitionsTobeLeader不为空,则对其执行makeLeaders方。
  5.若partitionsToBeFollower不为空,则对其执行makeFollowers方法。
  6.若highwatermak线程还未启动,则将其启动,并将hwThreadInitialized设为true。
  7.关闭所有Idle状态的Fetcher。
  LeaderAndIsrRequest处理过程如下图所示
  Broker启动过程
  Broker启动后首先根据其ID在ZooKeeper的/brokers/idszonde下创建临时子节点(Ephemeral node),创建成功后Controller的ReplicaStateMachine注册其上的Broker Change Watch会被fire,从而通过回调KafkaController.onBrokerStartup方法完成以下步骤:
  1.向所有新启动的Broker发送UpdateMetadataRequest,其定义如下。
  2.将新启动的Broker上的所有Replica设置为OnlineReplica状态,同时这些Broker会为这些Partition启动high watermark线程。
  3.通过partitionStateMachine触发OnlinePartitionStateChange。
  Controller Failover
  Controller也需要Failover。每个Broker都会在Controller Path (/controller)上注册一个Watch。当前Controller失败时,对应的Controller Path会自动消失(因为它是Ephemeral Node),此时该Watch被fire,所有“活”着的Broker都会去竞选成为新的Controller(创建新的Controller Path),但是只会有一个竞选成功(这点由ZooKeeper保证)。竞选成功者即为新的Leader,竞选失败者则重新在新的Controller Path上注册Watch。因为ZooKeeper的Watch是一次性的,被fire一次之后即失效,所以需要重新注册。
  Broker成功竞选为新Controller后会触发KafkaController.onControllerFailover方法,并在该方法中完成如下操作:
  1.读取并增加Controller Epoch。
  2.在ReassignedPartitions Patch(/admin/reassign_partitions)上注册Watch。
  3.在PreferredReplicaElection Path(/admin/preferred_replica_election)上注册Watch。
  4.通过partitionStateMachine在Broker Topics Patch(/brokers/topics)上注册Watch。
  5.若delete.topic.enable设置为true(默认值是false),则partitionStateMachine在Delete Topic Patch(/admin/delete_topics)上注册Watch。

  6.通过replicaStateMachine在Broker>  7.初始化ControllerContext对象,设置当前所有Topic,“活”着的Broker列表,所有Partition的Leader及ISR等。
  8.启动replicaStateMachine和partitionStateMachine。
  9.将brokerState状态设置为RunningAsController。
  10.将每个Partition的Leadership信息发送给所有“活”着的Broker。
  11.若auto.leader.rebalance.enable配置为true(默认值是true),则启动partition-rebalance线程。
  12.若delete.topic.enable设置为true且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。
  Partition重新分配
  管理工具发出重新分配Partition请求后,会将相应信息写到/admin/reassign_partitions上,而该操作会触发ReassignedPartitionsIsrChangeListener,从而通过执行回调函数KafkaController.onPartitionReassignment来完成以下操作:
  1.将ZooKeeper中的AR(Current Assigned Replicas)更新为OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
  2.强制更新ZooKeeper中的leader epoch,向AR中的每个Replica发送LeaderAndIsrRequest。
  3.将RAR - OAR中的Replica设置为NewReplica状态。
  4.等待直到RAR中所有的Replica都与其Leader同步。
  5.将RAR中所有的Replica都设置为OnlineReplica状态。
  6.将Cache中的AR设置为RAR。
  7.若Leader不在RAR中,则从RAR中重新选举出一个新的Leader并发送LeaderAndIsrRequest。若新的Leader不是从RAR中选举而出,则还要增加ZooKeeper中的leader epoch。
  8.将OAR - RAR中的所有Replica设置为OfflineReplica状态,该过程包含两部分。第一,将ZooKeeper上ISR中的OAR - RAR移除并向Leader发送LeaderAndIsrRequest从而通知这些Replica已经从ISR中移除;第二,向OAR - RAR中的Replica发送StopReplicaRequest从而停止不再分配给该Partition的Replica。
  9.将OAR - RAR中的所有Replica设置为NonExistentReplica状态从而将其从磁盘上删除。
  10.将ZooKeeper中的AR设置为RAR。
  11.删除/admin/reassign_partition。
  注意:最后一步才将ZooKeeper中的AR更新,因为这是唯一一个持久存储AR的地方,如果Controller在这一步之前crash,新的Controller仍然能够继续完成该过程。
  以下是Partition重新分配的案例,OAR = {1,2,3},RAR = {4,5,6},Partition重新分配过程中ZooKeeper中的AR和Leader/ISR路径如下
  ARleader/isrSttep
  {1,2,3}1/{1,2,3}(initial state)
  {1,2,3,4,5,6}1/{1,2,3}(step 2)
  {1,2,3,4,5,6}1/{1,2,3,4,5,6}(step 4)
  {1,2,3,4,5,6}4/{1,2,3,4,5,6}(step 7)
  {1,2,3,4,5,6}4/{4,5,6}(step 8)
  {4,5,6}4/{4,5,6}(step 10)
  Follower从Leader Fetch数据
  Follower通过向Leader发送FetchRequest获取消息,FetchRequest结构如下
  从FetchRequest的结构可以看出,每个Fetch请求都要指定最大等待时间和最小获取字节数,以及由TopicAndPartition和PartitionFetchInfo构成的Map。实际上,Follower从Leader数据和Consumer从Broker Fetch数据,都是通过FetchRequest请求完成,所以在FetchRequest结构中,其中一个字段是clientID,并且其默认值是ConsumerConfig.DefaultClientId。
  Leader收到Fetch请求后,Kafka通过KafkaApis.handleFetchRequest响应该请求,响应过程如下:
  1.replicaManager根据请求读出数据存入dataRead中。
  2.如果该请求来自Follower则更新其相应的LEO(log end offset)以及相应Partition的High Watermark
  3.根据dataRead算出可读消息长度(单位为字节)并存入bytesReadable中。
  4.满足下面4个条件中的1个,则立即将相应的数据返回
  Fetch请求不希望等待,即fetchRequest.macWait

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669902-1-1.html 上篇帖子: 关于CDH5.11.0自带kafka 0.10 bootstrap 下篇帖子: Kafka(八)Python生产者和消费者API使用
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表