sonyet 发表于 2019-1-31 11:20:36

Kafka源码深度解析-序列5 -Producer -RecordAccumulator队列分析

public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {  
      appendsInProgress.incrementAndGet();      try {            if (closed)                throw new IllegalStateException("Cannot send after the producer is closed.");
  
            Deque dq = dequeFor(tp);//找到该topicPartiton对应的消息队列
  
            synchronized (dq) {
  
                RecordBatch last = dq.peekLast(); //拿出队列的最后1个元素
  
                if (last != null) {
  
                  FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); //最后一个元素, 即RecordBatch不为空,把该Record加入该RecordBatch
  
                  if (future != null)                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
  
                }
  
            }            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
  
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
  
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
  
            synchronized (dq) {                // Need to check if producer is closed again after grabbing the dequeue lock.
  
                if (closed)                  throw new IllegalStateException("Cannot send after the producer is closed.");
  
                RecordBatch last = dq.peekLast();                if (last != null) {
  
                  FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());                  if (future != null) {                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
  
                        free.deallocate(buffer);                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
  
                  }
  
                }                //队列里面没有RecordBatch,建一个新的,然后把Record放进去
  
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
  
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
  
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));
  

  
                dq.addLast(batch);
  
                incomplete.add(batch);                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
  
            }
  
      } finally {
  
            appendsInProgress.decrementAndGet();
  
      }
  
    }    private Deque dequeFor(TopicPartition tp) {
  
      Deque d = this.batches.get(tp);      if (d != null)            return d;
  
      d = new ArrayDeque();
  
      Deque previous = this.batches.putIfAbsent(tp, d);      if (previous == null)            return d;      else
  
            return previous;
  
    }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657


页: [1]
查看完整版本: Kafka源码深度解析-序列5 -Producer -RecordAccumulator队列分析