kaola4549 posted on 2015-8-1 08:16:28

Apache Kafka Source Code Analysis – Broker Server

  
1. Kafka.scala
  In Kafka's main entry point a KafkaServerStartable is started up; KafkaServerStartable is simply a wrapper around KafkaServer.


val kafkaServerStartble = new KafkaServerStartable(serverConfig)
kafkaServerStartble.startup




package kafka.server

class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
  private var server : KafkaServer = null

  private def init() {
    server = new KafkaServer(serverConfig)
  }

  def startup() {
    try {
      server.startup()
    }
    catch {...}
  }
}
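For context, a minimal sketch of what the surrounding Kafka.scala entry point looks like (reconstructed from the 0.8.x code base; logging and metrics wiring is omitted, so treat the details as an approximation):

package kafka

import kafka.server.{KafkaConfig, KafkaServerStartable}
import kafka.utils.Utils

object Kafka {
  def main(args: Array[String]): Unit = {
    // args(0) is the server.properties file passed on the command line
    val props = Utils.loadProps(args(0))
    val serverConfig = new KafkaConfig(props)

    val kafkaServerStartble = new KafkaServerStartable(serverConfig)

    // attach a shutdown hook so the broker shuts down cleanly on SIGTERM
    Runtime.getRuntime().addShutdownHook(new Thread() {
      override def run() = kafkaServerStartble.shutdown
    })

    kafkaServerStartble.startup
    kafkaServerStartble.awaitShutdown
  }
}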
2. KafkaServer
  KafkaServer represents a single Kafka broker and is the core of Kafka.
Just by looking at which modules are started in its startup() you can see what work a broker does; the individual modules are analyzed one by one below.




package kafka.server

/**
 * Represents the lifecycle of a single Kafka broker. Handles all functionality required
 * to start up and shutdown a single Kafka node.
 */
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
  var socketServer: SocketServer = null
  var requestHandlerPool: KafkaRequestHandlerPool = null
  var logManager: LogManager = null
  var kafkaHealthcheck: KafkaHealthcheck = null
  var topicConfigManager: TopicConfigManager = null
  var replicaManager: ReplicaManager = null
  var apis: KafkaApis = null
  var kafkaController: KafkaController = null
  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  var zkClient: ZkClient = null

  /**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup() {
    /* start scheduler */
    kafkaScheduler.startup()

    /* setup zookeeper */
    zkClient = initZk()

    /* start log manager */
    logManager = createLogManager(zkClient)
    logManager.startup()

    socketServer = new SocketServer(config.brokerId,
                                    config.hostName,
                                    config.port,
                                    config.numNetworkThreads,
                                    config.queuedMaxRequests,
                                    config.socketSendBufferBytes,
                                    config.socketReceiveBufferBytes,
                                    config.socketRequestMaxBytes)
    socketServer.startup()

    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
    kafkaController = new KafkaController(config, zkClient)

    /* start processing requests */
    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

    replicaManager.startup()

    kafkaController.startup()

    topicConfigManager = new TopicConfigManager(zkClient, logManager)
    topicConfigManager.startup()

    /* tell everyone we are alive */
    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
    kafkaHealthcheck.startup()
  }

  2.1 KafkaScheduler
  KafkaScheduler runs background tasks; it is implemented on top of a ScheduledThreadPoolExecutor.




package kafka.utils

/**
 * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
 *
 * It has a pool of kafka-scheduler- threads that do the actual work.
 *
 * @param threads The number of threads in the thread pool
 * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
 * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
 */
@threadsafe
class KafkaScheduler(val threads: Int,
                     val threadNamePrefix: String = "kafka-scheduler-",
                     daemon: Boolean = true) extends Scheduler with Logging {
  @volatile private var executor: ScheduledThreadPoolExecutor = null

  override def startup() {
    this synchronized {
      executor = new ScheduledThreadPoolExecutor(threads) // create the ScheduledThreadPoolExecutor
      executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
      executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
      executor.setThreadFactory(new ThreadFactory() {
                                  def newThread(runnable: Runnable): Thread =
                                    Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
                                })
    }
  }

  def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
    val runnable = new Runnable { // wrap fun in a Runnable
      def run() = {
        try {
          fun()
        } catch {...}
        finally {...}
      }
    }
    if(period >= 0) // schedule the task on the pool with the given delay
      executor.scheduleAtFixedRate(runnable, delay, period, unit)
    else
      executor.schedule(runnable, delay, unit)
  }
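A minimal usage sketch of the scheduler (the task name and body below are hypothetical, not taken from the Kafka source), just to show how callers hand periodic work to it:

import java.util.concurrent.TimeUnit
import kafka.utils.KafkaScheduler

val scheduler = new KafkaScheduler(threads = 2)
scheduler.startup()

// run a periodic task: first execution after 10 s, then every 30 s
scheduler.schedule(
  name   = "example-flush-task",           // hypothetical task name
  fun    = () => println("flushing ..."),  // hypothetical task body
  delay  = 10 * 1000,
  period = 30 * 1000,
  unit   = TimeUnit.MILLISECONDS)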
  2.2 Zookeeper Client
  Since Kafka uses ZooKeeper for configuration management, a ZkClient has to be created to talk to the ZooKeeper cluster.
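The corresponding initZk() in KafkaServer looks roughly like the following (a sketch based on the 0.8.x source; the chroot handling is slightly simplified):

private def initZk(): ZkClient = {
  info("Connecting to zookeeper on " + config.zkConnect)

  // if zookeeper.connect contains a chroot path, make sure that path exists first
  val chroot = {
    if (config.zkConnect.indexOf("/") > 0)
      config.zkConnect.substring(config.zkConnect.indexOf("/"))
    else
      ""
  }
  if (chroot.length > 1) {
    val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
    val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs,
                                                 config.zkConnectionTimeoutMs, ZKStringSerializer)
    ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
    zkClientForChrootCreation.close()
  }

  // the client used by the rest of the broker; ZKStringSerializer stores znode values as UTF-8 strings
  val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
                              config.zkConnectionTimeoutMs, ZKStringSerializer)
  ZkUtils.setupCommonPaths(zkClient)   // create /brokers, /consumers, ... if they do not exist yet
  zkClient
}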
  2.3 logManager
  The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
Apache Kafka Source Code Analysis – Log Management
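As a rough illustration of those responsibilities, other broker components talk to the log manager along these lines (the method names follow the 0.8.x code base, but treat this as an assumption rather than an exact API listing):

// look up the local log for a partition, creating it on first use (names are assumptions)
val topicAndPartition = TopicAndPartition("my-topic", 0)   // hypothetical topic
val log = logManager.getLog(topicAndPartition) match {
  case Some(existing) => existing
  case None           => logManager.createLog(topicAndPartition, logManager.defaultConfig)
}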
  2.4 ReplicaManager
  The replica-related module newly added in 0.8; related references:
  Apache Kafka Replication Design – High level
kafka Detailed Replication Design V3
  Apache Kafka Source Code Analysis – Controller
Apache Kafka Source Code Analysis – ReplicaManager
  2.5 Kafka Socket Server
  First of all, the broker server is a socket server: all interaction with the broker is carried out by sending requests to its socket port.


socketServer = new SocketServer(config.brokerId...)
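To make the "everything goes through the socket port" point concrete, here is a minimal sketch of a client issuing a metadata request against that port using the 0.8 Scala client API (host, port and topic name are made up for the example):

import kafka.api.TopicMetadataRequest
import kafka.consumer.SimpleConsumer

// connect to the broker's socket server (hypothetical host/port)
val consumer = new SimpleConsumer("broker1", 9092, 30000, 64 * 1024, "metadata-example")

// every interaction with the broker is a request sent over this socket,
// here a TopicMetadataRequest (api key RequestKeys.MetadataKey)
val request  = new TopicMetadataRequest(Seq("my-topic"), correlationId = 0)
val response = consumer.send(request)
response.topicsMetadata.foreach(println)

consumer.close()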
  KafkaApis
  This class encapsulates the handling logic for all requests.


/**
 * Logic to handle the various Kafka requests
 */
class KafkaApis(val requestChannel: RequestChannel,
                val replicaManager: ReplicaManager,
                val zkClient: ZkClient,
                val brokerId: Int,
                val config: KafkaConfig,
                val controller: KafkaController) extends Logging {

  /**
   * Top-level method that handles all requests and multiplexes to the right api
   */
  def handle(request: RequestChannel.Request) {
    try {
      request.requestId match {
        case RequestKeys.ProduceKey => handleProducerRequest(request)
        case RequestKeys.FetchKey => handleFetchRequest(request)
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    } catch {...}
    finally
      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }
}
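The RequestKeys constants matched on above are just the numeric api keys of the wire protocol. For reference, they look roughly like this in 0.8 (values quoted from memory of the 0.8.x source, so verify against the code):

object RequestKeys {
  val ProduceKey: Short            = 0
  val FetchKey: Short              = 1
  val OffsetsKey: Short            = 2
  val MetadataKey: Short           = 3
  val LeaderAndIsrKey: Short       = 4
  val StopReplicaKey: Short        = 5
  val UpdateMetadataKey: Short     = 6
  val ControlledShutdownKey: Short = 7
  val OffsetCommitKey: Short       = 8
  val OffsetFetchKey: Short        = 9
}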
  KafkaRequestHandler
  The KafkaRequestHandler, driven by a thread pool:


/**
 * A thread that answers kafka requests.
 */
class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {

  def run() {
    while(true) {
      try {
        val req = requestChannel.receiveRequest() // take the next request off the request channel fed by the SocketServer
        if(req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
          return
        }
        req.requestDequeueTimeMs = SystemTime.milliseconds
        apis.handle(req) // use KafkaApis to process the request
      } catch {
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

  def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
}
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              numThreads: Int) extends Logging {
  val threads = new Array[Thread](numThreads)                  // the handler thread pool
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
  }
}
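The original post is cut off at this point. For completeness, the pool's shutdown path in 0.8.x looks roughly like the sketch below (reconstructed from memory: it pushes one AllDone marker per handler thread and then joins them):

def shutdown() {
  info("shutting down")
  for(handler <- runnables)
    handler.shutdown()   // enqueue RequestChannel.AllDone once per handler thread
  for(thread <- threads)
    thread.join()        // wait for every handler thread to exit its run() loop
  info("shut down completely")
}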