设为首页 收藏本站
查看: 1168|回复: 0

[经验分享] 拨开kafka 的羊毛衫

[复制链接]

尚未签到

发表于 2017-5-23 17:26:59 | 显示全部楼层 |阅读模式
好的, 上篇把 kafka.kafka 干的事情解析了一遍, 什么都看不出来, 是的, 什么都看不出来他干了什么。那么这章来电干货。
 
在kafka中, 主要资源的协调,开始运行时在
class KafkaServer(val config: KafkaConfig) extends Logging
这个类中进行的。
 
在初始化这个类的时候,他做了一件事情。

  val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
咱们来看看  KafkaScheduler的实现
 

  private val executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
def newThread(runnable: Runnable): Thread = {
val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement)
t.setDaemon(isDaemon)
t
}
})
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)

 
看到结果了吧, 其实就是  ScheduledThreadPoolExecutor, kafka 初始化了一个单线程的 ScheduledThreadPoolExecutor 而且名字叫做 “kafka-logcleaner-”
 
初始化完成了, 咱们看看  startup 方法里有些什么猫腻。

    isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
var needRecovery = true
val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
if (cleanShutDownFile.exists) {
needRecovery = false
cleanShutDownFile.delete
}

 首先是 shutdown 的判断吧之类的操作, 很巧妙,使用了一个文件来表示运行状态。
 

    logManager = new LogManager(config,
scheduler,
SystemTime,
1000L * 60 * 60 * config.logRollHours,
1000L * 60 * config.logCleanupIntervalMinutes,
1000L * 60 * 60 * config.logRetentionHours,
needRecovery)

 
他new 了一个叫做, logManager 的东西,是的,了解kafka 的人都知道,kafka 是全磁盘操作,message全放磁盘上,此类用于磁盘io的操作。相当关键,咱们看一下。
 

    for(dir <- subDirs) {
if(!dir.isDirectory()) {
warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
} else {
info("Loading log '" + dir.getName() + "'")
val topic = Utils.getTopicPartition(dir.getName)._1
val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
val log = new Log(dir, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery)
val topicPartion = Utils.getTopicPartition(dir.getName)
logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
val parts = logs.get(topicPartion._1)
parts.put(topicPartion._2, log)
}
}
 首先, 他获取了topic们, 获取一些系统属性,把topic 放到 名叫 logs 的一个 pool 中, new Log 的作用是 加载 目录topic 中的log 信息到内存中。
 
在log 对象之中,用LogSegment 抽象了 log 的分段,因为 topic 是有 partition 的。

  /* The actual segments of the log */
private[log] val segments: SegmentList[LogSegment] = loadSegments()
 这个 [size=1em]segments 搜有的加在一起就是一个完整的 topic。
[size=1em]然后是按照 logsegment 的start 排个序,做个验证,完事。
 
  [size=1em]接着, 把 各个topic 信息放到内存中之后,开始用

  if(scheduler != null) {
info("starting log cleaner every " + logCleanupIntervalMs + " ms")   
scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
}
  [size=1em] 来定时 按照 config.logCleanupIntervalMinutes 配置的[size=1em]分钟做一些事情。做什么事情呢, 清空一下旧的log,按照两种标准清空, 一个是超过一定时间的log,还有一个是超过大小的log。
  [size=1em]下面到了跟zk交互的阶段

  if(config.enableZookeeper) {
kafkaZookeeper = new KafkaZooKeeper(config, this)
kafkaZookeeper.startup
zkActor = new Actor {
def act() {
loop {
receive {
case topic: String =>
try {
kafkaZookeeper.registerTopicInZk(topic)
}
catch {
case e => error(e) // log it and let it go
}
case StopActor =>
info("zkActor stopped")
exit
}
}
}
}
zkActor.start
}
  [size=1em] 
  [size=1em]跟zk交互的过程包括创建以下path:
   *   /topics/[topic]/[node_id-partition_num]
   *   /brokers/[0...N] --> host:port
  订阅事件
  总结一下,磁盘部分在 broker 初始化的时候,加载topic 信息到内存, 定期清理以下log, 跟zk做一些注册,订阅事件。
  下回咱们看下,初始化的时候, 是网络连接用的什么神奇的东西。
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379905-1-1.html 上篇帖子: Kafka入门经典教程 下篇帖子: 【Kafka十一】关于Kafka的副本管理
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表