Apache Kafka源码分析 – Broker Server
1. Kafka.scala
在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装
1: val kafkaServerStartble = new KafkaServerStartable(serverConfig) 2: kafkaServerStartble.startup
1: package kafka.server 2: class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { 3: private var server : KafkaServer = null 4: 5: private def init() { 6: server = new KafkaServer(serverConfig) 7: } 8: 9: def startup() {10: try {11: server.startup()12: }13: catch {...}14: }15: }
2. KafkaServer
KafkaServer代表一个kafka broker, 这是kafka的核心.
只需要看看里面startup了哪些modules, 就知道broker做了哪些工作, 后面一个个具体分析吧
1: package kafka.server 2: /** 3:* Represents the lifecycle of a single Kafka broker. Handles all functionality required 4:* to start up and shutdown a single Kafka node. 5:*/ 6: class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging { 7: var socketServer: SocketServer = null 8: var requestHandlerPool: KafkaRequestHandlerPool = null 9: var logManager: LogManager = null10: var kafkaHealthcheck: KafkaHealthcheck = null11: var topicConfigManager: TopicConfigManager = null12: var replicaManager: ReplicaManager = null13: var apis: KafkaApis = null14: var kafkaController: KafkaController = null15: val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)16: var zkClient: ZkClient = null17: 18: /**19: * Start up API for bringing up a single instance of the Kafka server.20: * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers21: */22: def startup() {23: /* start scheduler */24: kafkaScheduler.startup()25: 26: /* setup zookeeper */27: zkClient = initZk()28: 29: /* start log manager */30: logManager = createLogManager(zkClient)31: logManager.startup()32: 33: socketServer = new SocketServer(config.brokerId,34: config.hostName,35: config.port,36: config.numNetworkThreads,37: config.queuedMaxRequests,38: config.socketSendBufferBytes,39: config.socketReceiveBufferBytes,40: config.socketRequestMaxBytes)41: socketServer.startup()42: 43: replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)44: kafkaController = new KafkaController(config, zkClient)45: 46: /* start processing requests */47: apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)48: requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)49: 50: replicaManager.startup()51: 52: kafkaController.startup()53: 54: topicConfigManager = new TopicConfigManager(zkClient, logManager)55: topicConfigManager.startup()56: 57: /* tell everyone we are alive */58: kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)59: kafkaHealthcheck.startup()60: } 2.1 KafkaScheduler
KafkaSchduler用于在后台执行一些任务,用ScheduledThreadPoolExecutor实现
1: package kafka.utils 2: 3: /** 4:* A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor 5:* 6:* It has a pool of kafka-scheduler- threads that do the actual work. 7:* 8:* @param threads The number of threads in the thread pool 9:* @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.10:* @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.11:*/12: @threadsafe13: class KafkaScheduler(val threads: Int, 14: val threadNamePrefix: String = "kafka-scheduler-", 15: daemon: Boolean = true) extends Scheduler with Logging {16: @volatile private var executor: ScheduledThreadPoolExecutor = null 17: override def startup() {18: this synchronized {19: executor = new ScheduledThreadPoolExecutor(threads) //创建ScheduledThreadPoolExecutor20: executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)21: executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)22: executor.setThreadFactory(new ThreadFactory() {23: def newThread(runnable: Runnable): Thread = 24: Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)25: })26: }27: }28: 29: def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {30: val runnable = new Runnable { //将fun封装成Runnable31: def run() = {32: try {33: fun()34: } catch {...} 35: finally {...}36: }37: }38: if(period >= 0) //在pool中进行delay schedule39: executor.scheduleAtFixedRate(runnable, delay, period, unit)40: else41: executor.schedule(runnable, delay, unit)42: } 2.2 Zookeeper Client
由于Kafka是基于zookeeper进行配置管理的, 所以需要创建zkclient和zookeeper集群通信
2.3 logManager
The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
Apache Kafka源码分析 – Log Management
2.4 ReplicaManager
在0.8中新加入的replica相关模块
Apache Kafka Replication Design – High level
kafka Detailed Replication Design V3
Apache Kafka源码分析 – Controller
Apache Kafka源码分析 – ReplicaManager
2.5 Kafka Socket Server
首先broker server是socket server,所有和broker的交互都是通过往socket端口发送request来实现的
socketServer = new SocketServer(config.brokerId...) KafkaApis
该类封装了所有request的处理逻辑
/**
* Logic to handle the various Kafka requests
*/
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val zkClient: ZkClient,
val brokerId: Int,
val config: KafkaConfig,
val controller: KafkaController) extends Logging {
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request) {
try{
request.requestId match {
case RequestKeys.ProduceKey => handleProducerRequest(request)
case RequestKeys.FetchKey => handleFetchRequest(request)
case RequestKeys.OffsetsKey => handleOffsetRequest(request)
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
} KafkaRequestHandler
基于线程池的KafkaRequestHandler
/**
* A thread that answers kafka requests.
*/
class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
def run() {
while(true) {
try {
val req = requestChannel.receiveRequest() //从socketChannel接受request
if(req eq RequestChannel.AllDone) {
debug("Kafka request handler %d on broker %d received shut down command".format(
id, brokerId))
return
}
req.requestDequeueTimeMs = SystemTime.milliseconds
apis.handle(req) //使用kafkaApis来处理request
} catch {
case e: Throwable => error("Exception when handling request", e)
}
}
}
def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
}
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
numThreads: Int) extends Logging {
val threads = new Array(numThreads) //线程池
val runnables = new Array(numThreads)
for(i
页:
[1]