The previous few ZooKeeper posts briefly analyzed the execution flow. Next I plan to look at some of ZK's features one at a time, starting with sessions.
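The post http://iwinit.iteye.com/blog/1754611 analyzed session creation on a standalone server. Creating a session in a cluster is different: it goes through a proposal. Assume the cluster consists of a leader, followerA and followerB, and that our client connects to followerA. Once the follower and the leader are initialized, they use different sessionTrackers: the leader uses SessionTrackerImpl and the follower uses LearnerSessionTracker. The main differences and similarities are: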
1. The follower does not start an expiration-check thread. It only records session information, and its main data structures are:
// session touch records
HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();
// session timeouts
private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
The leader, on the other hand, starts an expiration thread and keeps more data structures:
// session objects
HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
// sessions grouped by the same expiration time, used by the expiration thread
HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();
// session timeouts
ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
2. On addSession, the leader creates a session object, while the follower only records the session information.
3. Both use the same sessionId initialization algorithm:
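// 1 byte server_id + the low 5 bytes of the current time + 2 zero bytes, meant to be globally unique
public static long initializeNextSession(long id) {
    long nextSid = 0;
    // ">>" is a signed shift: once bit 39 of the timestamp is set, it sign-extends and
    // the top byte becomes 0xFF whatever id is (later releases use ">>>" here)
    nextSid = (System.currentTimeMillis() << 24) >> 8;
    nextSid = nextSid | (id << 56);
    return nextSid;
}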
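The sketch below makes the bit layout concrete. `SessionIdLayout` and its methods are hypothetical helpers, not ZooKeeper code. The timestamp is passed in so the result is reproducible. The sketch also contrasts the signed `>>` quoted above with an unsigned `>>>`. Bit 39 of the millisecond timestamp is first set at 2^40 + 2^39 = 1,649,267,441,664 ms, around April 2022. From then on, the signed shift fills the top byte with ones and the server id is lost.

```java
// Hypothetical helper illustrating the sessionId layout; not ZooKeeper code.
class SessionIdLayout {
    // Unsigned-shift version: the low 40 bits of the time land in bits 55..16,
    // the server id in bits 63..56, and bits 15..0 stay zero.
    static long initialSessionId(long serverId, long nowMillis) {
        long nextSid = (nowMillis << 24) >>> 8;
        return nextSid | (serverId << 56);
    }

    // Same arithmetic as the quoted code, with its signed ">>" shift.
    static long initialSessionIdSigned(long serverId, long nowMillis) {
        long nextSid = (nowMillis << 24) >> 8;
        return nextSid | (serverId << 56);
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_000L; // a fixed timestamp (November 2023)
        long ok = initialSessionId(1, now);
        long bad = initialSessionIdSigned(1, now);
        System.out.println("unsigned: 0x" + Long.toHexString(ok) + ", top byte " + (ok >>> 56));
        System.out.println("signed:   0x" + Long.toHexString(bad) + ", top byte " + (bad >>> 56));
    }
}
```

With the 2023 timestamp, the unsigned version keeps server id 1 in the top byte, while the signed version ends up with 255.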
Next, the client starts creating a session.
1. The client sends a ConnectRequest to followerA.
2. followerA processes it.
Session timeout negotiation:
// clamp the session timeout; by default minSessionTimeout is 2x tickTime and maxSessionTimeout is 20x tickTime
int sessionTimeout = connReq.getTimeOut();
byte passwd[] = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
    sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
    sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
// We don't want to receive any packets until we are sure that the
// session is setup
cnxn.disableRecv();
// sessionId sent by the client; non-zero when the client is reconnecting
long sessionId = connReq.getSessionId();
// the client is reconnecting: reopen the existing session (covered in a later post)
if (sessionId != 0) {
    long clientSessionId = connReq.getSessionId();
    LOG.info("Client attempting to renew session 0x"
            + Long.toHexString(clientSessionId)
            + " at " + cnxn.getRemoteSocketAddress());
    serverCnxnFactory.closeSession(sessionId);
    cnxn.setSessionId(sessionId);
    reopenSession(cnxn, sessionId, passwd, sessionTimeout);
}
// otherwise create a new session
else {
    LOG.info("Client attempting to establish new session at "
            + cnxn.getRemoteSocketAddress());
    createSession(cnxn, passwd, sessionTimeout);
}
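The clamping above reduces to a single min/max. The sketch below assumes the default bounds stated in the comment, 2 and 20 times tickTime. `TimeoutNegotiation.negotiate` is a hypothetical name. On a real server the bounds come from configuration.

```java
// Hypothetical helper mirroring the clamping logic above; not ZooKeeper's API.
class TimeoutNegotiation {
    // Assumes the default bounds: min = 2 * tickTime, max = 20 * tickTime.
    static int negotiate(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        // same effect as "raise to min, then cap at max" when min <= max
        return Math.max(min, Math.min(requestedMs, max));
    }

    public static void main(String[] args) {
        int tick = 2000;
        System.out.println(negotiate(1000, tick));  // below min -> 4000
        System.out.println(negotiate(30000, tick)); // within range -> 30000
        System.out.println(negotiate(60000, tick)); // above max -> 40000
    }
}
```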
Initializing the sessionId and password:
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    // the tracker hands out sessionIds by incrementing nextSessionId
    long sessionId = sessionTracker.createSession(timeout);
    // password filled from a Random seeded with sessionId ^ superSecret
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    // the timeout, encoded as 4 bytes
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    // submit asynchronously to the request processor chain
    submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
    return sessionId;
}
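The password comes from a `Random` seeded only with `sessionId ^ superSecret`. The same sessionId therefore always yields the same bytes, so a server can regenerate the password and compare it instead of storing it. As far as I can tell, this is how the reconnect path checks it. The sketch uses a placeholder secret and assumes a 16-byte password. `SessionPassword` is hypothetical.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical helper; the secret below is a placeholder, not ZooKeeper's constant.
class SessionPassword {
    static final long SUPER_SECRET = 0x1234ABCDL; // placeholder value
    static final int PASSWD_LEN = 16;             // assumed password length

    // Same idea as createSession: fill the bytes from a Random seeded with sessionId ^ secret.
    static byte[] generate(long sessionId) {
        byte[] passwd = new byte[PASSWD_LEN];
        new Random(sessionId ^ SUPER_SECRET).nextBytes(passwd);
        return passwd;
    }

    // Verification just regenerates and compares; nothing needs to be stored.
    static boolean check(long sessionId, byte[] passwd) {
        return Arrays.equals(passwd, generate(sessionId));
    }

    public static void main(String[] args) {
        long sessionId = 0x0100_0000_0000_0001L;
        byte[] pw = generate(sessionId);
        System.out.println(check(sessionId, pw));     // true
        System.out.println(check(sessionId + 1, pw)); // false (different seed)
    }
}
```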
Submitting the request:
private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
        int xid, ByteBuffer bb, List<Id> authInfo) {
    // for createSession, xid is 0 and bb holds the 4-byte session timeout
    Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
    submitRequest(si);
}
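Following the processor-chain diagram analyzed in http://iwinit.iteye.com/blog/1777109, FollowerRequestProcessor handles the request next. Like a create request, createSession is a transactional request that must be voted on, so followerA forwards the request packet to the leader, which puts it to a vote.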
3. The leader processes it: LearnerHandler receives the request and submits it to the leader's processor chain.
PrepRequestProcessor:
// transaction header
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
        zks.getTime(), type);
....
case OpCode.createSession:
    request.request.rewind();
    int to = request.request.getInt();
    // the txn body is just the session timeout
    request.txn = new CreateSessionTxn(to);
    request.request.rewind();
    // create the session in the leader's tracker
    zks.sessionTracker.addSession(request.sessionId, to);
    // the owner is followerA, the server the client is connected to
    zks.setOwner(request.sessionId, request.getOwner());
    break;
ProposalRequestProcessor then sends the proposal out for a vote, and the transaction is written to the leader's log.
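The two `rewind()` calls in PrepRequestProcessor matter because `createSession` leaves the buffer's position at 4 after `putInt`. Reading without rewinding would throw `BufferUnderflowException`. Below is a small sketch of that round trip. `TimeoutPayload` is a hypothetical name.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

// Hypothetical helper mirroring how the timeout travels from createSession to PrepRequestProcessor.
class TimeoutPayload {
    // As in createSession: allocate 4 bytes and write the timeout; position is now 4.
    static ByteBuffer encode(int timeoutMs) {
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeoutMs);
        return to;
    }

    // As in PrepRequestProcessor: rewind, read the int, rewind again for later readers.
    static int decode(ByteBuffer buf) {
        buf.rewind();
        int to = buf.getInt();
        buf.rewind();
        return to;
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode(30000);
        try {
            buf.getInt(); // position is 4, no bytes remain
        } catch (BufferUnderflowException e) {
            System.out.println("reading without rewind fails");
        }
        System.out.println(decode(buf)); // 30000
    }
}
```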
4. followerA and followerB handle the proposal:
public void logRequest(TxnHeader hdr, Record txn) {
    Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
            hdr.getType(), null, null);
    request.hdr = hdr;
    request.txn = txn;
    request.zxid = hdr.getZxid();
    // add to the pending queue (only when the low 32 bits of the zxid are non-zero)
    if ((request.zxid & 0xffffffffL) != 0) {
        pendingTxns.add(request);
    }
    // write to the log, then send an ACK to the leader
    syncProcessor.processRequest(request);
}
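The filter `(request.zxid & 0xffffffffL) != 0` relies on the zxid layout. The high 32 bits are the leader epoch, and the low 32 bits are a counter within that epoch. Below is a sketch of the split. The `Zxid` helper is hypothetical.

```java
// Hypothetical helper showing how a zxid splits into epoch and counter.
class Zxid {
    static long make(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epoch(long zxid) {
        return zxid >>> 32;
    }

    static long counter(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long z = make(5, 7);
        System.out.println(Long.toHexString(z));         // 500000007
        System.out.println(epoch(z) + " " + counter(z)); // 5 7
        // logRequest's check skips a zxid whose counter part is 0
        System.out.println(counter(make(5, 0)) != 0);    // false
    }
}
```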
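5. After receiving ACKs from a quorum, the leader sends COMMIT to the followers and then commits locally.
6. The leader's FinalRequestProcessor adds the session. No response is needed, because the client is not connected to the leader.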
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    ProcessTxnResult rc;
    int opCode = hdr.getType();
    long sessionId = hdr.getClientId();
    // createSession does not modify the data tree, so nothing happens here
    rc = getZKDatabase().processTxn(hdr, txn);
    // the session is added again (on the leader, PrepRequestProcessor already added it)
    if (opCode == OpCode.createSession) {
        if (txn instanceof CreateSessionTxn) {
            CreateSessionTxn cst = (CreateSessionTxn) txn;
            sessionTracker.addSession(sessionId, cst
                    .getTimeOut());
        } else {
            LOG.warn("*****>>>>> Got "
                    + txn.getClass() + " "
                    + txn.toString());
        }
    } else if (opCode == OpCode.closeSession) {
        sessionTracker.removeSession(sessionId);
    }
    return rc;
}
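7. followerA and followerB handle the commit. The difference lies in CommitRequestProcessor: the Request on followerA carries the connection, while the request on followerB has none. So in FinalRequestProcessor, followerB is done once the session is created, while followerA must also write the response back to the client.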
Finally, let's look at the expiration mechanism in the leader's sessionTracker, starting with the SessionTrackerImpl constructor:
public SessionTrackerImpl(SessionExpirer expirer,
        ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
        long sid)
{
    super("SessionTracker");
    this.expirer = expirer;
    this.expirationInterval = tickTime;
    this.sessionsWithTimeout = sessionsWithTimeout;
    // next check point, rounded up to a multiple of expirationInterval (the configured tickTime)
    nextExpirationTime = roundToInterval(System.currentTimeMillis());
    this.nextSessionId = initializeNextSession(sid);
    for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
        addSession(e.getKey(), e.getValue());
    }
}
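`roundToInterval` itself is not shown above. Below is a sketch of the rounding described in the comment. It assumes the function always moves to the next multiple strictly after `time`, so an exact multiple still advances by one interval, which gives up to one tick of grace. The class name and the two-argument signature are assumptions.

```java
// Hypothetical restatement of the rounding described above; interval = tickTime in ms.
class ExpiryRounding {
    // Next multiple of interval strictly after time (time and interval assumed positive).
    static long roundToInterval(long time, long interval) {
        return (time / interval + 1) * interval;
    }

    public static void main(String[] args) {
        long tick = 2000;
        System.out.println(roundToInterval(4500, tick)); // 6000
        System.out.println(roundToInterval(4000, tick)); // 6000: an exact multiple still moves on
    }
}
```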
The tracker thread's main loop (run):
synchronized public void run() {
    try {
        while (running) {
            // wait until the next expiration point is reached
            currentTime = System.currentTimeMillis();
            if (nextExpirationTime > currentTime) {
                this.wait(nextExpirationTime - currentTime);
                continue;
            }
            // sessions that expire at this same point
            SessionSet set;
            set = sessionSets.remove(nextExpirationTime);
            // expire them
            if (set != null) {
                for (SessionImpl s : set.sessions) {
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }
            }
            // advance to the next check point
            nextExpirationTime += expirationInterval;
        }
    } catch (InterruptedException e) {
        LOG.error("Unexpected interruption", e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}
When a session is touched (refreshed):
synchronized public boolean touchSession(long sessionId, int timeout) {
    ......
    // the session object
    SessionImpl s = sessionsById.get(sessionId);
    // Return false, if the session doesn't exists or marked as closing
    if (s == null || s.isClosing()) {
        return false;
    }
    // this session's next expiration point, rounded up to a multiple of tickTime
    long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
    // the new point is not later than the current one: nothing to update
    if (s.tickTime >= expireTime) {
        // Nothing needs to be done
        return true;
    }
    // remove the session from its old expiration set, then add it to the new one;
    // the expiration thread checks these sets on every tick
    SessionSet set = sessionSets.get(s.tickTime);
    if (set != null) {
        set.sessions.remove(s);
    }
    // the new expiration point
    s.tickTime = expireTime;
    // add it to the new expiration set
    set = sessionSets.get(s.tickTime);
    if (set == null) {
        set = new SessionSet();
        sessionSets.put(expireTime, set);
    }
    set.sessions.add(s);
    return true;
}
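The run loop and `touchSession` together form a bucketed expiration scheme. Sessions are grouped by their rounded expiration time. Touching a session moves it to a later bucket, and the checker only looks at one bucket per tick. Below is a simplified, single-threaded model of that idea. It is not ZooKeeper's code: there is no thread, no `wait` and no closing flag, and the current time is passed in. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, single-threaded model of bucketed session expiration (hypothetical; not ZooKeeper code).
class BucketedExpiry {
    private final long interval;                                     // plays the role of tickTime
    private final Map<Long, Long> expiryBySession = new HashMap<>(); // sessionId -> bucket time
    private final Map<Long, Set<Long>> buckets = new HashMap<>();    // bucket time -> sessionIds
    private long nextExpirationTime;

    BucketedExpiry(long interval, long now) {
        this.interval = interval;
        this.nextExpirationTime = roundToInterval(now);
    }

    private long roundToInterval(long time) {
        return (time / interval + 1) * interval;
    }

    // Like touchSession: move the session to a later bucket, never an earlier one.
    void touch(long sessionId, int timeoutMs, long now) {
        long expireTime = roundToInterval(now + timeoutMs);
        Long old = expiryBySession.get(sessionId);
        if (old != null && old >= expireTime) {
            return; // already expires at or after the new point
        }
        if (old != null) {
            Set<Long> oldSet = buckets.get(old);
            if (oldSet != null) {
                oldSet.remove(sessionId);
            }
        }
        expiryBySession.put(sessionId, expireTime);
        buckets.computeIfAbsent(expireTime, k -> new HashSet<>()).add(sessionId);
    }

    // Like the run() loop without waiting: expire every bucket whose time has been reached.
    List<Long> expireUpTo(long now) {
        List<Long> expired = new ArrayList<>();
        while (nextExpirationTime <= now) {
            Set<Long> set = buckets.remove(nextExpirationTime);
            if (set != null) {
                for (long id : set) {
                    expiryBySession.remove(id);
                    expired.add(id);
                }
            }
            nextExpirationTime += interval;
        }
        return expired;
    }

    public static void main(String[] args) {
        BucketedExpiry tracker = new BucketedExpiry(2000, 0);
        tracker.touch(1, 4000, 0);    // bucket 6000
        tracker.touch(2, 4000, 0);    // bucket 6000
        tracker.touch(1, 4000, 3000); // moved to bucket 8000
        System.out.println(tracker.expireUpTo(6000)); // [2]
        System.out.println(tracker.expireUpTo(8000)); // [1]
    }
}
```

Refreshing a session costs only a move between two sets. The checker never scans all sessions, only the one bucket that falls due at each tick.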
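Summary
1. Session creation goes through a vote (a proposal).
2. As a result, every server builds the same session record in memory.
3. The sessionId is made unique by combining the server id and the current time.
4. Session expiration is checked by the leader, once every tickTime.
5. When a session is touched, it moves to a different expiration set. Expiration times are multiples of tickTime.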