Kafka源码深度解析-序列5 -Producer -RecordAccumulator队列分析
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {appendsInProgress.incrementAndGet(); try { if (closed) throw new IllegalStateException("Cannot send after the producer is closed.");
Deque dq = dequeFor(tp);//找到该topicPartiton对应的消息队列
synchronized (dq) {
RecordBatch last = dq.peekLast(); //拿出队列的最后1个元素
if (last != null) {
FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); //最后一个元素, 即RecordBatch不为空,把该Record加入该RecordBatch
if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
} int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock.
if (closed) throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast(); if (last != null) {
FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer); return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
} //队列里面没有RecordBatch,建一个新的,然后把Record放进去
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
} finally {
appendsInProgress.decrementAndGet();
}
} private Deque dequeFor(TopicPartition tp) {
Deque d = this.batches.get(tp); if (d != null) return d;
d = new ArrayDeque();
Deque previous = this.batches.putIfAbsent(tp, d); if (previous == null) return d; else
return previous;
}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
页:
[1]