设为首页 收藏本站
查看: 896|回复: 0

[经验分享] zookeeper 3.4.6启动流程粗略梳理

[复制链接]

尚未签到

发表于 2015-9-6 10:39:47 | 显示全部楼层 |阅读模式
/**  * Handles read/write IO on connection.
  */
  void doIO(SelectionKey k) throws InterruptedException {
  try {
  if (isSocketOpen() == false) {
  LOG.warn("trying to do i/o on a null socket for session:0x"
  + Long.toHexString(sessionId));
  
  return;
  }
  if (k.isReadable()) {
  int rc = sock.read(incomingBuffer);
  if (rc < 0) {
  throw new EndOfStreamException(
  "Unable to read additional data from client sessionid 0x"
  + Long.toHexString(sessionId)
  + ", likely client has closed socket");
  }
  if (incomingBuffer.remaining() == 0) {
  boolean isPayload;
  if (incomingBuffer == lenBuffer) { // start of next request
  incomingBuffer.flip();
  isPayload = readLength(k);
  incomingBuffer.clear();
  } else {
  // continuation
  isPayload = true;
  }
  if (isPayload) { // not the case for 4letterword
  readPayload();
  }
  else {
  // four letter words take care
  // need not do anything else
  return;
  }
  }
  }
  if (k.isWritable()) {
  // ZooLog.logTraceMessage(LOG,
  // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
  // "outgoingBuffers.size() = " +
  // outgoingBuffers.size());
  if (outgoingBuffers.size() > 0) {
  // ZooLog.logTraceMessage(LOG,
  // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
  // "sk " + k + " is valid: " +
  // k.isValid());
  
  /*
  * This is going to reset the buffer position to 0 and the

  * limit to the>  * with data from the non-direct buffers that we need to
  * send.
  */
  ByteBuffer directBuffer = factory.directBuffer;
  directBuffer.clear();
  
  for (ByteBuffer b : outgoingBuffers) {
  if (directBuffer.remaining() < b.remaining()) {
  /*
  * When we call put later, if the directBuffer is to
  * small to hold everything, nothing will be copied,
  * so we've got to slice the buffer if it's too big.
  */
  b = (ByteBuffer) b.slice().limit(
  directBuffer.remaining());
  }
  /*
  * put() is going to modify the positions of both
  * buffers, put we don't want to change the position of
  * the source buffers (we'll do that after the send, if
  * needed), so we save and reset the position after the
  * copy
  */
  int p = b.position();
  directBuffer.put(b);
  b.position(p);
  if (directBuffer.remaining() == 0) {
  break;
  }
  }
  /*
  * Do the flip: limit becomes position, position gets set to
  * 0. This sets us up for the write.
  */
  directBuffer.flip();
  
  int sent = sock.write(directBuffer);
  ByteBuffer bb;
  
  // Remove the buffers that we have sent
  while (outgoingBuffers.size() > 0) {
  bb = outgoingBuffers.peek();
  if (bb == ServerCnxnFactory.closeConn) {
  throw new CloseRequestException("close requested");
  }
  int left = bb.remaining() - sent;
  if (left > 0) {
  /*
  * We only partially sent this buffer, so we update
  * the position and exit the loop.
  */
  bb.position(bb.position() + sent);
  break;
  }
  packetSent();
  /* We've sent the whole buffer, so drop the buffer */
  sent -= bb.remaining();
  outgoingBuffers.remove();
  }
  // ZooLog.logTraceMessage(LOG,
  // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
  // outgoingBuffers.size() = " + outgoingBuffers.size());
  }
  
  synchronized(this.factory){
  if (outgoingBuffers.size() == 0) {
  if (!initialized
  && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
  throw new CloseRequestException("responded to info probe");
  }
  sk.interestOps(sk.interestOps()
  & (~SelectionKey.OP_WRITE));
  } else {
  sk.interestOps(sk.interestOps()
  | SelectionKey.OP_WRITE);
  }
  }
  }
  } catch (CancelledKeyException e) {
  LOG.warn("Exception causing close of session 0x"
  + Long.toHexString(sessionId)
  + " due to " + e);
  if (LOG.isDebugEnabled()) {
  LOG.debug("CancelledKeyException stack trace", e);
  }
  close();
  } catch (CloseRequestException e) {
  // expecting close to log session closure
  close();
  } catch (EndOfStreamException e) {
  LOG.warn("caught end of stream exception",e); // tell user why
  
  // expecting close to log session closure
  close();
  } catch (IOException e) {
  LOG.warn("Exception causing close of session 0x"
  + Long.toHexString(sessionId)
  + " due to " + e);
  if (LOG.isDebugEnabled()) {
  LOG.debug("IOException stack trace", e);
  }
  close();
  }
  }

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-110044-1-1.html 上篇帖子: Paxos与zookeeper 下篇帖子: HBase ZooKeeper安装
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表