xiaowei8782088 发表于 2019-1-31 11:18:39

Kafka源码深度解析-序列3 -Producer -Java NIO

@Override  
    public void poll(long timeout) throws IOException {
  
      。。。
  
      clear(); //清空各种状态
  
      if (hasStagedReceives())
  
            timeout = 0;
  
      long startSelect = time.nanoseconds();
  
      int readyKeys = select(timeout);//轮询
  
      long endSelect = time.nanoseconds();
  
      currentTimeNanos = endSelect;
  
      this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
  

  
      if (readyKeys > 0) {            Set keys = this.nioSelector.selectedKeys();
  
            Iterator iter = keys.iterator();
  
            while (iter.hasNext()) {
  
                SelectionKey key = iter.next();
  
                iter.remove();
  
                KafkaChannel channel = channel(key);
  

  
                // register all per-connection metrics at once
  
                sensors.maybeRegisterConnectionMetrics(channel.id());
  
                lruConnections.put(channel.id(), currentTimeNanos);
  

  
                try {
  
                  if (key.isConnectable()) {//有连接事件
  
                        channel.finishConnect();
  
                        this.connected.add(channel.id());
  
                        this.sensors.connectionCreated.record();
  
                  }
  

  
                  if (channel.isConnected() && !channel.ready())
  
                        channel.prepare(); //这个只有需要安全检查的SSL需求,普通的不加密的channel,prepare()为空实现
  

  
                  if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { //读就绪
  
                        NetworkReceive networkReceive;
  
                        while ((networkReceive = channel.read()) != null)
  
                            addToStagedReceives(channel, networkReceive); //实际的读动作
  
                  }
  

  
                  if (channel.ready() && key.isWritable()) {//写就绪
  
                        Send send = channel.write(); //实际的写动作
  
                        if (send != null) {
  
                            this.completedSends.add(send);
  
                            this.sensors.recordBytesSent(channel.id(), send.size());
  
                        }
  
                  }                  /* cancel any defunct sockets */
  
                  if (!key.isValid()) {
  
                        close(channel);
  
                        this.disconnected.add(channel.id());
  
                  }
  
                } catch (Exception e) {
  
                  String desc = channel.socketDescription();
  
                  if (e instanceof IOException)
  
                        log.debug("Connection with {} disconnected", desc, e);
  
                  else
  
                        log.warn("Unexpected error from {}; closing connection", desc, e);
  
                  close(channel);
  
                  this.disconnected.add(channel.id());
  
                }
  
            }
  
      }
  

  
      addToCompletedReceives();
  

  
      long endIo = time.nanoseconds();
  
      this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
  
      maybeCloseOldestConnection();
  
    }1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071


页: [1]
查看完整版本: Kafka源码深度解析-序列3 -Producer -Java NIO