285572001 发表于 2017-5-23 17:42:54

kafka TopicConfigManager类

  topicconfigManager类
  主要流程为
  1.监控config/change节点,那个topic的config变化了
  2.从zk上的topic的config目录,获取最新config信息
  3.更新logmanager里指定topic的tplog(每个topic每个partition对应一个log)配置

/**
* 注册config change的listener
* Begin watching for config changes
*/
def startup() {
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath)
//监听/config/changes的子节点,ConfigChangeListener
zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener)
//启动服务,检查是否有topic的config需要更新,使用跟ConfigChangeListener相同的方法processConfigChanges
processAllConfigChanges()
}
  主要方法processConfigChanges

/**
* change config topic需要
* 1.设置zk上的topic config;
* 2.在zk上添加一个notification,标志哪个topic的config被改变
* Process the given list of config changes
*/
private def processConfigChanges(notifications: Seq) {
if (notifications.size > 0) {
info("Processing config change notification(s)...")
val now = time.milliseconds
val logs = logManager.logsByTopicPartition.toBuffer
//group by topic,Buffer                         buffer._2 := Log
val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
for (notification <- notifications) {
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {//changeid是比现在新的
val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
if(jsonOpt.isDefined) {
val json = jsonOpt.get
val topic = json.substring(1, json.length - 1) // hacky way to dequote
if (logsByTopic.contains(topic)) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties(logManager.defaultConfig.toProps)
props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))//获得最新topic config和default prop的合并值
val logConfig = LogConfig.fromProps(props)
for (log <- logsByTopic(topic))//获得当前logmanager对象中所有这个topic的log对象
log.config = logConfig
info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
purgeObsoleteNotifications(now, notifications)
}
}
lastExecutedChange = changeId
}
}
}
}
页: [1]
查看完整版本: kafka TopicConfigManager类