[Experience Sharing] Kafka producer server side

Posted on 2017-5-23 18:06:25
 
The producer server side:
1. NIO accepts the request: http://blackproof.iteye.com/blog/2239949
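
The linked post covers the actual SocketServer code. For orientation, the shape is the classic Java NIO acceptor pattern: one thread accepts connections and hands the channels off to processor threads. The following is a minimal, self-contained sketch of that pattern only; the port, the names, and the immediate close are illustrative, not Kafka's real implementation.

  import java.net.InetSocketAddress
  import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel}

  // Minimal NIO acceptor loop: accept connections non-blockingly; in Kafka a
  // round-robin processor thread would take each accepted channel and register
  // it with its own selector for OP_READ.
  object AcceptorSketch {
    def main(args: Array[String]): Unit = {
      val selector = Selector.open()
      val server = ServerSocketChannel.open()
      server.configureBlocking(false)
      server.socket().bind(new InetSocketAddress(9092))
      server.register(selector, SelectionKey.OP_ACCEPT)
      while (true) {
        if (selector.select(500) > 0) {
          val keys = selector.selectedKeys().iterator()
          while (keys.hasNext) {
            val key = keys.next(); keys.remove()
            if (key.isAcceptable) {
              val client = server.accept()
              client.configureBlocking(false)
              println(s"accepted ${client.getRemoteAddress}")
              client.close() // sketch only: a real processor would keep reading from it
            }
          }
        }
      }
    }
  }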
 
2. A handler thread takes the request off the request queue and calls into KafkaApis: http://blackproof.iteye.com/blog/2239953
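
The real classes here are KafkaRequestHandler and RequestChannel; the sketch below only condenses the pattern, with a plain String standing in for the request type and an arbitrary queue capacity.

  import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

  // Handler-pool pattern: N daemon threads block on a shared request queue and
  // hand each request to the API layer (in Kafka, KafkaApis.handle).
  class HandlerPoolSketch(numThreads: Int, handle: String => Unit) {
    private val requestQueue: BlockingQueue[String] = new ArrayBlockingQueue[String](500)

    def submit(request: String): Unit = requestQueue.put(request)

    def start(): Unit =
      (1 to numThreads).foreach { i =>
        val t = new Thread(new Runnable {
          def run(): Unit = while (true) handle(requestQueue.take())
        }, "handler-" + i)
        t.setDaemon(true)
        t.start()
      }
  }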
 
3. The KafkaApis class dispatches to the handleProducerOrOffsetCommitRequest method:

  def handle(request: RequestChannel.Request) {
    try {
      trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
      request.requestId match {
        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
        case RequestKeys.FetchKey => handleFetchRequest(request)
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) // become leader or follower; a follower starts the corresponding replica-fetcher thread
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    } catch {
      case e: Throwable =>
        request.requestObj.handleError(e, requestChannel, request)
        error("error when handling request %s".format(request.requestObj), e)
    } finally
      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }
 
 3.1 Append the data to the local log. By default the local replica is the leader: the client sends the produce request to the broker hosting the leader of the topic-partition's log.
         appendToLocalLog is the main processing method.
 3.2 Respond to the client according to the ack level the producer configured:
           0: send nothing back; just issue a no-op so the processor moves on to the next request
           1: take the leader's local result and return it
           -1: check whether the last offset of every ISR replica other than the leader has reached the current offset, collecting any error information;
                   if the condition is not yet met and there is no error, register a watcher;
                   if the request times out, it is expired through the delayed-operation queue
 

  def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
    ......
    val sTime = SystemTime.milliseconds
    // append the data to the local log (the local replica is the leader by default)
    val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // true if this is an offset-commit request, false for a produce request
    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
    val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
    // inspect the results: did any partition hit an error (throw error)?
    val numPartitionsInError = localProduceResults.count(_.error.isDefined)
    if(produceRequest.requiredAcks == 0) {
      // when acks is 0, no response is needed; success is assumed immediately
      // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
      // no response is expected by the producer the handler will send a close connection response to the socket server
      // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
      if (numPartitionsInError != 0) {
        info(("Send the close connection response due to error handling produce request " +
          "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
          .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
        requestChannel.closeConnection(request.processor, request)
      } else {
        if (firstErrorCode == ErrorMapping.NoError)
          offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
        // both offset-commit and produce requests flow through here
        if (offsetCommitRequestOpt.isDefined) {
          val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
          requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
        } else
          requestChannel.noOperation(request.processor, request)
      }
    } else if (produceRequest.requiredAcks == 1 ||
        produceRequest.numPartitions <= 0 ||
        numPartitionsInError == produceRequest.numPartitions) {
      // only the leader needs to confirm the write before success is returned
      if (firstErrorCode == ErrorMapping.NoError) {
        // offsetsCache: update the offset manager's in-memory offsets
        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
      }
      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
      val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
        .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
      // send back the response, carrying the produce results as ProducerResponseStatus
      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
    } else {
      // all replicas must receive the data before success is returned
      // create a list of (topic, partition) pairs to use as keys for this delayed request
      val producerRequestKeys = produceRequest.data.keys.toSeq
      val statuses = localProduceResults.map(r =>
        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
      val delayedRequest = new DelayedProduce(
        producerRequestKeys,
        request,
        produceRequest.ackTimeoutMs.toLong,
        produceRequest,
        statuses,
        offsetCommitRequestOpt)
      // check whether the other replicas have caught up; if not, register a watcher; on timeout the request is expired from the delay queue (the watcher mechanism)
      // add the produce request for watch if it's not satisfied, otherwise send the response back
      val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
      if (satisfiedByMe)
        producerRequestPurgatory.respond(delayedRequest)
    }
  }
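
For the acks = -1 branch, the condition that DelayedProduce keeps re-checking is the one described in 3.2: every in-sync replica must catch up to the required offset, unless an error short-circuits the wait. A condensed, hypothetical form of that check; the names and shape are illustrative, not the exact 0.8 signatures.

  // A DelayedProduce for one partition is satisfiable once every ISR member's
  // log end offset has reached requiredOffset, or once an error has occurred.
  case class ReplicaState(brokerId: Int, logEndOffset: Long)

  def isSatisfied(isr: Seq[ReplicaState], requiredOffset: Long, errorCode: Short): Boolean =
    errorCode != 0 || isr.forall(_.logEndOffset >= requiredOffset)

  // Example: ISR log end offsets 7, 5 and 6 with requiredOffset 6 are not yet
  // satisfied (the replica at 5 still lags), so the request stays parked.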
 appendToLocalLog looks up the Partition object for the local topic-partition and calls the partition's appendMessagesToLeader method:
 

  val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
  val info = partitionOpt match {
    case Some(partition) =>
      partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], producerRequest.requiredAcks) // write the data on the leader
    case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
      .format(topicAndPartition, brokerId))
  }
 3.1 appendMessagesToLeader

          ......
          // after the ISR check, the method that actually writes to the log
          val info = log.append(messages, assignOffsets = true)
          // probably unblock some follower fetch requests since log end offset has been updated
          // check whether this topic-partition has watchers; any watcher whose replicas have all caught up gets its response sent back on the channel
          replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
          // we may need to increment high watermark since ISR could be down to 1
          // more data has arrived, so try to advance the high watermark, which compares the leader's offset with the other replicas'
          maybeIncrementLeaderHW(leaderReplica)
          ......
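
The "......" before log.append elides the ISR check the comment mentions: with acks = -1 the leader refuses the write when the ISR has shrunk too far, so durability cannot silently degrade. A rough sketch of that guard; min.insync.replicas and NotEnoughReplicas are the names later Kafka versions use, so treat the exact form here as an assumption.

  // Refuse an acks=-1 write if the ISR is below the configured minimum size.
  def checkEnoughReplicas(isrSize: Int, minIsr: Int, requiredAcks: Int): Unit =
    if (requiredAcks == -1 && isrSize < minIsr)
      throw new RuntimeException(s"ISR size $isrSize below required minimum $minIsr")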
3.1.1 The log.append method:
checks whether the log needs to roll a new segment file, writes the data into the segment, and updates the last offset

        // maybe roll the log if this segment is full: fetch the current segment and check whether it needs to roll
        val segment = maybeRoll(validMessages.sizeInBytes)
        // now append to the log: write into the segment's file, and if the bytes since the last index entry exceed the index interval, add an index entry;
        // the index uses a memory-mapped channel, the log uses GatheringByteChannel's built-in buffering (plain Java NIO)
        segment.append(appendInfo.firstOffset, validMessages)
        // increment the log end offset
        updateLogEndOffset(appendInfo.lastOffset + 1)
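
The decision maybeRoll makes can be summarized as: start a new segment when the active one cannot take the incoming batch, when its index is full, or when it is older than the configured roll time. A hedged sketch of that predicate; the parameter names are illustrative.

  // Roll to a new segment file when any growth or age limit is hit.
  def shouldRoll(segmentBytes: Long, batchBytes: Long, maxSegmentBytes: Long,
                 indexFull: Boolean, segmentAgeMs: Long, rollMs: Long): Boolean =
    segmentBytes + batchBytes > maxSegmentBytes || indexFull || segmentAgeMs > rollMs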
 The segment.append method:
writes the data into the file channel and decides whether an index entry is due

  def append(offset: Long, messages: ByteBufferMessageSet) {
    if (messages.sizeInBytes > 0) {
      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
      // append an entry to the index (if needed)
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        // the bytes written since the last index entry exceed the index interval, so this position qualifies for an index entry
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      // append the messages
      log.append(messages)
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  }
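
Because an entry is only written once bytesSinceLastIndexEntry exceeds indexIntervalBytes, the index is sparse: a read locates the nearest preceding index entry and scans the log file from that position. A tiny illustration; Kafka binary-searches a memory-mapped index, while this sketch uses a linear scan purely for brevity.

  // Find the file position to start scanning from for a target offset, given
  // sparse (offset -> filePosition) index entries in ascending offset order.
  def lookup(index: Vector[(Long, Long)], target: Long): Long =
    index.takeWhile(_._1 <= target).lastOption.map(_._2).getOrElse(0L)

  // Example: with index Vector((0L, 0L), (100L, 4096L), (200L, 8192L)),
  // lookup(index, 150L) == 4096L, so the scan starts at file position 4096.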
   3.1.2 unblockDelayedFetchRequests
  checks whether this topic-partition has any watchers; a watcher whose replicas have all caught up gets its response completed and sent back on the channel

  def unblockDelayedFetchRequests(key: TopicAndPartition) {
    val satisfied = fetchRequestPurgatory.update(key)
    debug("Request key %s unblocked %d fetch requests.".format(key, satisfied.size))
    // send any newly unblocked responses
    satisfied.foreach(fetchRequestPurgatory.respond(_))
  }
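
fetchRequestPurgatory.update here and producerRequestPurgatory.checkAndMaybeWatch earlier are two halves of the same watcher mechanism: park unsatisfied requests under per-partition keys, and on each update re-check and release the ones that became satisfiable. A minimal sketch of that pattern, with the expiration (timeout) path omitted.

  import scala.collection.mutable

  // Park requests per key; update(key) re-checks parked requests and returns
  // the newly satisfied ones so the caller can respond to them.
  class PurgatorySketch[K, R](isSatisfied: R => Boolean) {
    private val watchers = mutable.Map.empty[K, mutable.Buffer[R]]

    def checkAndMaybeWatch(keys: Seq[K], request: R): Boolean =
      if (isSatisfied(request)) true
      else {
        keys.foreach(k => watchers.getOrElseUpdate(k, mutable.Buffer.empty[R]) += request)
        false
      }

    def update(key: K): Seq[R] = {
      val (done, pending) = watchers.getOrElse(key, mutable.Buffer.empty[R]).partition(isSatisfied)
      watchers(key) = pending
      done.toSeq
    }
  }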
 3.1.3 maybeIncrementLeaderHW

  private def maybeIncrementLeaderHW(leaderReplica: Replica) {
    val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
    // take the minimum log end offset, i.e. that of the most lagging replica
    val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
    val oldHighWatermark = leaderReplica.highWatermark
    if(oldHighWatermark.precedes(newHighWatermark)) { // even the most lagging offset is ahead of the old HW, so advance the high watermark
      leaderReplica.highWatermark = newHighWatermark
      debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
      // some delayed requests may be unblocked after HW changed
      val requestKey = new TopicAndPartition(this.topic, this.partitionId)
      replicaManager.unblockDelayedFetchRequests(requestKey)
      replicaManager.unblockDelayedProduceRequests(requestKey)
    } else {
      debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
        .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
    }
  }
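
A concrete example of the rule: the new high watermark is the minimum log end offset across the ISR, and it only ever moves forward.

  // Leader LEO 12, followers at 9 and 11; old HW 8.
  val isrLogEndOffsets = Seq(12L, 9L, 11L)
  val oldHighWatermark = 8L
  val newHighWatermark = isrLogEndOffsets.min // 9, the slowest ISR member
  assert(newHighWatermark > oldHighWatermark) // HW advances from 8 to 9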