[Experience Sharing] Kafka client-side producer

Kafka producer client
What KafkaProducer's send() method does:

1. Wait until the partitions of the topic being sent to are available (metadata fetched).
2. Serialize the key and value;
key: org.apache.kafka.common.serialization.IntegerSerializer
value: org.apache.kafka.common.serialization.StringSerializer
3. Compute the target partition for the record: use the partition set on the record, and if it is null, let the partitioner class compute one.
partitioner: org.apache.kafka.clients.producer.internals.DefaultPartitioner
4. Make sure the serialized size of the record does not exceed the configured limits.
limits: MAX_REQUEST_SIZE_CONFIG = "max.request.size"
BUFFER_MEMORY_CONFIG = "buffer.memory"
5. Build the TopicPartition, append the record to that partition's queue in the accumulator, and get back an append result; the data is wrapped as writable records (with a Compressor for compression) and packed into a batch.
compression setting: COMPRESSION_TYPE_CONFIG = "compression.type"
6. If the result's batch is full, or a new batch was created, wake up the sender to transmit the messages.
7. Return the result's future.
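To see these steps from the caller's side, here is a minimal usage sketch. The broker address localhost:9092, the topic name "demo", and the config values are placeholders chosen for illustration, not part of the original post:

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerSendSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // "localhost:9092" and the topic "demo" below are placeholders for this sketch
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // the serializers mentioned in step 2
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        // the size and compression settings referenced in steps 4 and 5 (illustrative values)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);   // max.request.size
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);    // buffer.memory
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");    // compression.type

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        try {
            // no partition is set on the record, so DefaultPartitioner picks one (step 3)
            ProducerRecord<Integer, String> record = new ProducerRecord<>("demo", 1, "hello");
            Future<RecordMetadata> future = producer.send(record);  // returns immediately (step 7)
            RecordMetadata meta = future.get();                     // block until the batch is acknowledged
            System.out.printf("sent to partition %d at offset %d%n", meta.partition(), meta.offset());
        } finally {
            producer.close();
        }
    }
}

The send() source that implements the seven steps is shown below.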

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    try {
        // @1 first make sure the metadata for the topic is available
        waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
        byte[] serializedKey;
        try {
            // @2 serialize the key
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try {
            // serialize the value
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }
        // @3 compute the partition
        int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        ensureValidRecordSize(serializedSize); // @4 make sure the record does not exceed the configured limits
        TopicPartition tp = new TopicPartition(record.topic(), partition);
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // @5 append the record to the accumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
        if (result.batchIsFull || result.newBatchCreated) { // @6
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future; // @7
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        throw e;
    }
}
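Note how the catch block handles ApiException: it is not thrown to the caller but delivered through the callback (and the returned FutureFailure). A sketch of the caller-side callback pattern, reusing the producer and the hypothetical "demo" topic from the earlier snippet (plus an import of org.apache.kafka.clients.producer.Callback):

producer.send(new ProducerRecord<>("demo", 1, "hello"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // API-level errors (e.g. a record that is too large) arrive here instead of being thrown
            exception.printStackTrace();
        } else {
            System.out.printf("acked: partition=%d offset=%d%n", metadata.partition(), metadata.offset());
        }
    }
});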
In step 5, the code that appends to the accumulator is shown below. The record is appended to the deque for its topic-partition: if an in-progress batch already exists, the record is appended to it. If not, a new buffer is allocated and the deque is checked a second time; if another thread has created a batch in the meantime, the record goes into that one. Otherwise the buffer is wrapped as writable records (with a Compressor) and packed into a new batch. records.append() applies the compressor as it writes, so in either case the record ends up in a batch on the queue, compressed if compression is configured.

public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
    // We keep track of the number of appending threads to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    try {
        if (closed)
            throw new IllegalStateException("Cannot send after the producer is closed.");
        // check if we have an in-progress batch
        Deque<RecordBatch> dq = dequeFor(tp);
        synchronized (dq) { // try to append to an existing batch for this topic-partition
            RecordBatch last = dq.peekLast();
            if (last != null) {
                FutureRecordMetadata future = last.tryAppend(key, value, callback);
                if (future != null)
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
            }
        }
        // we don't have an in-progress record batch, try to allocate a new batch
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        ByteBuffer buffer = free.allocate(size);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // re-check whether another producer thread put a batch into the deque in the meantime
            RecordBatch last = dq.peekLast();
            if (last != null) {
                FutureRecordMetadata future = last.tryAppend(key, value, callback);
                if (future != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    free.deallocate(buffer);
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }
            // still no in-progress batch for this partition:
            // wrap the buffer as writable MemoryRecords (with a Compressor) ...
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            // ... and pack it into a new RecordBatch
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
            dq.addLast(batch);
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}
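The heart of this method is the check / allocate / re-check pattern around the per-partition deque. Here is a simplified, self-contained sketch of that pattern (hypothetical classes, not Kafka's own) to make the locking structure easier to see:

import java.util.ArrayDeque;
import java.util.Deque;

public class DoubleCheckedAppendSketch {
    static class Batch {
        private final StringBuilder data = new StringBuilder();
        private final int capacity;
        Batch(int capacity) { this.capacity = capacity; }
        // Returns false when the batch has no room, mirroring tryAppend() returning null.
        boolean tryAppend(String record) {
            if (data.length() + record.length() > capacity) return false;
            data.append(record);
            return true;
        }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();

    public void append(String record) {
        synchronized (deque) {                 // first attempt: reuse the in-progress batch
            Batch last = deque.peekLast();
            if (last != null && last.tryAppend(record)) return;
        }
        Batch fresh = new Batch(1024);         // "allocate" outside the lock (cheap here,
                                               // a potentially blocking BufferPool.allocate() in Kafka)
        synchronized (deque) {                 // second check: another thread may have created
            Batch last = deque.peekLast();     // a batch while we were allocating
            if (last != null && last.tryAppend(record)) return;  // discard 'fresh' (deallocated in Kafka)
            fresh.tryAppend(record);
            deque.addLast(fresh);
        }
    }
}

Allocating outside the lock matters because the real free.allocate() may block waiting for buffer memory to be released; holding the deque lock across that wait would stall every other thread appending to the same partition.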
RecordBatch.tryAppend, which writes into the underlying MemoryRecords:

public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
    if (!this.records.hasRoomFor(key, value)) { // no room left in the underlying buffer
        return null;
    } else {
        this.records.append(0L, key, value); // append into MemoryRecords; the Compressor is applied here
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
In step 6, the Sender's run() method does the actual sending:
1. Check which record batches are ready to send; a batch is considered ready when any of the following holds (a simplified sketch of this test follows this list):

The record set is full
The record set has sat in the accumulator for at least lingerMs milliseconds
The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are immediately considered ready)
The accumulator has been closed

2. Drain the ready data from the accumulator.
3. Build produce requests from the drained batches.
4. Hand the requests to the NetworkClient's selector.
5. The client's selector sends the data over NIO.
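As a simplified sketch of the readiness test in step 1 (the parameters are hypothetical stand-ins, not fields of the real RecordAccumulator):

// Hypothetical stand-in for the readiness conditions; not the actual Kafka code.
static boolean sendable(boolean batchIsFull, long batchCreatedMs, long nowMs, long lingerMs,
                        boolean memoryExhausted, boolean accumulatorClosed) {
    boolean lingered = (nowMs - batchCreatedMs) >= lingerMs; // sat in the accumulator long enough
    return batchIsFull            // the record set is full
            || lingered           // linger.ms elapsed
            || memoryExhausted    // threads are blocked waiting for BufferPool memory
            || accumulatorClosed; // the accumulator has been closed
}

The actual run() implementation follows.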

public void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send (i.e. partitions whose leader is known)
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // if there are any partitions whose leaders are not known yet, force metadata update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();
    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }
    // create produce requests
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    // only for debugging/testing:
    // if (batches.size() >= 1) {
    //     System.out.println(batches.size());
    // }
    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now); // build the produce requests
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request);
    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    this.client.poll(pollTimeout, now); // NIO: actually transmit the data
}
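The batching behavior that run() relies on is driven mainly by linger.ms and batch.size. As an illustration, these could be added to the Properties object from the first sketch (the values are arbitrary, not recommendations):

props.put(ProducerConfig.LINGER_MS_CONFIG, 5);       // wait up to 5 ms for a batch to fill before it becomes "ready"
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  // target batch size in bytes per partition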
