设为首页 收藏本站
查看: 627|回复: 0

[经验分享] Kafka的producer

[复制链接]

尚未签到

发表于 2017-5-23 17:30:44 | 显示全部楼层 |阅读模式
  producer示例代码

Producer  producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
String messageStr = new String("Message_" + messageNo);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
  producer可以支持异步发送和同步发送两种方式

  config.producerType match {
case "sync" =>
case "async" =>
sync = false
producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
queue,
eventHandler,
config.queueBufferingMaxMs,
config.batchNumMessages,
config.clientId)
producerSendThread.start()
}
  客户端代码调用send方法发送数据,对于sync方法直接调用eventHandler.handle方法同步发送请求。我们先来看异步发送的方法

  /**
* Sends the data, partitioned by key to the topic using either the
* synchronous or the asynchronous producer
* @param messages the producer data object that encapsulates the topic, key and message data
*/
def send(messages: KeyedMessage[K,V]*) {
lock synchronized {
if (hasShutdown.get)
throw new ProducerClosedException
recordStats(messages)
sync match {
case true => eventHandler.handle(messages)
case false => asyncSend(messages)
}
}
}
  异步方法发送数据到queue

  private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
for (message <- messages) {
val added = config.queueEnqueueTimeoutMs match {
case 0  =>
queue.offer(message)
case _  =>
try {
config.queueEnqueueTimeoutMs < 0 match {
case true =>
queue.put(message)
true
case _ =>
queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
}
}
catch {
case e: InterruptedException =>
false
}
}
if(!added) {
producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
}else {
trace("Added to send queue an event: " + message.toString)
trace("Remaining queue size: " + queue.remainingCapacity)
}
}
}

  异步处理事件的线程ProducerSendThread,这个线程会从queue中获取事件,当事件的个数达到或者超时时,就调用   handler.handle(events)方法去处理事件

private def processEvents() {
var lastSend = SystemTime.milliseconds
var events = new ArrayBuffer[KeyedMessage[K,V]]
var full: Boolean = false
// drain the queue until you get a shutdown command
Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
.takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
currentQueueItem =>
val elapsed = (SystemTime.milliseconds - lastSend)
// check if the queue time is reached. This happens when the poll method above returns after a timeout and
// returns a null object
val expired = currentQueueItem == null
if(currentQueueItem != null) {
trace("Dequeued item for topic %s, partition key: %s, data: %s"
.format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
events += currentQueueItem
}
// check if the batch size is reached
full = events.size >= batchSize
if(full || expired) {
if(expired)
debug(elapsed + " ms elapsed. Queue time reached. Sending..")
if(full)
debug("Batch full. Sending..")
// if either queue time has reached or batch size has reached, dispatch to event handler
tryToHandle(events)
lastSend = SystemTime.milliseconds
events = new ArrayBuffer[KeyedMessage[K,V]]
}
}
// send the last batch of events
tryToHandle(events)
if(queue.size > 0)
throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
.format(queue.size))
}
  DefaultEventHandler的handle方法处理事件

def handle(events: Seq[KeyedMessage[K,V]]) {
val serializedData = serialize(events)
serializedData.foreach {
keyed =>
val dataSize = keyed.message.payloadSize
producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
}
var outstandingProduceRequests = serializedData
var remainingRetries = config.messageSendMaxRetries + 1
val correlationIdStart = correlationId.get()
debug("Handling %d events".format(events.size))
while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
//如果超时了,就更新topic的meta data
if (topicMetadataRefreshInterval >= 0 &&
SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
sendPartitionPerTopicCache.clear()
topicMetadataToRefresh.clear
lastTopicMetadataRefreshTime = SystemTime.milliseconds
}
//发送数据
outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
//如果失败了,更新失败的topic的metadata
if (outstandingProduceRequests.size > 0) {
info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.retryBackoffMs)
// get topics of the outstanding produce requests and refresh metadata for those
Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
sendPartitionPerTopicCache.clear()
remainingRetries -= 1
producerStats.resendRate.mark()
}
}
if(outstandingProduceRequests.size > 0) {
producerStats.failedSendRate.mark()
val correlationIdEnd = correlationId.get()
error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
.format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
correlationIdStart, correlationIdEnd-1))
throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
}
}

private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
//将发送的消息映射成要发送到的broker,以及相应的message的map
val partitionedDataOpt = partitionAndCollate(messages)
partitionedDataOpt match {
case Some(partitionedData) =>
val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
try {
for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
if (logger.isTraceEnabled)
messagesPerBrokerMap.foreach(partitionAndEvent =>
trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
//使用java的nio发送
val failedTopicPartitions = send(brokerid, messageSetPerBroker)
failedTopicPartitions.foreach(topicPartition => {
messagesPerBrokerMap.get(topicPartition) match {
case Some(data) => failedProduceRequests.appendAll(data)
case None => // nothing
}
})
}
} catch {
case t: Throwable => error("Failed to send messages", t)
}
failedProduceRequests
case None => // all produce requests failed
messages
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379911-1-1.html 上篇帖子: kafka-web-console编译问题 下篇帖子: 忍不住要喷一下kafka
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表