快乐的老鼠 发表于 2019-1-31 11:14:50

Kafka源码深度解析-序列4 -Producer -network层核心原理

public void poll(long timeout) throws IOException {  
      if (timeout < 0)
  
            throw new IllegalArgumentException(&quot;timeout should be >= 0&quot;);
  
      clear();
  
      if (hasStagedReceives())
  
            timeout = 0;
  
      /* check ready keys */
  
      long startSelect = time.nanoseconds();
  
      int readyKeys = select(timeout);
  
      long endSelect = time.nanoseconds();
  
      currentTimeNanos = endSelect;
  
      this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
  

  
      if (readyKeys > 0) {            Set keys = this.nioSelector.selectedKeys();
  
            Iterator iter = keys.iterator();
  
            while (iter.hasNext()) {
  
                SelectionKey key = iter.next();
  
                iter.remove();
  
                KafkaChannel channel = channel(key);
  

  
                // register all per-connection metrics at once
  
                sensors.maybeRegisterConnectionMetrics(channel.id());
  
                lruConnections.put(channel.id(), currentTimeNanos);
  

  
                try {                  /* complete any connections that have finished their handshake */
  
                  if (key.isConnectable()) {
  
                        channel.finishConnect();
  
                        this.connected.add(channel.id());
  
                        this.sensors.connectionCreated.record();
  
                  }                  /* if channel is not ready finish prepare */
  
                  if (channel.isConnected() && !channel.ready())
  
                        channel.prepare();
  

  
                  /* if channel is ready read from any connections that have readable data */
  
                  if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
  
                        NetworkReceive networkReceive;
  
                        while ((networkReceive = channel.read()) != null)
  
                            addToStagedReceives(channel, networkReceive);
  
                  }                  /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
  
                  if (channel.ready() && key.isWritable()) {
  
                        Send send = channel.write();
  
                        if (send != null) {
  
                            this.completedSends.add(send);
  
                            this.sensors.recordBytesSent(channel.id(), send.size());
  
                        }
  
                  }
  

  
                  if (!key.isValid()) {   //手段2
  
                        close(channel);
  
                        this.disconnected.add(channel.id());
  
                  }
  
                } catch (Exception e) {//手段1:任何一个io函数,只要抛错,就认为连接断了
  
                  String desc = channel.socketDescription();
  
                  if (e instanceof IOException)
  
                        log.debug(&quot;Connection with {} disconnected&quot;, desc, e);
  
                  else
  
                        log.warn(&quot;Unexpected error from {}; closing connection&quot;, desc, e);
  
                  close(channel);
  
                  this.disconnected.add(channel.id());
  
                }
  
            }
  
      }
  

  
      addToCompletedReceives();
  

  
      long endIo = time.nanoseconds();
  
      this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
  
      maybeCloseOldestConnection();
  
    }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475


页: [1]
查看完整版本: Kafka源码深度解析-序列4 -Producer -network层核心原理