在kafka中, 主要资源的协调,开始运行时在
class KafkaServer(val config: KafkaConfig) extends Logging
这个类中进行的。
在初始化这个类的时候,他做了一件事情。
val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
咱们来看看 KafkaScheduler的实现
private val executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
def newThread(runnable: Runnable): Thread = {
val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement)
t.setDaemon(isDaemon)
t
}
})
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
var needRecovery = true
val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
if (cleanShutDownFile.exists) {
needRecovery = false
cleanShutDownFile.delete
}
/* The actual segments of the log */
private[log] val segments: SegmentList[LogSegment] = loadSegments()
这个 [size=1em]segments 搜有的加在一起就是一个完整的 topic。
[size=1em]然后是按照 logsegment 的start 排个序,做个验证,完事。