|
1. Kafka.scala
在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装
1: val kafkaServerStartble = new KafkaServerStartable(serverConfig) 2: kafkaServerStartble.startup
1: package kafka.server 2: class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { 3: private var server : KafkaServer = null 4: 5: private def init() { 6: server = new KafkaServer(serverConfig) 7: } 8: 9: def startup() { 10: try { 11: server.startup() 12: } 13: catch {...} 14: } 15: }
2. KafkaServer
KafkaServer代表一个kafka broker, 这是kafka的核心.
只需要看看里面startup了哪些modules, 就知道broker做了哪些工作, 后面一个个具体分析吧
1: package kafka.server 2: /** 3: * Represents the lifecycle of a single Kafka broker. Handles all functionality required 4: * to start up and shutdown a single Kafka node. 5: */ 6: class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging { 7: var socketServer: SocketServer = null 8: var requestHandlerPool: KafkaRequestHandlerPool = null 9: var logManager: LogManager = null 10: var kafkaHealthcheck: KafkaHealthcheck = null 11: var topicConfigManager: TopicConfigManager = null 12: var replicaManager: ReplicaManager = null 13: var apis: KafkaApis = null 14: var kafkaController: KafkaController = null 15: val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) 16: var zkClient: ZkClient = null 17: 18: /** 19: * Start up API for bringing up a single instance of the Kafka server. 20: * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers 21: */ 22: def startup() { 23: /* start scheduler */ 24: kafkaScheduler.startup() 25: 26: /* setup zookeeper */ 27: zkClient = initZk() 28: 29: /* start log manager */ 30: logManager = createLogManager(zkClient) 31: logManager.startup() 32: 33: socketServer = new SocketServer(config.brokerId, 34: config.hostName, 35: config.port, 36: config.numNetworkThreads, 37: config.queuedMaxRequests, 38: config.socketSendBufferBytes, 39: config.socketReceiveBufferBytes, 40: config.socketRequestMaxBytes) 41: socketServer.startup() 42: 43: replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) 44: kafkaController = new KafkaController(config, zkClient) 45: 46: /* start processing requests */ 47: apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController) 48: requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) 49: 50: replicaManager.startup() 51: 52: kafkaController.startup() 53: 54: topicConfigManager = new TopicConfigManager(zkClient, logManager) 55: topicConfigManager.startup() 56: 57: /* tell everyone we are alive */ 58: kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) 59: kafkaHealthcheck.startup() 60: } 2.1 KafkaScheduler
KafkaSchduler用于在后台执行一些任务,用ScheduledThreadPoolExecutor实现
1: package kafka.utils 2: 3: /** 4: * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor 5: * 6: * It has a pool of kafka-scheduler- threads that do the actual work. 7: * 8: * @param threads The number of threads in the thread pool 9: * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it. 10: * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown. 11: */ 12: @threadsafe 13: class KafkaScheduler(val threads: Int, 14: val threadNamePrefix: String = "kafka-scheduler-", 15: daemon: Boolean = true) extends Scheduler with Logging { 16: @volatile private var executor: ScheduledThreadPoolExecutor = null 17: override def startup() { 18: this synchronized { 19: executor = new ScheduledThreadPoolExecutor(threads) //创建ScheduledThreadPoolExecutor 20: executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) 21: executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false) 22: executor.setThreadFactory(new ThreadFactory() { 23: def newThread(runnable: Runnable): Thread = 24: Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon) 25: }) 26: } 27: } 28: 29: def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = { 30: val runnable = new Runnable { //将fun封装成Runnable 31: def run() = { 32: try { 33: fun() 34: } catch {...} 35: finally {...} 36: } 37: } 38: if(period >= 0) //在pool中进行delay schedule 39: executor.scheduleAtFixedRate(runnable, delay, period, unit) 40: else 41: executor.schedule(runnable, delay, unit) 42: } 2.2 Zookeeper Client
由于Kafka是基于zookeeper进行配置管理的, 所以需要创建zkclient和zookeeper集群通信
2.3 logManager
The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
Apache Kafka源码分析 – Log Management
2.4 ReplicaManager
在0.8中新加入的replica相关模块
Apache Kafka Replication Design – High level
kafka Detailed Replication Design V3
Apache Kafka源码分析 – Controller
Apache Kafka源码分析 – ReplicaManager
2.5 Kafka Socket Server
首先broker server是socket server,所有和broker的交互都是通过往socket端口发送request来实现的
socketServer = new SocketServer(config.brokerId...) KafkaApis
该类封装了所有request的处理逻辑
/**
* Logic to handle the various Kafka requests
*/
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val zkClient: ZkClient,
val brokerId: Int,
val config: KafkaConfig,
val controller: KafkaController) extends Logging {
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request) {
try{
request.requestId match {
case RequestKeys.ProduceKey => handleProducerRequest(request)
case RequestKeys.FetchKey => handleFetchRequest(request)
case RequestKeys.OffsetsKey => handleOffsetRequest(request)
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
} KafkaRequestHandler
基于线程池的KafkaRequestHandler
/**
* A thread that answers kafka requests.
*/
class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
def run() {
while(true) {
try {
val req = requestChannel.receiveRequest() //从socketChannel接受request
if(req eq RequestChannel.AllDone) {
debug("Kafka request handler %d on broker %d received shut down command".format(
id, brokerId))
return
}
req.requestDequeueTimeMs = SystemTime.milliseconds
apis.handle(req) //使用kafkaApis来处理request
} catch {
case e: Throwable => error("Exception when handling request", e)
}
}
}
def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
}
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
numThreads: Int) extends Logging {
val threads = new Array[Thread](numThreads) //线程池
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i |
|