[ZooKeeper] Connection Loss, Watch Recovery, Heartbeats and Client-Side Timeouts

Posted on 2017-4-19 12:37:17
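The previous article analyzed how the server actively expires a session. This one looks at what happens when the network between client and server is temporarily interrupted.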
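1. Just as when the server actively closes the connection, the client throws an EndOfStreamException. At this point the client state is still CONNECTED.
2. SendThread handles the exception: it cleans up the connection and fails every outstanding request with the error code CONNECTIONLOSS.
3. It sends a Disconnected state notification.
4. It picks the next server and reconnects.
5. Once connected, it sends a ConnectRequest carrying the sessionId and password of the current session.
6. The server processes the request; the handling differs between leader and follower. Because the client retries quickly, the session has not timed out yet, so session validation succeeds on both the leader and the follower. If the session happens to expire at exactly this moment, validation fails, and the client throws a SessionExpiredException and shuts down.
7. The server returns a successful ConnectResponse.
8. The client receives the response and sends a SyncConnected state notification to the watcher.
9. The client sends a SetWatches packet to re-establish its watches: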

```java
// Automatic watch re-registration can be disabled through configuration
if (!disableAutoWatchReset) {
    // All watches the client currently holds
    List<String> dataWatches = zooKeeper.getDataWatches();
    List<String> existWatches = zooKeeper.getExistWatches();
    List<String> childWatches = zooKeeper.getChildWatches();
    if (!dataWatches.isEmpty()
            || !existWatches.isEmpty() || !childWatches.isEmpty()) {
        // Send the re-registration request ahead of everything else in the queue
        SetWatches sw = new SetWatches(lastZxid,
                prependChroot(dataWatches),
                prependChroot(existWatches),
                prependChroot(childWatches));
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.setWatches);
        h.setXid(-8);
        Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
        outgoingQueue.addFirst(packet);
    }
}
```
10. When a follower receives the setWatches request, it goes straight to FinalRequestProcessor; no proposal is needed:

```java
case OpCode.setWatches: {
    lastOp = "SETW";
    SetWatches setWatches = new SetWatches();
    // XXX We really should NOT need this!!!!
    request.request.rewind();
    ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
    long relativeZxid = setWatches.getRelativeZxid();
    // Register the watches
    zks.getZKDatabase().setWatches(relativeZxid,
            setWatches.getDataWatches(),
            setWatches.getExistWatches(),
            setWatches.getChildWatches(), cnxn);
    break;
}
```

```java
// While re-adding the watches, decide whether each one must fire immediately
// (relativeZxid is the lastZxid the client sent in SetWatches)
public void setWatches(long relativeZxid, List<String> dataWatches,
        List<String> existWatches, List<String> childWatches,
        Watcher watcher) {
    for (String path : dataWatches) {
        DataNode node = getNode(path);
        WatchedEvent e = null;
        if (node == null) {
            e = new WatchedEvent(EventType.NodeDeleted,
                    KeeperState.SyncConnected, path);
        } else if (node.stat.getCzxid() > relativeZxid) {
            e = new WatchedEvent(EventType.NodeCreated,
                    KeeperState.SyncConnected, path);
        } else if (node.stat.getMzxid() > relativeZxid) {
            e = new WatchedEvent(EventType.NodeDataChanged,
                    KeeperState.SyncConnected, path);
        }
        if (e != null) {
            watcher.process(e);
        } else {
            this.dataWatches.addWatch(path, watcher);
        }
    }
    for (String path : existWatches) {
        DataNode node = getNode(path);
        WatchedEvent e = null;
        if (node == null) {
            // This is the case when the watch was registered
        } else if (node.stat.getMzxid() > relativeZxid) {
            e = new WatchedEvent(EventType.NodeDataChanged,
                    KeeperState.SyncConnected, path);
        } else {
            e = new WatchedEvent(EventType.NodeCreated,
                    KeeperState.SyncConnected, path);
        }
        if (e != null) {
            watcher.process(e);
        } else {
            this.dataWatches.addWatch(path, watcher);
        }
    }
    for (String path : childWatches) {
        DataNode node = getNode(path);
        WatchedEvent e = null;
        if (node == null) {
            e = new WatchedEvent(EventType.NodeDeleted,
                    KeeperState.SyncConnected, path);
        } else if (node.stat.getPzxid() > relativeZxid) {
            e = new WatchedEvent(EventType.NodeChildrenChanged,
                    KeeperState.SyncConnected, path);
        }
        if (e != null) {
            watcher.process(e);
        } else {
            this.childWatches.addWatch(path, watcher);
        }
    }
}
```
11. On the leader, the request additionally passes through PrepRequestProcessor, which checks whether the session still exists.
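To experiment with the replay rule in `setWatches`, here is a minimal standalone model of its data-watch and child-watch branches (the exists-watch branch is left out). `NodeStat`, `WatchReplay` and the string results are hypothetical names for this sketch, not ZooKeeper API, and a plain `HashMap` stands in for the server's `DataTree`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the zxid fields of a node's Stat (not ZooKeeper API).
final class NodeStat {
    final long czxid; // zxid of the transaction that created the node
    final long mzxid; // zxid of the last data change
    final long pzxid; // zxid of the last change to the node's children

    NodeStat(long czxid, long mzxid, long pzxid) {
        this.czxid = czxid;
        this.mzxid = mzxid;
        this.pzxid = pzxid;
    }
}

// Sketch of the data/child branches of DataTree.setWatches: returns the event
// to fire immediately, or REREGISTER if nothing changed after relativeZxid.
final class WatchReplay {
    static final String REREGISTER = "REREGISTER";

    static String replayDataWatch(Map<String, NodeStat> tree, String path, long relativeZxid) {
        NodeStat node = tree.get(path);
        if (node == null) {
            return "NodeDeleted";         // node vanished while the client was away
        } else if (node.czxid > relativeZxid) {
            return "NodeCreated";         // node (re)created after the client's lastZxid
        } else if (node.mzxid > relativeZxid) {
            return "NodeDataChanged";     // data modified after the client's lastZxid
        }
        return REREGISTER;                // unchanged: just keep watching
    }

    static String replayChildWatch(Map<String, NodeStat> tree, String path, long relativeZxid) {
        NodeStat node = tree.get(path);
        if (node == null) {
            return "NodeDeleted";
        } else if (node.pzxid > relativeZxid) {
            return "NodeChildrenChanged"; // children changed after the client's lastZxid
        }
        return REREGISTER;
    }

    public static void main(String[] args) {
        Map<String, NodeStat> tree = new HashMap<>();
        tree.put("/a", new NodeStat(5, 5, 5));  // untouched since zxid 5
        tree.put("/b", new NodeStat(5, 12, 5)); // data modified at zxid 12
        tree.put("/c", new NodeStat(5, 5, 15)); // child added at zxid 15
        long lastZxid = 10;                     // the client last saw zxid 10
        System.out.println(replayDataWatch(tree, "/a", lastZxid));    // REREGISTER
        System.out.println(replayDataWatch(tree, "/b", lastZxid));    // NodeDataChanged
        System.out.println(replayDataWatch(tree, "/gone", lastZxid)); // NodeDeleted
        System.out.println(replayChildWatch(tree, "/c", lastZxid));   // NodeChildrenChanged
    }
}
```

Comparing against `relativeZxid` is what keeps a change made during the disconnect from being lost: it carries a larger zxid, so the event fires as soon as the watches are re-sent instead of the watch waiting silently for a later change.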
Now let's look at how the client times out a session on its own and how heartbeats work, starting from the main loop of SendThread:

```java
public void run() {
    clientCnxnSocket.introduce(this, sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    // Timeout for the selector's select(); recomputed on every iteration
    int to;
    long lastPingRwServer = System.currentTimeMillis();
    while (state.isAlive()) {
        try {
            ......
            // Once the session is established, to = read timeout minus read-idle time
            if (state.isConnected()) {
                ......
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }
            // If no packet has arrived from the server for a long time, the read-idle
            // time exceeds the read timeout and an exception is thrown right away
            if (to <= 0) {
                throw new SessionTimeoutException(
                        "Client session timed out, have not heard from server in "
                        + clientCnxnSocket.getIdleRecv() + "ms"
                        + " for sessionid 0x"
                        + Long.toHexString(sessionId));
            }
            // Once the session is established, send heartbeats
            if (state.isConnected()) {
                // Frequent writes keep the write-idle time short, so no ping is needed
                int timeToNextPing = readTimeout / 2
                        - clientCnxnSocket.getIdleSend();
                if (timeToNextPing <= 0) {
                    // Few writes: send a ping and record the send time
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    // Busy writing: no ping yet, but wake up in time for the next one
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }
            .....
            // Every doTransport updates now; lastHeard and lastSend change
            // only if something is actually read or written
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            ....
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }
        }
    }
    .....
}
```
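The timing arithmetic in this loop can be isolated into a pure function. The sketch below is a hypothetical model, not ZooKeeper code: `LoopTiming`, `Decision` and the local `SessionTimeoutException` are invented names, and the idle times are passed in instead of being read from `clientCnxnSocket`. The example values assume a 30 s session timeout and the 3.4-era rule that `readTimeout` is two thirds of the negotiated session timeout.

```java
// Hypothetical model of one SendThread loop iteration (not ZooKeeper API).
final class LoopTiming {
    // Outcome of one iteration: ping now or not, and how long select() may block.
    static final class Decision {
        final boolean sendPing;
        final int selectTimeoutMs;

        Decision(boolean sendPing, int selectTimeoutMs) {
            this.sendPing = sendPing;
            this.selectTimeoutMs = selectTimeoutMs;
        }
    }

    // Local stand-in for the client's SessionTimeoutException.
    static final class SessionTimeoutException extends RuntimeException {
        SessionTimeoutException(String msg) { super(msg); }
    }

    // idleRecvMs = now - lastHeard, idleSendMs = now - lastSend
    static Decision decide(boolean connected, int readTimeout, int connectTimeout,
                           int idleRecvMs, int idleSendMs) {
        int to = (connected ? readTimeout : connectTimeout) - idleRecvMs;
        if (to <= 0) {
            throw new SessionTimeoutException(
                    "have not heard from server in " + idleRecvMs + "ms");
        }
        boolean ping = false;
        if (connected) {
            int timeToNextPing = readTimeout / 2 - idleSendMs;
            if (timeToNextPing <= 0) {
                ping = true;          // write-idle for readTimeout/2: heartbeat now
            } else if (timeToNextPing < to) {
                to = timeToNextPing;  // wake up in time for the next heartbeat
            }
        }
        return new Decision(ping, to);
    }

    public static void main(String[] args) {
        int sessionTimeout = 30000;                // assumed negotiated session timeout
        int readTimeout = sessionTimeout * 2 / 3;  // 20000, assuming the 2/3 rule
        Decision d = decide(true, readTimeout, 10000, 4000, 3000);
        System.out.println(d.sendPing + " " + d.selectTimeoutMs); // false 7000
    }
}
```

With those numbers the client pings after 10 s without writing and gives up after 20 s without hearing anything from the server.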
The ping packet uses xid -2:

```java
private void sendPing() {
    lastPingSentNs = System.nanoTime();
    RequestHeader h = new RequestHeader(-2, OpCode.ping);
    queuePacket(h, null, null, null, null, null, null, null, null);
}
```
On the server, a ping arriving at a follower goes straight to FinalRequestProcessor:

```java
case OpCode.ping: {
    zks.serverStats().updateLatency(request.createTime);
    lastOp = "PING";
    cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
            request.createTime, System.currentTimeMillis());
    // The ping response also carries xid -2
    cnxn.sendResponse(new ReplyHeader(-2,
            zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
    return;
}
```
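On the leader, the ping additionally passes through PrepRequestProcessor, which checks whether the session still exists.
When the client receives the ping response, it does nothing beyond an optional debug log: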

```java
if (replyHdr.getXid() == -2) {
    // -2 is the xid for pings
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got ping response for sessionid: 0x"
                + Long.toHexString(sessionId)
                + " after "
                + ((System.nanoTime() - lastPingSentNs) / 1000000)
                + "ms");
    }
    return;
}
```
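Although the ping response is discarded, receiving it is what matters: the read updates `lastHeard`, which keeps `readTimeout - idleRecv` positive. The hypothetical simulation below steps a virtual clock in one-second increments and assumes, for simplicity, that a live server's reply arrives in the same step as the ping; `HeartbeatSim` and `Result` are illustrative names only.

```java
// Hypothetical time-stepped model of the client heartbeat (not ZooKeeper API).
final class HeartbeatSim {
    static final class Result {
        final int pings;
        final long timedOutAtMs; // -1 if the client never gave up

        Result(int pings, long timedOutAtMs) {
            this.pings = pings;
            this.timedOutAtMs = timedOutAtMs;
        }
    }

    // Assumes no application traffic: only pings are ever written.
    static Result run(int readTimeout, boolean serverAlive, long durationMs) {
        long lastSend = 0;
        long lastHeard = 0;
        int pings = 0;
        for (long now = 0; now <= durationMs; now += 1000) {
            if (readTimeout - (now - lastHeard) <= 0) {
                return new Result(pings, now); // client would throw SessionTimeoutException
            }
            if (readTimeout / 2 - (now - lastSend) <= 0) {
                pings++;                       // sendPing() with xid -2
                lastSend = now;
                if (serverAlive) {
                    lastHeard = now;           // reply content ignored, but lastHeard moves
                }
            }
        }
        return new Result(pings, -1);
    }

    public static void main(String[] args) {
        Result alive = run(20000, true, 60000);
        Result silent = run(20000, false, 60000);
        System.out.println(alive.pings + " " + alive.timedOutAtMs);   // 6 -1
        System.out.println(silent.pings + " " + silent.timedOutAtMs); // 1 20000
    }
}
```

With a responsive server the client pings every 10 s and never times out; with a silent server it sends a single ping at 10 s and gives up at 20 s, that is after `readTimeout`, presumably leaving the remaining third of the session timeout to reach another server before the session itself expires.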
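From the above we can see that:
1. A ping is sent only when the client has been write-idle (for readTimeout / 2).
2. Every doTransport call updates the current time, now.
3. lastHeard and lastSend are updated only when data is actually read or written.
4. A client-side session timeout is handled the same way as a connection loss (CONNECTIONLOSS): both lead to a reconnect attempt.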
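Point 4 can be pictured as a catch block that never looks at the exception type. In the sketch below, `FailureHandling` and its members are hypothetical, and the two nested exception classes merely stand in for ZooKeeper's `EndOfStreamException` and `SessionTimeoutException`; the real `cleanup()` does more work than shown here.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of SendThread's failure path (not ZooKeeper API).
final class FailureHandling {
    // Stand-ins for ZooKeeper's EndOfStreamException and SessionTimeoutException.
    static final class EndOfStream extends IOException {
        EndOfStream(String msg) { super(msg); }
    }

    static final class SessionTimeout extends IOException {
        SessionTimeout(String msg) { super(msg); }
    }

    enum Code { OK, CONNECTIONLOSS }

    static final class PendingRequest {
        final String path;
        Code result;                        // null until the request completes

        PendingRequest(String path) { this.path = path; }
    }

    final List<PendingRequest> pending = new ArrayList<>();
    final List<String> events = new ArrayList<>();
    int reconnects = 0;

    // Steps 2-4 of the reconnect sequence; `cause` is deliberately not inspected.
    void onTransportFailure(Throwable cause) {
        for (PendingRequest r : pending) {
            r.result = Code.CONNECTIONLOSS; // every in-flight request fails the same way
        }
        pending.clear();
        events.add("Disconnected");         // state notification for watchers
        reconnects++;                       // the next loop iteration tries another server
    }

    public static void main(String[] args) {
        Throwable[] causes = {
            new EndOfStream("server closed the socket"),
            new SessionTimeout("have not heard from server")
        };
        for (Throwable cause : causes) {
            FailureHandling client = new FailureHandling();
            PendingRequest req = new PendingRequest("/app/config"); // hypothetical path
            client.pending.add(req);
            client.onTransportFailure(cause);
            // Same line for both causes: CONNECTIONLOSS [Disconnected] 1
            System.out.println(req.result + " " + client.events + " " + client.reconnects);
        }
    }
}
```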
