[Experience Sharing] Kafka Partition Reassignment Analysis

  1. How to reassign partitions and how to add new broker nodes has already been covered in the Kafka Chinese documentation; see there for details.
  2. Analyzing how the command runs: partition reassignment is invoked through the kafka-reassign-partitions.sh script, whose content is:
  exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand $@
  3. Looking at the ReassignPartitionsCommand code, there are three main methods, corresponding to generating a partition assignment plan, executing that plan, and verifying how the execution went.
  4. The code that generates the assignment plan is as follows:

def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
  if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt)))
    CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
  val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
  val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
  val duplicateReassignments = CoreUtils.duplicates(brokerListToReassign)
  if (duplicateReassignments.nonEmpty)
    throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(",")))
  val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
  val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
  val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
  if (duplicateTopicsToReassign.nonEmpty)
    throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
  val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
  var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
  val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
  groupedByTopic.foreach { topicInfo =>
    val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
      topicInfo._2.head._2.size)
    partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
  }
  val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
  println("Current partition replica assignment\n\n%s"
    .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
  println("Proposed partition reassignment configuration\n\n%s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
}
  What is ultimately called is the AdminUtils.assignReplicasToBrokers method, whose doc comment reads:
  /**
   * There are 2 goals of replica assignment:
   * 1. Spread the replicas evenly among brokers.
   * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
   *
   * To achieve this goal, we:
   * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
   * 2. Assign the remaining replicas of each partition with an increasing shift.
   *
   * Here is an example of assigning
   * broker-0  broker-1  broker-2  broker-3  broker-4
   * p0        p1        p2        p3        p4       (1st replica)
   * p5        p6        p7        p8        p9       (1st replica)
   * p4        p0        p1        p2        p3       (2nd replica)
   * p8        p9        p5        p6        p7       (2nd replica)
   * p3        p4        p0        p1        p2       (3rd replica)
   * p7        p8        p9        p5        p6       (3rd replica)
   */
  As the comment shows, the algorithm loops over the partitions: the first replica of each partition is placed round-robin across the broker list, and the remaining replicas are placed with an increasing shift, which yields the replica placement on every broker. assignReplicasToBrokers returns a map from partition id to the list of broker ids that should hold that partition's replicas; generateAssignment calls it once per topic and converts the result into a map keyed by TopicAndPartition. Finally it prints both the current assignment and the proposed assignment.
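
  To make the placement rule concrete, here is a minimal, self-contained Scala sketch of the round-robin-with-shift idea described in the comment. It is not the actual AdminUtils.assignReplicasToBrokers implementation (which, among other things, starts from a random position in the broker list when no fixed start index is given); the object and method names here are made up for illustration.

// Simplified sketch of round-robin replica placement with an increasing shift.
// NOT the real AdminUtils code; the start index is fixed at 0 here, whereas Kafka
// picks a random start position when none is supplied.
object ReplicaAssignmentSketch {
  def assignRoundRobin(brokers: Seq[Int], nPartitions: Int, replicationFactor: Int): Map[Int, Seq[Int]] = {
    require(replicationFactor <= brokers.size, "replication factor cannot exceed broker count")
    val nBrokers = brokers.size
    (0 until nPartitions).map { p =>
      // each time we wrap around the broker list, shift the follower replicas one step further
      val shiftRound = p / nBrokers
      val firstReplicaIndex = p % nBrokers
      val replicas = (0 until replicationFactor).map { r =>
        if (r == 0) brokers(firstReplicaIndex)                  // 1st replica: plain round-robin
        else {
          val shift = 1 + (shiftRound + r - 1) % (nBrokers - 1) // 2nd..Nth replica: increasing shift
          brokers((firstReplicaIndex + shift) % nBrokers)
        }
      }
      p -> replicas
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // 10 partitions, 3 replicas over brokers 0..4, mirroring the table in the comment above
    assignRoundRobin(Seq(0, 1, 2, 3, 4), nPartitions = 10, replicationFactor = 3)
      .toSeq.sortBy(_._1)
      .foreach { case (p, replicas) => println(s"p$p -> ${replicas.mkString(",")}") }
  }
}

  Running it with brokers 0 to 4, 10 partitions and a replication factor of 3 reproduces the table shown in the comment.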
  5. Analysis of the code that executes the assignment

def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
  if(!opts.options.has(opts.reassignmentJsonFileOpt))
    CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option")
  val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
  val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
  val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
  if (partitionsToBeReassigned.isEmpty)
    throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
  val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp})
  if (duplicateReassignedPartitions.nonEmpty)
    throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(",")))
  val duplicateEntries = partitionsToBeReassigned
    .map{ case(tp,replicas) => (tp, CoreUtils.duplicates(replicas))}
    .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty }
  if (duplicateEntries.nonEmpty) {
    val duplicatesMsg = duplicateEntries
      .map{ case (tp,duplicateReplicas) => "%s contains multiple entries for %s".format(tp, duplicateReplicas.mkString(",")) }
      .mkString(". ")
    throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicatesMsg))
  }
  val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
  // before starting assignment, output the current replica assignment to facilitate rollback
  val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
  println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
    .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
  // start the reassignment
  if(reassignPartitionsCommand.reassignPartitions())
    println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap)))
  else
    println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
}

  The first part of this method is routine argument checking; it then constructs a ReassignPartitionsCommand and calls that command's reassignPartitions() method, so the interesting part is how reassignPartitions executes. It first validates the requested reassignment, then writes the new assignment plan to the ZooKeeper path /admin/reassign_partitions, and does nothing else.
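
  For reference, the data that ends up under /admin/reassign_partitions has the same JSON shape as the --reassignment-json-file. The small sketch below only formats such a plan; the case class and helper are hypothetical, and the real command builds and writes this string through ZkUtils rather than by hand.

// Hypothetical helper that renders a reassignment plan in the JSON layout
// used by /admin/reassign_partitions and by --reassignment-json-file:
//   {"version":1,"partitions":[{"topic":"t","partition":0,"replicas":[4,5,6]}]}
object ReassignmentJsonSketch {
  case class PartitionMove(topic: String, partition: Int, replicas: Seq[Int])

  def toJson(moves: Seq[PartitionMove]): String = {
    val entries = moves.map { m =>
      s"""{"topic":"${m.topic}","partition":${m.partition},"replicas":[${m.replicas.mkString(",")}]}"""
    }
    s"""{"version":1,"partitions":[${entries.mkString(",")}]}"""
  }

  def main(args: Array[String]): Unit = {
    // example: move both partitions of a hypothetical topic "demo" onto brokers 4, 5, 6
    val plan = Seq(
      PartitionMove("demo", 0, Seq(4, 5, 6)),
      PartitionMove("demo", 1, Seq(5, 6, 4))
    )
    println(toJson(plan)) // a string of this shape is what is stored under /admin/reassign_partitions
  }
}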
  6. How to verify whether the reassignment has completed

def verifyAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
  if(!opts.options.has(opts.reassignmentJsonFileOpt))
    CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
  val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
  val jsonString = Utils.readFileAsString(jsonFile)
  val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
  println("Status of partition reassignment:")
  val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
  reassignedPartitionsStatus.foreach { partition =>
    partition._2 match {
      case ReassignmentCompleted =>
        println("Reassignment of partition %s completed successfully".format(partition._1))
      case ReassignmentFailed =>
        println("Reassignment of partition %s failed".format(partition._1))
      case ReassignmentInProgress =>
        println("Reassignment of partition %s is still in progress".format(partition._1))
    }
  }
}
 
The logic is straightforward: for each partition in the plan, look at the replica assignment currently stored in ZooKeeper for that topic and check whether it matches the broker ids in the new plan. If it matches, the reassignment is reported as completed; if not, it is reported as failed; and if the partition is still listed under /admin/reassign_partitions, the status is in progress.
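
  A minimal sketch of that check, assuming the desired replica lists, the assignment currently stored in ZooKeeper, and the set of partitions still listed under /admin/reassign_partitions have already been read; all names here are illustrative and do not match the real checkIfReassignmentSucceeded signature.

// Illustrative re-implementation of the verification logic; the real code lives in
// ReassignPartitionsCommand.checkIfReassignmentSucceeded and reads ZooKeeper itself.
object VerifyAssignmentSketch {
  sealed trait ReassignmentStatus
  case object Completed  extends ReassignmentStatus
  case object InProgress extends ReassignmentStatus
  case object Failed     extends ReassignmentStatus

  def status(partition: (String, Int),                        // (topic, partition)
             desiredReplicas: Seq[Int],                       // replicas from the --reassignment-json-file
             currentAssignment: Map[(String, Int), Seq[Int]], // what ZooKeeper holds for the topic now
             stillBeingReassigned: Set[(String, Int)]         // partitions still listed under /admin/reassign_partitions
            ): ReassignmentStatus = {
    if (stillBeingReassigned.contains(partition)) InProgress
    else if (currentAssignment.get(partition).contains(desiredReplicas)) Completed
    else Failed
  }

  def main(args: Array[String]): Unit = {
    val current = Map(("demo", 0) -> Seq(4, 5, 6), ("demo", 1) -> Seq(1, 2, 3))
    println(status(("demo", 0), Seq(4, 5, 6), current, Set.empty))        // Completed
    println(status(("demo", 1), Seq(5, 6, 4), current, Set(("demo", 1)))) // InProgress
    println(status(("demo", 1), Seq(5, 6, 4), current, Set.empty))        // Failed
  }
}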
  7. How the reassignment is actually carried out
  So far all we have seen is that the new assignment plan gets written to the corresponding ZooKeeper path; which process actually does the work? In the earlier post on the Kafka startup process we saw the call to kafkaController.startup. That step contends for the controller role; not every broker becomes the controller, and only the broker elected as controller runs the onControllerFailover callback. onControllerFailover registers a listener for changes on the "/admin/reassign_partitions" path, and those change events are handled by PartitionsReassignedListener.handleDataChange, which ultimately does its work through the onPartitionReassignment method:

/**
* This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition
* reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener.
* Reassigning replicas for a partition goes through a few steps listed in the code.
* RAR = Reassigned replicas
* OAR = Original list of replicas for partition
* AR = current assigned replicas
*
* 1. Update AR in ZK with OAR + RAR.
* 2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). We do this by forcing an update
*    of the leader epoch in zookeeper.
* 3. Start new replicas RAR - OAR by moving replicas in RAR - OAR to NewReplica state.
* 4. Wait until all replicas in RAR are in sync with the leader.
 * 5. Move all replicas in RAR to OnlineReplica state.
* 6. Set AR to RAR in memory.
* 7. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr
*    will be sent. If not, then leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent.
*    In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in
*    RAR - OAR back in the isr.
* 8. Move all replicas in OAR - RAR to OfflineReplica state. As part of OfflineReplica state change, we shrink the
*    isr to remove OAR - RAR in zookeeper and sent a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr.
*    After that, we send a StopReplica (delete = false) to the replicas in OAR - RAR.
* 9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica (delete = false) to
*    the replicas in OAR - RAR to physically delete the replicas on disk.
* 10. Update AR in ZK with RAR.
* 11. Update the /admin/reassign_partitions path in ZK to remove this partition.
* 12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
*
* For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
* may go through the following transition.
* AR                 leader/isr
* {1,2,3}            1/{1,2,3}           (initial state)
* {1,2,3,4,5,6}      1/{1,2,3}           (step 2)
* {1,2,3,4,5,6}      1/{1,2,3,4,5,6}     (step 4)
* {1,2,3,4,5,6}      4/{1,2,3,4,5,6}     (step 7)
* {1,2,3,4,5,6}      4/{4,5,6}           (step 8)
* {4,5,6}            4/{4,5,6}           (step 10)
*
* Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
* This way, if the controller crashes before that step, we can still recover.
*/
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignedPartitionContext.newReplicas
  areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
    case false =>
      info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
        "reassigned not yet caught up with the leader")
      val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
      val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
      //1. Update AR in ZK with OAR + RAR.
      updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
      //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
      updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
        newAndOldReplicas.toSeq)
      //3. replicas in RAR - OAR -> NewReplica
      startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
      info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
        "reassigned to catch up with the leader")
    case true =>
      //4. Wait until all replicas in RAR are in sync with the leader.
      val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
      //5. replicas in RAR -> OnlineReplica
      reassignedReplicas.foreach { replica =>
        replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
          replica)), OnlineReplica)
      }
      //6. Set AR to RAR in memory.
      //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
      //   a new AR (using RAR) and same isr to every broker in RAR
      moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
      //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
      //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
      stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
      //10. Update AR in ZK with RAR.
      updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
      //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
      removePartitionFromReassignedPartitions(topicAndPartition)
      info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
      controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
      //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
      // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
      deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
  }
}
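
  To tie the code above back to the OAR/RAR example in the doc comment, the following tiny sketch works through the set expressions the controller relies on (OAR = {1,2,3}, RAR = {4,5,6}); it is purely illustrative.

// Worked example of the set arithmetic used in onPartitionReassignment,
// matching the OAR = {1,2,3}, RAR = {4,5,6} example in the comment above.
object ReassignmentSetsSketch {
  def main(args: Array[String]): Unit = {
    val oar = Set(1, 2, 3) // original replicas (OAR)
    val rar = Set(4, 5, 6) // reassigned replicas (RAR)

    println(s"OAR + RAR = ${oar ++ rar}") // steps 1-2: AR written to ZK while the new replicas catch up
    println(s"RAR - OAR = ${rar -- oar}") // step 3: brand-new replicas that must be started
    println(s"OAR - RAR = ${oar -- rar}") // steps 8-9: old replicas to take offline and delete
    println(s"final AR  = $rar")          // step 10: AR in ZK finally becomes RAR
  }
}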
  As the example shows, the new replica nodes are added first; the replication mechanism then brings these new replicas into the ISR, and only after that are the replicas being replaced removed.
