设为首页 收藏本站
查看: 1146|回复: 0

[经验分享] [ZooKeeper]Client Session失效

[复制链接]

尚未签到

发表于 2017-4-19 10:08:17 | 显示全部楼层 |阅读模式
  前面一篇文章提到zookeeper server端主动发现session超时并清理session信息,关闭连接,接下来看看client端如何试图恢复session的。关于client端代码分析见前文http://iwinit.iteye.com/blog/1754611 。
由于session被清理,此时server端已经没有session信息了。而由于连接被关闭,client会抛出异常

if (sockKey.isReadable()) {
//返回-1
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
  SendThread处理异常

else {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
}
//连接被关闭了
else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else {
LOG.warn(
"Session 0x"
+ Long.toHexString(getSessionId())
+ " for server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", unexpected error"
+ RETRY_CONN_MSG, e);
}
//清理连接,失败当前请求
cleanup();
//此时state还是CONNECTED,发送DISCONNECTED状态通知
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
  接下来client重新寻找下一个server进行session恢复,此时client的sessionId和password仍然是上一次创建的session信息。

//此处返回true,因为连接已经被置为null了
if (!clientCnxnSocket.isConnected()) {
//重连时,先sleep一下
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
//重新开始连接
startConnect();
clientCnxnSocket.updateLastSendAndHeard();
}
  取下一个server

addr = hostProvider.next(1000);
  之后就和新建session时类似,区别是发送ConnectRequest时sessionid和password都是老的

//新建session成功时,seenRwServerBefore已经被置为true
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
  接下来server端处理

//此时sessionId不为0
long sessionId = connReq.getSessionId();
if (sessionId != 0) {
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
//先关闭老的连接,如果有的话,删除watch
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
} else {
LOG.info("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
createSession(cnxn, passwd, sessionTimeout);
}
  重新打开session(reopenSession)

    public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
int sessionTimeout) throws IOException {
//检查密码,如果不一样,则结束session,返回client一个为0的sessionid。如果sessionid为0,则为false
if (!checkPasswd(sessionId, passwd)) {
finishSessionInit(cnxn, false);
}
//密码正确,再校验下session是否还有效,这里不同的server处理不一样
else {
revalidateSession(cnxn, sessionId, sessionTimeout);
}
}
  重试的server如果是leader

    @Override
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
//父类中通过sessionTrack检查
super.revalidateSession(cnxn, sessionId, sessionTimeout);
try {
// setowner as the leader itself, unless updated
// via the follower handlers
setOwner(sessionId, ServerCnxn.me);
} catch (SessionExpiredException e) {
// this is ok, it just means that the session revalidation failed.
}
}
}
  父类revalidateSession方法

    protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(sessionId) +
" is valid: " + rc);
}
finishSessionInit(cnxn, rc);
}
  leader的SessionTracker为SessionTrackerImpl,touchSession方法如下

synchronized public boolean touchSession(long sessionId, int timeout) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.CLIENT_PING_TRACE_MASK,
"SessionTrackerImpl --- Touch session: 0x"
+ Long.toHexString(sessionId) + " with timeout " + timeout);
}
//因为session超时,session已经被删掉了,此处返回null,所以检查结果是false
SessionImpl s = sessionsById.get(sessionId);
// Return false, if the session doesn't exists or marked as closing
if (s == null || s.isClosing()) {
return false;
}
.....
  对于leader来说session检查的结果是false。
  如果是follower,其校验方法

/**
 * Follower-side revalidation: delegates to validateSession() on the object
 * returned by getLearner() instead of deciding locally.
 *
 * @param cnxn           connection the client is reconnecting on
 * @param sessionId      id of the session being revalidated
 * @param sessionTimeout session timeout requested by the client
 * @throws IOException declared by the delegated call
 */
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
// The leader has to be asked whether the session is still valid.
getLearner().validateSession(cnxn, sessionId, sessionTimeout);
}
  follower询问session是否有效

     */
