设为首页 收藏本站
查看: 1405|回复: 0

[经验分享] Apache Kafka源码分析 – Broker Server

[复制链接]

尚未签到

发表于 2015-8-1 08:16:28 | 显示全部楼层 |阅读模式
  
1. Kafka.scala
  在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable只是对KafkaServer的封装


   1: val kafkaServerStartble = new KafkaServerStartable(serverConfig)   2: kafkaServerStartble.startup  




   1: package kafka.server   2: class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {   3:   private var server : KafkaServer = null   4:     5:   private def init() {   6:     server = new KafkaServer(serverConfig)   7:   }   8:     9:   def startup() {  10:     try {  11:       server.startup()  12:     }  13:     catch {...}  14:   }  15: }
2. KafkaServer
  KafkaServer代表一个kafka broker, 这是kafka的核心.
只需要看看里面startup了哪些modules, 就知道broker做了哪些工作, 后面一个个具体分析吧




   1: package kafka.server   2: /**   3:  * Represents the lifecycle of a single Kafka broker. Handles all functionality required   4:  * to start up and shutdown a single Kafka node.   5:  */   6: class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {   7:   var socketServer: SocketServer = null   8:   var requestHandlerPool: KafkaRequestHandlerPool = null   9:   var logManager: LogManager = null  10:   var kafkaHealthcheck: KafkaHealthcheck = null  11:   var topicConfigManager: TopicConfigManager = null  12:   var replicaManager: ReplicaManager = null  13:   var apis: KafkaApis = null  14:   var kafkaController: KafkaController = null  15:   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)  16:   var zkClient: ZkClient = null  17:    18:   /**  19:    * Start up API for bringing up a single instance of the Kafka server.  20:    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers  21:    */  22:   def startup() {  23:     /* start scheduler */  24:     kafkaScheduler.startup()  25:       26:     /* setup zookeeper */  27:     zkClient = initZk()  28:    29:     /* start log manager */  30:     logManager = createLogManager(zkClient)  31:     logManager.startup()  32:    33:     socketServer = new SocketServer(config.brokerId,  34:                                     config.hostName,  35:                                     config.port,  36:                                     config.numNetworkThreads,  37:                                     config.queuedMaxRequests,  38:                                     config.socketSendBufferBytes,  39:                                     config.socketReceiveBufferBytes,  40:                                     config.socketRequestMaxBytes)  41:     socketServer.startup()  42:    43:     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)  44:     kafkaController = new 
KafkaController(config, zkClient)  45:       46:     /* start processing requests */  47:     apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)  48:     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)  49:      50:     replicaManager.startup()  51:    52:     kafkaController.startup()  53:       54:     topicConfigManager = new TopicConfigManager(zkClient, logManager)  55:     topicConfigManager.startup()  56:       57:     /* tell everyone we are alive */  58:     kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)  59:     kafkaHealthcheck.startup()  60:   }  2.1 KafkaScheduler
  KafkaScheduler用于在后台执行一些任务,用ScheduledThreadPoolExecutor实现




   1: package kafka.utils   2:     3: /**   4:  * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor   5:  *    6:  * It has a pool of kafka-scheduler- threads that do the actual work.   7:  *    8:  * @param threads The number of threads in the thread pool   9:  * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.  10:  * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.  11:  */  12: @threadsafe  13: class KafkaScheduler(val threads: Int,   14:                      val threadNamePrefix: String = "kafka-scheduler-",   15:                      daemon: Boolean = true) extends Scheduler with Logging {  16:   @volatile private var executor: ScheduledThreadPoolExecutor = null     17:   override def startup() {  18:     this synchronized {  19:       executor = new ScheduledThreadPoolExecutor(threads) //创建ScheduledThreadPoolExecutor  20:       executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)  21:       executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)  22:       executor.setThreadFactory(new ThreadFactory() {  23:                                   def newThread(runnable: Runnable): Thread =   24:                                     Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)  25:                                 })  26:     }  27:   }  28:    29: def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {  30:   val runnable = new Runnable { //将fun封装成Runnable  31:     def run() = {  32:       try {  33:         fun()  34:       } catch {...}   35:       finally {...}  36:     }  37:   }  38:   if(period >= 0) //在pool中进行delay schedule  39:     executor.scheduleAtFixedRate(runnable, delay, period, unit)  40:   else  41:     executor.schedule(runnable, delay, unit)  42: }  2.2 Zookeeper Client
  由于Kafka是基于zookeeper进行配置管理的, 所以需要创建zkclient和zookeeper集群通信
  2.3 logManager
  The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
Apache Kafka源码分析 – Log Management
  2.4 ReplicaManager
  在0.8中新加入的replica相关模块
  Apache Kafka Replication Design – High level
kafka Detailed Replication Design V3
  Apache Kafka源码分析 – Controller
Apache Kafka源码分析 – ReplicaManager
  2.5 Kafka Socket Server
  首先broker server是socket server,所有和broker的交互都是通过往socket端口发送request来实现的


socketServer = new SocketServer(config.brokerId...)  KafkaApis
该类封装了所有request的处理逻辑



/**
* Logic to handle the various Kafka requests
*/
// Central request dispatcher of the broker: one instance serves every API
// request pulled off the SocketServer's RequestChannel.
// (Excerpt: the class's closing brace follows this quoted span.)
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val zkClient: ZkClient,
val brokerId: Int,
val config: KafkaConfig,
val controller: KafkaController) extends Logging {
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request) {
try{
// Dispatch on the numeric request id; each key maps to a dedicated handler method.
request.requestId match {
case RequestKeys.ProduceKey => handleProducerRequest(request)
case RequestKeys.FetchKey => handleFetchRequest(request)
case RequestKeys.OffsetsKey => handleOffsetRequest(request)
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
// Unknown id: fail loudly rather than silently dropping the request.
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
// NOTE(review): the catch body was elided in this quoted excerpt — see the full source.
} finally
// Record when local processing finished, whether it succeeded or threw.
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
}  KafkaRequestHandler
基于线程池的KafkaRequestHandler


/**
* A thread that answers kafka requests.
*
* Runs in the KafkaRequestHandlerPool; each instance repeatedly pulls one
* request from the shared RequestChannel and delegates it to KafkaApis.
*/
class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
def run() {
// Loop forever; the only exit is receiving the AllDone sentinel below.
while(true) {
try {
val req = requestChannel.receiveRequest() // blocking receive from the socket channel
// AllDone is a sentinel request injected by shutdown(); identity comparison (eq) is intentional.
if(req eq RequestChannel.AllDone) {
debug("Kafka request handler %d on broker %d received shut down command".format(
id, brokerId))
return
}
// Timestamp the dequeue for request-time metrics.
req.requestDequeueTimeMs = SystemTime.milliseconds
apis.handle(req) // dispatch to the per-API handler in KafkaApis
} catch {
// Swallow everything so a single bad request cannot kill the handler thread.
case e: Throwable => error("Exception when handling request", e)
}
}
}
// Graceful stop: enqueue the AllDone sentinel so run() returns after draining to it.
def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
}
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
numThreads: Int) extends Logging {
val threads = new Array[Thread](numThreads) //线程池
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-92848-1-1.html 上篇帖子: Apache Kafka Replication Design – High level 下篇帖子: apache kafka技术分享系列(目录索引)--转载
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表