设为首页 收藏本站
查看: 1003|回复: 0

[经验分享] [ZooKeeper]分布式Session创建

[复制链接]

尚未签到

发表于 2017-4-19 10:40:19 | 显示全部楼层 |阅读模式
  前面几篇zookeeper的文章简单分析了执行流程,接下来打算从横向来分析一下zk的一些特性,先从session开始。
这一篇http://iwinit.iteye.com/blog/1754611分析了单机情况下session建立,在集群环境下建立session不太一样,是一个proposal的过程,先假设集群由leader,followerA,followerB组成,我们的client去连followerA。follower和leader初始化之后,初始化的sessionTracker不一样,leader中是SessionTrackerImpl,follower中是LearnerSessionTracker,主要区别和类同点:
1.follower中不会启动超时检查线程,只是简单得记录了session信息,主要数据结构是

//session更新信息
HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();
//session超时信息
private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
  而leader会启动超时线程,而且数据结构也多一些

//session实体
HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
//同一个超时时间点的session,给超时线程用
HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();
//session超时信息
ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
  2.addSession时,leader会创建session实体,follower只是简单的记录了一下session信息
  3.sessionId初始化算法一样

//1字节server_id+当前时间的后5个字节+2字节0,保证全局唯一
public static long initializeNextSession(long id) {
long nextSid = 0;
nextSid = (System.currentTimeMillis() << 24) >> 8;
nextSid =  nextSid | (id <<56);
return nextSid;
}
  连下来client开始创建session
  1.客户端发送ConnectionRequest给followerA
2.followerA处理
  session超时时间协商

//处理session时间,minSessionTimeout为ticktime2倍,maxSessionTimeout为ticktime20倍
int sessionTimeout = connReq.getTimeOut();
byte passwd[] = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
// We don't want to receive any packets until we are sure that the
// session is setup
cnxn.disableRecv();
//客户端发送的sessionid,重试时不为0
long sessionId = connReq.getSessionId();
//客户端重试,则reopen,后文分析
if (sessionId != 0) {
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
}
//创建session
else {
LOG.info("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
createSession(cnxn, passwd, sessionTimeout);
}
  sessionid和密码初始化

long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
//sessionid递增
long sessionId = sessionTracker.createSession(timeout);
//随机密码
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
//4个字节的超时时间
ByteBuffer to = ByteBuffer.allocate(4);
to.putInt(timeout);
cnxn.setSessionId(sessionId);
//异步提交执行链
submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
return sessionId;
}
  提交请求

   private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
int xid, ByteBuffer bb, List<Id> authInfo) {
//初始化时,xid为0,bb为4个字节的session超时时间
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
submitRequest(si);
}
  根据之前http://iwinit.iteye.com/blog/1777109分析的Processor链图,FollowerRequestProcessor执行。和create请求一样,createSession是事务请求需要投票,FollowerA发送投票packet给leader。
  3.leader处理,learnerHandler中收到request请求,提交leader的processor链
  PrepRequestProcessor处理

//头信息
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
zks.getTime(), type);
....
case OpCode.createSession:
request.request.rewind();
int to = request.request.getInt();
//txn信息,就是一个session超时
request.txn = new CreateSessionTxn(to);
request.request.rewind();
//leader中创建session
zks.sessionTracker.addSession(request.sessionId, to);
//owner属于followerA
zks.setOwner(request.sessionId, request.getOwner());
break;
  之后ProposalRequestProcessor发起投票,并写入log
  4.followerA和followerB处理投票

public void logRequest(TxnHeader hdr, Record txn) {
Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
hdr.getType(), null, null);
request.hdr = hdr;
request.txn = txn;
request.zxid = hdr.getZxid();
//添加到pending队列
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
/写入log,并发送ack包给leader
syncProcessor.processRequest(request);
}
  5.leader收到投票,发起commit,然后自己commit
  6.leader的FinalRequestProcessor处理,添加session,不需要返回client数据

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
//createSession不需要修改db状态,啥都不做
rc = getZKDatabase().processTxn(hdr, txn);
//又添加了一次session
if (opCode == OpCode.createSession) {
if (txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addSession(sessionId, cst
.getTimeOut());
} else {
LOG.warn("*****>>>>> Got "
+ txn.getClass() + " "
+ txn.toString());
}
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
return rc;
}
  7.followerA和followerB处理commit,区别在于CommitRequestProcessor中,followerA中的Request会带上connection信息而followerB中的reqesut没有connection信息。所以在FinalRequestProcessor中,followerB创立完session就返回了,而followerA还需要写回client响应
  最后看下leader的sessionTracker超时机制,构造SessionTrackerImpl

public SessionTrackerImpl(SessionExpirer expirer,
ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
long sid)
{
super("SessionTracker");
this.expirer = expirer;
this.expirationInterval = tickTime;
this.sessionsWithTimeout = sessionsWithTimeout;
//下一个检查点,向上取整为expirationInterval倍数,expirationInterval就是配置的ticktime
nextExpirationTime = roundToInterval(System.currentTimeMillis());
this.nextSessionId = initializeNextSession(sid);
for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
addSession(e.getKey(), e.getValue());
}
}
  主线程循环

    synchronized public void run() {
try {
while (running) {
//下一个超时点没到,就等待
currentTime = System.currentTimeMillis();
if (nextExpirationTime > currentTime) {
this.wait(nextExpirationTime - currentTime);
continue;
}
//同一个时间点超时的session
SessionSet set;
set = sessionSets.remove(nextExpirationTime);
//超时session处理
if (set != null) {
for (SessionImpl s : set.sessions) {
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
//下一个check point
nextExpirationTime += expirationInterval;
}
} catch (InterruptedException e) {
LOG.error("Unexpected interruption", e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
  session更新时

    synchronized public boolean touchSession(long sessionId, int timeout) {
......
//session对象
SessionImpl s = sessionsById.get(sessionId);
// Return false, if the session doesn't exists or marked as closing
if (s == null || s.isClosing()) {
return false;
}
//这个session的下一个超时点,向上取整为ticktime倍数
long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
//时间点比老的时间还小,不更新
if (s.tickTime >= expireTime) {
// Nothing needs to be done
return true;
}
//先从老的超时set中remove掉,再添加到新的set中,超时线程会定时check
SessionSet set = sessionSets.get(s.tickTime);
if (set != null) {
set.sessions.remove(s);
}
//下个超时点
s.tickTime = expireTime;
//添加到新的超时set中
set = sessionSets.get(s.tickTime);
if (set == null) {
set = new SessionSet();
sessionSets.put(expireTime, set);
}
set.sessions.add(s);
return true;
}
  简单小节
1.session创建需要投票处理
2.结果是每台server上的内存中都会建立相同的session记录
3.sessionid通过serverid+时间保证唯一
4.session超时检查由leader负责,以ticktime定时检查
5.session更新时,会修改自己这个session所属的超时set,超时时间是ticktime倍数

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366274-1-1.html 上篇帖子: Zookeeper可以干什么,能带来什么 下篇帖子: zookeeper实战:ConfigServer代码样例
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表