/**
 * Builds a REVALIDATE quorum packet for the given session and writes it out,
 * remembering the client connection so the reply can be matched to it later.
 *
 * @param cnxn     connection of the client whose session is being checked
 * @param clientId session id to validate
 * @param timeout  session timeout requested by the client
 * @throws IOException if serializing the payload or writing the packet fails
 */
void validateSession(ServerCnxn cnxn, long clientId, int timeout)
        throws IOException {
    LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));

    // Payload layout: session id (long) followed by the timeout (int).
    ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
    DataOutputStream payloadOut = new DataOutputStream(payloadBytes);
    payloadOut.writeLong(clientId);
    payloadOut.writeInt(timeout);
    payloadOut.close();
    byte[] payload = payloadBytes.toByteArray();

    // A REVALIDATE packet is used to check whether the session is still valid.
    QuorumPacket revalidateRequest =
            new QuorumPacket(Leader.REVALIDATE, -1, payload, null);

    // Register the connection before the packet goes out.
    pendingRevalidations.put(clientId, cnxn);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                "To validate session 0x" + Long.toHexString(clientId));
    }
    writePacket(revalidateRequest, true);
}
  leader端处理REVALIDATE包

case Leader.REVALIDATE:
bis = new ByteArrayInputStream(qp.getData());
dis = new DataInputStream(bis);
long id = dis.readLong();
int to = dis.readInt();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeLong(id);
//这里由于session已经被删掉,返回false
boolean valid = leader.zk.touch(id, to);
if (valid) {
try {
//set the session owner
// as the follower that
// owns the session
leader.zk.setOwner(id, this);
} catch (SessionExpiredException e) {
LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
}
}
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(id)
+ " is valid: "+ valid);
}
//结果是false
dos.writeBoolean(valid);
qp.setData(bos.toByteArray());
queuedPackets.add(qp);
break;
  follower处理返回结果

case Leader.REVALIDATE:
revalidate(qp);
break;
/**
 * Processes a REVALIDATE reply: reads the session id and validity flag from
 * the packet data, looks up the connection that was waiting on that session
 * and completes its handshake with the reported result.
 *
 * @param qp reply packet whose data holds a long session id followed by a
 *           boolean validity flag
 * @throws IOException if the packet data cannot be read
 */
protected void revalidate(QuorumPacket qp) throws IOException {
    DataInputStream replyIn =
            new DataInputStream(new ByteArrayInputStream(qp.getData()));
    long sessionId = replyIn.readLong();
    boolean valid = replyIn.readBoolean();

    // Removing the entry both fetches and clears the pending revalidation.
    ServerCnxn waitingCnxn = pendingRevalidations.remove(sessionId);
    if (waitingCnxn != null) {
        zk.finishSessionInit(waitingCnxn, valid);
    } else {
        LOG.warn("Missing session 0x"
                + Long.toHexString(sessionId)
                + " for validation");
    }

    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                "Session 0x" + Long.toHexString(sessionId)
                        + " is valid: " + valid);
    }
}
  可以看到无论是leader还是follower最后都会调用zk.finishSessionInit(cnxn, valid)处理,而由于session已经失效,所以valid为false

public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
......
try {
//由于valid是false,所以返回给client的sessionid为0,password为空
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
: 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");
rsp.serialize(bos, "connect");
if (!cnxn.isOldClient) {
bos.writeBool(
this instanceof ReadOnlyZooKeeperServer, "readOnly");
}
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
cnxn.sendBuffer(bb);   
......
}
  client端处理ConnectResponse

void readConnectResult() throws IOException {
......
//被server重置为0了
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);
}

/*
 * Client-side handling of the server's connect response (excerpt: the
 * success path is elided in this listing).
 *
 * When the negotiated timeout is not positive the session is treated as
 * expired: the state becomes CLOSED, an Expired event and the event-of-death
 * marker are queued, and SessionExpiredException is thrown.
 */
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
// For an invalid session the server replies with timeout 0 (together with
// session id 0), so it is the non-positive negotiated timeout, not the
// session id itself, that triggers SessionExpiredException here.
if (negotiatedSessionTimeout <= 0) {
// Set state to CLOSED; this is what makes SendThread shut down.
state = States.CLOSED;
// Notify watchers of the Expired state.
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
// This is what makes EventThread shut down.
eventThread.queueEventOfDeath();
// Throw the exception.
throw new SessionExpiredException(
"Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired");
}
......
}
  SendThread处理SessionExpiredException,关闭SendThread

