public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
appendsInProgress.incrementAndGet(); try { if (closed) throw new IllegalStateException("Cannot send after the producer is closed.");
synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock.
if (closed) throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast(); if (last != null) {
FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer); return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
} //队列里面没有RecordBatch,建一个新的,然后把Record放进去
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());