十二12 发表于 2015-9-6 10:39:47

zookeeper 3.4.6启动流程粗略梳理

/**
 * Handles read/write IO on connection.
 *
 * <p>If the socket is not open, a warning is logged and nothing else
 * happens. When the key is readable, bytes are read into
 * {@code incomingBuffer}; once that buffer is full it is either decoded as
 * a length header or handed to {@code readPayload()}. When the key is
 * writable, queued buffers are staged into the factory's direct buffer,
 * written to the socket, and fully-sent buffers are dropped from the queue;
 * afterwards the {@code OP_WRITE} interest bit is updated while holding the
 * factory lock.
 *
 * <p>A {@code CancelledKeyException}, {@code CloseRequestException},
 * {@code EndOfStreamException} or any other {@code IOException} raised
 * along the way is handled here by calling {@code close()}.
 *
 * @param k the selection key whose readiness is being serviced
 * @throws InterruptedException not caught here; propagates to the caller
 */
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (!isSocketOpen()) {
            LOG.warn("trying to do i/o on a null socket for session:0x"
                    + Long.toHexString(sessionId));
            return;
        }

        if (k.isReadable()) {
            int bytesRead = sock.read(incomingBuffer);
            if (bytesRead < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely client has closed socket");
            }
            if (incomingBuffer.remaining() == 0) {
                // A full buffer that is not the length buffer is the
                // continuation of a request, i.e. always payload.
                boolean payloadFollows = true;
                if (incomingBuffer == lenBuffer) {
                    // Start of the next request: decode the length header.
                    incomingBuffer.flip();
                    payloadFollows = readLength(k);
                    incomingBuffer.clear();
                }
                if (!payloadFollows) {
                    // Four letter words take care of themselves;
                    // nothing else needs to be done.
                    return;
                }
                readPayload();
            }
        }

        if (k.isWritable()) {
            if (outgoingBuffers.size() > 0) {
                /*
                 * clear() resets the staging buffer's position to 0 and
                 * its limit to its capacity, so it can be filled with data
                 * from the non-direct buffers that need to be sent.
                 */
                ByteBuffer staging = factory.directBuffer;
                staging.clear();

                for (ByteBuffer queued : outgoingBuffers) {
                    ByteBuffer chunk = queued;
                    if (staging.remaining() < chunk.remaining()) {
                        /*
                         * put() copies nothing when the staging buffer is
                         * too small to hold everything, so an oversized
                         * source is sliced down to the space that is left.
                         */
                        chunk = (ByteBuffer) chunk.slice().limit(
                                staging.remaining());
                    }
                    /*
                     * put() advances the position of both buffers. The
                     * source position must stay untouched until the send
                     * is done, so it is saved and restored around the copy.
                     */
                    int savedPosition = chunk.position();
                    staging.put(chunk);
                    chunk.position(savedPosition);
                    if (staging.remaining() == 0) {
                        break;
                    }
                }

                // flip(): limit becomes position, position becomes 0 --
                // the staging buffer is now ready to be written out.
                staging.flip();

                int sent = sock.write(staging);

                // Drop the buffers that have been sent.
                while (outgoingBuffers.size() > 0) {
                    ByteBuffer head = outgoingBuffers.peek();
                    if (head == ServerCnxnFactory.closeConn) {
                        throw new CloseRequestException("close requested");
                    }
                    int unsent = head.remaining() - sent;
                    if (unsent > 0) {
                        // Only part of this buffer went out: advance its
                        // position past the sent bytes and stop.
                        head.position(head.position() + sent);
                        break;
                    }
                    packetSent();
                    // The whole buffer went out, so account for it and
                    // remove it from the queue.
                    sent -= head.remaining();
                    outgoingBuffers.remove();
                }
            }

            synchronized (this.factory) {
                if (outgoingBuffers.size() == 0) {
                    if (!initialized
                            && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
                        throw new CloseRequestException("responded to info probe");
                    }
                    // Nothing left to send: stop selecting for writes.
                    sk.interestOps(sk.interestOps()
                            & (~SelectionKey.OP_WRITE));
                } else {
                    // Data is still queued: keep selecting for writes.
                    sk.interestOps(sk.interestOps()
                            | SelectionKey.OP_WRITE);
                }
            }
        }
    } catch (CancelledKeyException e) {
        LOG.warn("Exception causing close of session 0x"
                + Long.toHexString(sessionId)
                + " due to " + e);
        if (LOG.isDebugEnabled()) {
            LOG.debug("CancelledKeyException stack trace", e);
        }
        close();
    } catch (CloseRequestException e) {
        // close() is expected to log the session closure.
        close();
    } catch (EndOfStreamException e) {
        // Tell the user why the connection is going away.
        LOG.warn("caught end of stream exception", e);
        // close() is expected to log the session closure.
        close();
    } catch (IOException e) {
        LOG.warn("Exception causing close of session 0x"
                + Long.toHexString(sessionId)
                + " due to " + e);
        if (LOG.isDebugEnabled()) {
            LOG.debug("IOException stack trace", e);
        }
        close();
    }
}
页: [1]
查看完整版本: zookeeper 3.4.6启动流程粗略梳理