while (state.isAlive()) {
......
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
}
......
//关闭连接,失败所有请求
cleanup();
//此时state已经被置为CLOSED,SendThread将退出
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
//清理
cleanup();
//关闭selector
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exitedloop.");
  清理动作

/**
 * Tears down the connection and fails every request that is still queued,
 * both those already sent and awaiting a reply and those not yet sent.
 */
private void cleanup() {
    // Close the socket.
    clientCnxnSocket.cleanup();

    // Requests that were sent and are waiting for a response. The error code
    // is chosen by conLossPacket from the current state (SESSIONEXPIRED when
    // the state is CLOSED).
    synchronized (pendingQueue) {
        for (Packet awaitingReply : pendingQueue) {
            conLossPacket(awaitingReply);
        }
        pendingQueue.clear();
    }

    // Requests that were still waiting to be sent; same error-code rule.
    synchronized (outgoingQueue) {
        for (Packet unsent : outgoingQueue) {
            conLossPacket(unsent);
        }
        outgoingQueue.clear();
    }
}

/**
 * Fails a single packet with an error code derived from the current client
 * state, then finishes it. Packets without a reply header are left alone.
 *
 * @param p packet to fail
 */
private void conLossPacket(Packet p) {
    if (p.replyHeader != null) {
        final int errorCode;
        switch (state) {
        case AUTH_FAILED:
            errorCode = KeeperException.Code.AUTHFAILED.intValue();
            break;
        case CLOSED:
            // A closed client reports SESSIONEXPIRED.
            errorCode = KeeperException.Code.SESSIONEXPIRED.intValue();
            break;
        default:
            // Every other state reports CONNECTIONLOSS.
            errorCode = KeeperException.Code.CONNECTIONLOSS.intValue();
        }
        p.replyHeader.setErr(errorCode);
        finishPacket(p);
    }
}
  EventThread关闭

/*
 * Event-thread main loop: takes events off waitingEvents and processes them
 * until the kill marker (eventOfDeath) has been seen and the queue is empty.
 */
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
// The kill signal only sets a flag; it is not dispatched as an event.
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
// Exit only after every queued notification has been delivered: once
// killed, keep looping until the queue is observed empty.
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down");
}
  之后client将不可用,所有请求在发送的时候都会收到SESSIONEXPIRED异常码,因为queuePacket的时候有一个判断

/**
 * Wraps a request in a Packet and either queues it for sending or, when the
 * client is no longer alive or is closing, fails it immediately.
 *
 * @param h                 request header
 * @param r                 reply header to be filled in
 * @param request           request record
 * @param response          response record to be filled in
 * @param cb                asynchronous callback, stored on the packet
 * @param clientPath        path as seen by the client, stored on the packet
 * @param serverPath        path as seen by the server, stored on the packet
 * @param ctx               caller context, stored on the packet
 * @param watchRegistration watch registration passed to the packet
 * @return the packet that was created
 */
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration)
{
    final Packet queued;
    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        queued = new Packet(h, r, request, response, watchRegistration);
        queued.cb = cb;
        queued.ctx = ctx;
        queued.clientPath = clientPath;
        queued.serverPath = serverPath;
        if (state.isAlive() && !closing) {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(queued);
        } else {
            // Client is dead or closing (after session expiry the state is
            // CLOSED): fail the packet right away instead of queueing it.
            conLossPacket(queued);
        }
    }
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return queued;
}
  从以上分析可知,SESSIONEXPIRED异常码是比较严重的事件,之后这个zookeeper实例不可用了,如果需要恢复,则需要重新创建zookeeper实例。而CONNECTIONLOSS异常码是比较常见的,比如网络暂时中断的时候,这个状态码下zookeeper会自动重连恢复,因为server端还保留着session信息。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366234-1-1.html 上篇帖子: Zookeeper Leader选举工具类 下篇帖子: 3)用zookeeper实现分布式锁
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表