
[Experience Sharing] ZooKeeper Explained, Part 2: Session Establishment

Posted on 2017-4-19 11:37:43
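  In the previous post I covered how the ZooKeeper server starts up. This post explains how the client and the server establish a session, using the DataMonitor example from the official documentation. Following session establishment gives a rough idea of how the client and the server each process requests and how they communicate with each other.
  The DataMonitor code from the official documentation: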
  Executor

public class Executor implements Watcher, Runnable,
DataMonitor.DataMonitorListener {
String znode;
DataMonitor dm;
ZooKeeper zk;
String filename;
String exec[];
Process child;
//Executor is a Watcher, but it delegates all handling to DataMonitor
public Executor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
//Initialize the ZooKeeper client: this connects, creates the session and starts the client's SendThread, all asynchronously
zk = new ZooKeeper(hostPort, 3000, this);
//DataMonitor does the real work
dm = new DataMonitor(zk, znode, null, this);
}
  DataMonitor

public class DataMonitor implements Watcher, StatCallback {
.......
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
......
// Get things started by checking if the node exists. We are going
// to be completely event driven. Asynchronous exists(): registers a watcher and sets the callback
zk.exists(znode, true, this, null);
}
......
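//Handle watcher notification events
public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
//Connection state changes (SyncConnected, Expired, ...) are handled here
......
//Otherwise an event fired on the node watched by exists() (create, delete, setData): re-register the watcher, since watchers fire only once; the business logic lives in the callback
} else {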
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}
//Callback with the result of the asynchronous exists() call
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
}
//If the node exists, fetch its data synchronously
byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
// We don't need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
//If the data changed, notify the listener
if ((b == null && b != prevData)
|| (b != null && !Arrays.equals(prevData, b))) {
listener.exists(b);
prevData = b;
}
}
}
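  Starting from this example, let's analyze how ZooKeeper performs its very first step, establishing a session. Most of it happens in the ZooKeeper constructor.
  The ZooKeeper constructor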

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
//Set the default watcher
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
//Parse the configured server list into serverAddresses; the list is shuffled here, so the server order is randomized
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
//Create the client connection, which initializes SendThread and EventThread
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
//Start SendThread and EventThread
cnxn.start();
}
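  The comment above says StaticHostProvider shuffles the configured server list. As a minimal sketch of that idea, assuming a plain round-robin over a shuffled copy, the class below shows why clients sharing one connect string spread their first connections across servers. ShuffledHostList is a hypothetical name, not a ZooKeeper class; the real provider also resolves addresses and may pause (the spinDelay argument of next()) after cycling through every server.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch, not the real StaticHostProvider: shuffle the configured
// servers once, then hand them out round-robin.
final class ShuffledHostList {
    private final List<String> hosts;
    private int next = 0;

    ShuffledHostList(List<String> configured, Random random) {
        if (configured.isEmpty()) {
            throw new IllegalArgumentException("need at least one server");
        }
        // Copy first so the caller's list is left untouched
        this.hosts = new ArrayList<>(configured);
        // Clients sharing one connect string end up starting on different servers
        Collections.shuffle(this.hosts, random);
    }

    // Next server to try; wraps around to the first one after the last
    String next() {
        String host = hosts.get(next);
        next = (next + 1) % hosts.size();
        return host;
    }

    int size() {
        return hosts.size();
    }
}
```

  size() matters later: the ClientCnxn constructor below divides the session timeout by the number of servers to get the per-server connect timeout.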
  Initializing the connection

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
//Client sessionId
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
//Session timeout requested by the client
this.sessionTimeout = sessionTimeout;
//Server list
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
//Connect timeout: the session timeout divided evenly across the servers
connectTimeout = sessionTimeout / hostProvider.size();
//Read timeout: two thirds of the session timeout
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
//Create the client's two core threads: SendThread does all client I/O, EventThread takes events produced by SendThread and invokes the matching watchers
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
}
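  The two timeouts above are plain integer arithmetic on the session timeout, and the SendThread loop that follows adds a ping interval of readTimeout / 2. The helper below reproduces that arithmetic as a worked example; ClientTimeouts is a hypothetical name, not part of the ZooKeeper API, and the three-server ensemble in the usage note is an assumption.

```java
// Hypothetical helper reproducing the integer arithmetic of the ClientCnxn
// constructor and SendThread.run(); not a ZooKeeper class.
final class ClientTimeouts {
    final int connectTimeout; // budget for connecting to a single server
    final int readTimeout;    // max time without hearing from the server
    final int pingInterval;   // ping once nothing has been sent for this long

    ClientTimeouts(int sessionTimeout, int serverCount) {
        this.connectTimeout = sessionTimeout / serverCount;
        this.readTimeout = sessionTimeout * 2 / 3;
        this.pingInterval = readTimeout / 2;
    }

    // Select timeout while connected: time left before the read timeout,
    // shortened if the next ping is due sooner (a ping that is already due is
    // sent right away and leaves the timeout unchanged)
    int selectTimeout(int idleRecv, int idleSend) {
        int to = readTimeout - idleRecv;
        int timeToNextPing = pingInterval - idleSend;
        if (timeToNextPing > 0 && timeToNextPing < to) {
            to = timeToNextPing;
        }
        return to;
    }
}
```

  For Executor's 3000 ms and three servers this gives connectTimeout = 1000, readTimeout = 2000 and a ping after 1000 ms without sends. After the handshake, onConnected() recomputes both timeouts from the negotiated value, so a timeout raised to 4000 ms would give 1333 and 2666.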

  SendThread main loop

public void run() {
.....
while (state.isAlive()) {
try {
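//If not connected yet, start connecting. The method name is misleading: it only checks whether sockKey has been registered, so the server connection may not actually be established yet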
if (!clientCnxnSocket.isConnected()) {
......
//Connect asynchronously
startConnect();
clientCnxnSocket.updateLastSendAndHeard();
}
//If the state is connected, we really are connected to the server
if (state.isConnected()) {
......
//Timeout for the next select
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
//Not connected yet: count down the connect timeout instead
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
//Session timeout, which also covers the connect timeout
if (to <= 0) {
throw new SessionTimeoutException(
"Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv() + "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId));
}
//If nothing has been sent for a while, send a heartbeat (ping)
if (state.isConnected()) {
int timeToNextPing = readTimeout / 2
- clientCnxnSocket.getIdleSend();
if (timeToNextPing <= 0) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// If we are in read-only mode, seek for read/write server
//In read-only mode, look for a read/write server; if one is found, tear down the current connection and reconnect to that R/W server
if (state == States.CONNECTEDREADONLY) {
long now = System.currentTimeMillis();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
//Synchronously probe the next server; if it is a R/W server, throw RWServerFoundException
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
//Do the I/O
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
}
break;
} else {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else {
......
}
//Clean up the old connection and move on to the next server
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
}
......
}
  
  The connect process in detail

private void startConnect() throws IOException {
//Switch the state to CONNECTING
state = States.CONNECTING;
//Pick the target address (a R/W server found earlier takes priority)
InetSocketAddress addr;
if (rwServerAddress != null) {
addr = rwServerAddress;
rwServerAddress = null;
} else {
addr = hostProvider.next(1000);
}
setName(getName().replaceAll("\\(.*\\)",
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));
......
//Connect asynchronously
clientCnxnSocket.connect(addr);
}
  The connect call itself (ClientCnxnSocketNIO.connect)

    void connect(InetSocketAddress addr) throws IOException {
//Create the client SocketChannel
SocketChannel sock = createSock();
try {
//Register for OP_CONNECT and try to connect
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
//The session is not initialized yet
initialized = false;
/*
* Reset incomingBuffer
*/
//Reset the read buffers so the next read starts with the 4-byte length
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
  registerAndConnect:

    void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
//Try to connect
boolean immediateConnect = sock.connect(addr);
//If the non-blocking connect succeeds immediately (e.g. on a fast local network), send the ConnectRequest right away to ask the server for a session
if (immediateConnect) {
sendThread.primeConnection();
}
}
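  Whether connect() returns true immediately or the connection completes later through OP_CONNECT, the client ends up calling primeConnection(). A minimal, self-contained sketch of that non-blocking connect pattern with plain java.nio follows; NonBlockingConnectDemo is a hypothetical name, and the selector loop is a simplified stand-in for doTransport().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Sketch of the register / connect / finishConnect pattern; hypothetical class.
final class NonBlockingConnectDemo {
    // Returns true once the TCP connection is established, via either path
    static boolean connect(InetSocketAddress addr, long timeoutMs) {
        try (Selector selector = Selector.open();
             SocketChannel sock = SocketChannel.open()) {
            sock.configureBlocking(false);
            SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);
            if (sock.connect(addr)) {
                return true; // immediate connect: ZooKeeper calls primeConnection() here
            }
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                // select(0) would block forever, so always wait at least 1 ms
                selector.select(Math.max(1L, deadline - System.currentTimeMillis()));
                if (key.isConnectable() && sock.finishConnect()) {
                    return true; // OP_CONNECT path: primeConnection() is called here
                }
                selector.selectedKeys().clear();
            }
            return false; // the SendThread would give up and try the next server
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```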
  primeConnection runs once the TCP connection is up; its main job is to request a session:

void primeConnection() throws IOException {
......
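//The sessionId is 0 (ask for a new session) unless we have already been connected to a R/W server, in which case the existing session is resumed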
long sessId = (seenRwServerBefore) ? sessionId : 0;
//Build the connect request
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
synchronized (outgoingQueue) {
......
//Wrap it in a transport-level Packet and put it at the head of the send queue; a ConnectRequest has a null requestHeader
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
//Make sure both read and write events are being watched
clientCnxnSocket.enableReadWriteOnly();
.....
}
  The ConnectRequest is now in the send queue, and the SendThread enters doTransport:

void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
//select
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
//If the connection did not complete immediately, handle the OP_CONNECT event here
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
}
//If the socket is readable or writable, do the I/O
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
  Assuming the connection is now established and the socket is writable, the SendThread starts sending our ConnectRequest:

if (sockKey.isWritable()) {
//Synchronize on the send queue
synchronized(outgoingQueue) {
//Take a request from the send queue
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
//Update the last-send time
updateLastSend();
// If we already started writing p, p.bb will already exist
//Serialize the Packet into a ByteBuffer
if (p.bb == null) {
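//For regular requests (not ping or auth), assign the xid, the client-side request sequence number (not a transaction id)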
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
//Serialize
p.createBB();
}
//Write the data
sock.write(p.bb);
//Fully written: the send succeeded
if (!p.bb.hasRemaining()) {
//Count of packets sent
sentCount++;
//Sent, so remove it from the send queue
outgoingQueue.removeFirstOccurrence(p);
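//Regular requests go into the pending queue so the server's reply can be matched to them; anything else (ConnectRequest, ping, auth) is simply dropped once sent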
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
//Once everything has been sent, stop watching for OS write readiness; if anything is left, keep watching so we can continue writing
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
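  The rule in this write path is: only packets that carry a RequestHeader and are neither ping nor auth get an xid and wait in pendingQueue for a reply, which is why the ConnectRequest is dropped right after being written. A simplified model of that bookkeeping; OutgoingRouter is hypothetical, the opcode values are assumptions, and the fixed xids that real ping/auth packets carry are not modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model: which packets get an xid and which wait for a reply.
final class OutgoingRouter {
    static final int PING = 11;  // assumed value of OpCode.ping
    static final int AUTH = 100; // assumed value of OpCode.auth

    static final class Packet {
        final Integer type; // null models a packet without RequestHeader (ConnectRequest)
        int xid;            // stays 0 when no xid is assigned

        Packet(Integer type) {
            this.type = type;
        }
    }

    private int xid = 1; // client-side request counter, like ClientCnxn.getXid()
    final Deque<Packet> pendingQueue = new ArrayDeque<>();

    private static boolean expectsReply(Packet p) {
        return p.type != null && p.type != PING && p.type != AUTH;
    }

    void send(Packet p) {
        if (expectsReply(p)) {
            p.xid = xid++; // assigned just before serialization (createBB)
        }
        // ... the bytes would be written to the socket here ...
        if (expectsReply(p)) {
            pendingQueue.add(p); // the server's reply is matched against this packet
        }
        // ConnectRequest, ping and auth packets are forgotten once written
    }
}
```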
  The serialization itself. Note that the ConnectRequest packet has no request header:

public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
//Write an int as a 4-byte placeholder; once the whole packet is written it is overwritten with the packet length
boa.writeInt(-1, "len"); // We'll fill this in later
//Serialize the request header
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
//Serialize the request body
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
//Wrap it in a ByteBuffer
this.bb = ByteBuffer.wrap(baos.toByteArray());
//Overwrite the first 4 bytes with the real length: the total size minus the 4-byte length field
this.bb.putInt(this.bb.capacity() - 4);
//Rewind so the buffer can be read from the start
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
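  To make the framing concrete, here is a sketch of createBB() for a ConnectRequest. It uses java.io.DataOutputStream as a stand-in for jute's BinaryOutputArchive (both write big-endian primitives) and assumes the ConnectRequest field order protocolVersion, lastZxidSeen, timeOut, sessionId, passwd; ConnectRequestFrame is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Sketch of length-prefixed framing for a ConnectRequest; hypothetical class.
final class ConnectRequestFrame {
    static ByteBuffer encode(long lastZxidSeen, int timeOut, long sessionId,
                             byte[] passwd, boolean readOnly) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);            // length placeholder, patched below
            // No RequestHeader: the ConnectRequest follows the length directly
            out.writeInt(0);             // protocolVersion
            out.writeLong(lastZxidSeen);
            out.writeInt(timeOut);
            out.writeLong(sessionId);
            out.writeInt(passwd.length); // jute "buffer" = int length + bytes
            out.write(passwd);
            out.writeBoolean(readOnly);  // trailing "am-I-allowed-to-be-readonly" flag
            out.close();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4); // overwrite the placeholder at offset 0
            bb.rewind();                  // ready to be written from position 0
            return bb;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

  With the client's initial all-zero 16-byte password, the frame is 4 + 4 + 8 + 4 + 8 + (4 + 16) + 1 = 49 bytes, and the length written over the -1 placeholder is 45.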
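  Our first Packet is the ConnectRequest. Its packet has no header, so once sent it is simply discarded (it never enters pendingQueue), but the SendThread still has to listen for the server's reply to confirm the connection and initialize the session. The client now waits for the server, so let's see how the server handles the ConnectRequest.
  Assuming the server's selector thread is running, the selector picks up a read-ready event: the client's ConnectRequest.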

else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
......
//Inside NIOServerCnxn.doIO(k):
if (k.isReadable()) {
//First read 4 bytes from the channel: the length header
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
//The 4-byte int is complete, keep reading
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
//If incomingBuffer is still lenBuffer, we are at the start of the next request
if (incomingBuffer == lenBuffer) { // start of next request
incomingBuffer.flip();
//readLength allocates a buffer of exactly that length for incomingBuffer, so the rest of the data can be read into it
isPayload = readLength(k);
//Clear it, ready to be filled
incomingBuffer.clear();
} else {
// continuation
isPayload = true;
}
//Read the payload
if (isPayload) { // not the case for 4letterword
readPayload();
}
else {
// four letter words take care
// need not do anything else
return;
}
}
}
  Reading the payload:

/** Read the request payload (everything following the length prefix) */
private void readPayload() throws IOException, InterruptedException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
//Try to read it all in one go
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
}
//The whole payload has been read
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
//Server packet statistics
packetReceived();
//Flip the buffer so it can be consumed
incomingBuffer.flip();
//If the connection is not initialized yet, the first packet must be the ConnectRequest
if (!initialized) {
readConnectRequest();
}
//Handle all other requests
else {
readRequest();
}
//Reset for the next packet
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
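  The key point of doIO()/readPayload() is that incomingBuffer first points at the 4-byte lenBuffer and is then swapped for a buffer of exactly the announced length, so one packet may arrive across any number of reads. A minimal model that feeds byte chunks instead of reading a SocketChannel; FrameDecoder is hypothetical, and the real readLength() also rejects lengths above jute.maxbuffer and handles four-letter words.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the lenBuffer/incomingBuffer switch in NIOServerCnxn.
final class FrameDecoder {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;

    // Returns the payloads completed by this chunk (possibly none, possibly several)
    List<byte[]> feed(byte[] chunk) {
        List<byte[]> complete = new ArrayList<>();
        ByteBuffer src = ByteBuffer.wrap(chunk);
        while (true) {
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    int len = lenBuffer.getInt();
                    if (len < 0) {
                        throw new IllegalStateException("Len error " + len);
                    }
                    incomingBuffer = ByteBuffer.allocate(len); // like readLength()
                } else {
                    complete.add(incomingBuffer.array());      // like readRequest()
                    lenBuffer.clear();                         // reset for the next packet
                    incomingBuffer = lenBuffer;
                }
                continue;
            }
            if (!src.hasRemaining()) {
                break; // wait for the next read event
            }
            // Stand-in for sock.read(incomingBuffer): copy as much as fits
            while (src.hasRemaining() && incomingBuffer.hasRemaining()) {
                incomingBuffer.put(src.get());
            }
        }
        return complete;
    }
}
```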
  The server has now received our ConnectRequest and handles it as follows:

private void readConnectRequest() throws IOException, InterruptedException {
if (zkServer == null) {
throw new IOException("ZooKeeperServer not running");
}
//Process the ConnectRequest
zkServer.processConnectRequest(this, incomingBuffer);
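//Mark the connection as initialized: every later packet is an ordinary request. Session creation itself still completes asynchronously in the processor chain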
initialized = true;
}
  The actual processing (ZooKeeperServer.processConnectRequest):

public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
//The ConnectRequest packet has no header, so deserialize the body directly
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
...
boolean readOnly = false;
try {
//Read the readOnly flag (missing in packets from old clients)
readOnly = bia.readBool("readOnly");
cnxn.isOldClient = false;
} catch (IOException e) {
....
}
...
//Apply the session parameters requested by the client, clamping the timeout to the server's [min, max] range
int sessionTimeout = connReq.getTimeOut();
byte passwd[] = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
// We don't want to receive any packets until we are sure that the
// session is setup
//Stop reading further requests until the session is set up
cnxn.disableRecv();
//The sessionId sent by the client
long sessionId = connReq.getSessionId();
//Non-zero sessionId: the client is reconnecting and wants to resume an existing session
if (sessionId != 0) {
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
} else {
LOG.info("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
//Create a new session
createSession(cnxn, passwd, sessionTimeout);
}
}
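  The clamp above is the entire timeout negotiation. A sketch of it as a standalone function, assuming the documented defaults minSessionTimeout = 2 * tickTime and maxSessionTimeout = 20 * tickTime when they are not configured (modeled here as -1); SessionTimeoutNegotiation is a hypothetical name.

```java
// Hypothetical helper reproducing the clamp in processConnectRequest().
final class SessionTimeoutNegotiation {
    // -1 for min/max means "not configured": fall back to the tickTime-based defaults
    static int negotiate(int requested, int tickTime, int minSessionTimeout, int maxSessionTimeout) {
        int min = minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
        int max = maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
        int timeout = requested;
        if (timeout < min) {
            timeout = min;
        }
        if (timeout > max) {
            timeout = max;
        }
        return timeout;
    }
}
```

  Assuming tickTime=2000, Executor's requested 3000 ms is raised to 4000 ms, and that negotiated value is what the client later receives in the ConnectResponse.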
  Creating a new session:

    long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
//Create the session on the server; sessionIds are handed out incrementally
long sessionId = sessionTracker.createSession(timeout);
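//Session password: filled from a Random seeded with sessionId ^ superSecret, so it is deterministic and can be regenerated later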
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
ByteBuffer to = ByteBuffer.allocate(4);
to.putInt(timeout);
//Bind the unique sessionId to this server-side connection
cnxn.setSessionId(sessionId);
//Submit the request to the processor chain
submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
return sessionId;
}
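  Because the password comes from a Random seeded with sessionId ^ superSecret, it is a pure function of the sessionId: the server can regenerate it when building the response and compare it when a client reconnects, without storing it. A sketch of that idea; SessionPasswords and its SUPER_SECRET value are placeholders, not ZooKeeper's.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical sketch: a password derived deterministically from the sessionId.
final class SessionPasswords {
    private static final long SUPER_SECRET = 0x5EC12E7L; // placeholder value

    static byte[] generate(long sessionId) {
        byte[] passwd = new byte[16];
        // Same seed, same bytes: any later call reproduces the password
        new Random(sessionId ^ SUPER_SECRET).nextBytes(passwd);
        return passwd;
    }

    // A reconnecting client's password is valid iff it matches the regenerated one
    static boolean check(long sessionId, byte[] passwd) {
        return sessionId != 0 && Arrays.equals(passwd, generate(sessionId));
    }
}
```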
  Submitting:

private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
int xid, ByteBuffer bb, List<Id> authInfo) {
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
submitRequest(si);
}
  The server now runs the processor chain. The argument is the internal Request object, whose type here is OpCode.createSession:

public void submitRequest(Request si) {
......
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
//Hand over to the first processor; processors usually run asynchronously for throughput
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
......
}
  The first processor, PrepRequestProcessor, runs:

public void run() {
try {
while (true) {
Request request = submittedRequests.take();
......
pRequest(request);
}
......
}
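  Each processor hands requests to the next one, and the queued ones (PrepRequestProcessor, SyncRequestProcessor) each run their own thread that take()s from a queue, so ordering is preserved while stages overlap. A hypothetical miniature of that structure (MiniProcessor, QueuedStage and ProcessorChainDemo are illustrative names; the real processors do far more):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical miniature of the Prep -> Sync -> Final chain.
interface MiniProcessor {
    void processRequest(String request);
}

final class QueuedStage extends Thread implements MiniProcessor {
    static final String DEATH = "requestOfDeath"; // shutdown marker, like requestOfDeath
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final String stageName;
    private final MiniProcessor next;

    QueuedStage(String stageName, MiniProcessor next) {
        this.stageName = stageName;
        this.next = next;
        setDaemon(true);
    }

    @Override
    public void processRequest(String request) {
        queue.add(request); // returns at once; the work happens on this stage's thread
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = queue.take(); // FIFO keeps requests in order
                if (request.equals(DEATH)) {
                    next.processRequest(DEATH); // propagate shutdown down the chain
                    return;
                }
                next.processRequest(request + ">" + stageName);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class ProcessorChainDemo {
    static List<String> run(List<String> requests) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        // The last stage has no queue: it runs on the sync stage's thread
        MiniProcessor fin = r -> {
            if (!r.equals(QueuedStage.DEATH)) {
                seen.add(r + ">final");
            }
        };
        QueuedStage sync = new QueuedStage("sync", fin);
        QueuedStage prep = new QueuedStage("prep", sync);
        sync.start();
        prep.start();
        requests.forEach(prep::processRequest);
        prep.processRequest(QueuedStage.DEATH);
        try {
            prep.join();
            sync.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }
}
```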
  How it handles createSession:

//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
//Here the Request's header (hdr) and txn are filled in for the downstream processors
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
break;
......
request.zxid = zks.getZxid();
//Pass it to the next processor, usually asynchronously for throughput
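nextProcessor.processRequest(request);
......
//Inside pRequest2Txn(), the createSession case:
case OpCode.createSession:
//Read the session timeout (the 4-byte int put into the buffer by createSession())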
request.request.rewind();
int to = request.request.getInt();
//Build the concrete Record, here a CreateSessionTxn, for the downstream processors
request.txn = new CreateSessionTxn(to);
request.request.rewind();
zks.sessionTracker.addSession(request.sessionId, to);
zks.setOwner(request.sessionId, request.getOwner());
break;
  As shown above, PrepRequestProcessor mainly fills in the Request's header and txn; it is a preprocessing step.
  The second processor, SyncRequestProcessor:

int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
//If the flush queue is empty (everything before has been processed), block waiting for a request
if (toFlush.isEmpty()) {
si = queuedRequests.take();
}
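//Otherwise requests are waiting to be flushed: poll without blocking. Under light load nothing new arrives and the backlog is flushed right away;
//under heavy load, flushing may wait until more than 1000 requests have accumulated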
else {
si = queuedRequests.poll();
//The request queue is idle: flush the backlog
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// track the number of records written to the log
//Append the request to the log output stream (serialize, then append). Note that it is not flushed to disk yet; it is still buffered in memory
if (zks.getZKDatabase().append(si)) {
//Count of successful appends
logCount++;
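//Once the number of appended requests passes a threshold, roll the log
//and start a thread that asynchronously writes the in-memory database and session state to a snapshot file, a kind of checkpoint
//snapCount defaults to 100000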
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
//Roll the log: flush the current log stream; the next append opens a new log file
zks.getZKDatabase().rollLog();
// take a snapshot
//Start a thread that asynchronously writes the in-memory database and sessions to a snapshot file
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new Thread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
}
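//Otherwise this is a read (append returned false); if nothing is waiting to be flushed, pass it straight to the next processor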
else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
continue;
}
//A write request was appended to the log above; add it to the flush queue to be handled in a batch
toFlush.add(si);
//Under heavy load, flushing may wait for more than 1000 requests; after the flush they move on to the next processor
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
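  With the threshold snapCount / 2 + randRoll, and randRoll re-drawn after every snapshot, a snapshot happens after somewhere between snapCount / 2 + 1 and snapCount appended transactions; the randomization is meant to keep the servers of an ensemble from all taking snapshots at the same moment. A small sketch of that arithmetic (SnapshotSchedule is hypothetical):

```java
import java.util.Random;

// Hypothetical helper reproducing the snapshot trigger in SyncRequestProcessor.
final class SnapshotSchedule {
    // Appended transactions until the next log roll + snapshot
    // (requires snapCount >= 2 so that nextInt gets a positive bound)
    static int txnsUntilSnapshot(int snapCount, Random r) {
        int randRoll = r.nextInt(snapCount / 2);
        // logCount is incremented before the check, so the first value
        // satisfying logCount > snapCount / 2 + randRoll is that threshold + 1
        return snapCount / 2 + randRoll + 1;
    }
}
```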
  The flush itself:

private void flush(LinkedList<Request> toFlush)
throws IOException, RequestProcessorException
{
if (toFlush.isEmpty())
return;
//Flush the appended log to disk, closing old log file handles along the way
zks.getZKDatabase().commit();
//Once the log is flushed, process the requests in the flush queue
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
//Run the next processor
nextProcessor.processRequest(i);
}
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
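  Together, the loop and flush() implement group commit: one commit() covers every request that piled up while the disk was busy. A single-threaded model of the batching decision, assuming every request is a write and that a poll() returning null means the queue is idle; GroupCommitModel is hypothetical, and maxBatch plays the role of the hard-coded 1000.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of SyncRequestProcessor's batching; each returned batch
// stands for one commit() followed by handing those requests to the next processor.
final class GroupCommitModel {
    static List<List<String>> run(Deque<String> incoming, int maxBatch) {
        List<List<String>> batches = new ArrayList<>();
        List<String> toFlush = new ArrayList<>();
        while (true) {
            String si = incoming.poll(); // the real code blocks with take() when toFlush is empty
            if (si == null) {
                if (toFlush.isEmpty()) {
                    break;               // model only: nothing left at all
                }
                batches.add(new ArrayList<>(toFlush)); // queue idle: flush the backlog
                toFlush.clear();
                continue;
            }
            toFlush.add(si);
            if (toFlush.size() > maxBatch) {
                batches.add(new ArrayList<>(toFlush)); // batch too large: flush now
                toFlush.clear();
            }
        }
        return batches;
    }
}
```

  Five queued writes with maxBatch = 2 come out as the batches [a, b, c] and [d, e]: a batch is flushed once it grows past the limit, or as soon as the queue runs dry.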
  Assuming the system is lightly loaded, our createSession request is processed right away and reaches FinalRequestProcessor:

if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
//Transactional requests are applied here
rc = zks.processTxn(hdr, txn);
}
  The actual processing (ZooKeeperServer.processTxn):

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
//Delegate to the database to apply the transaction
rc = getZKDatabase().processTxn(hdr, txn);
//For createSession, register the session
if (opCode == OpCode.createSession) {
if (txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addSession(sessionId, cst
.getTimeOut());
......
return rc;
}
  
  On the database side (ZKDatabase delegates to DataTree.processTxn):

public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
//Build the result object that is returned to FinalRequestProcessor
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
......
  FinalRequestProcessor takes the database result and continues:

case OpCode.createSession: {
zks.serverStats().updateLatency(request.createTime);
lastOp = "SESS";
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, System.currentTimeMillis());
//Write the response back here
zks.finishSessionInit(request.cnxn, true);
return;
}
  
  Building the response (finishSessionInit):

public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
......
//Build the response: the negotiated sessionTimeout, the unique sessionId and the client's password
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
: 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
//Placeholder -1 for the length
bos.writeInt(-1, "len");
//Serialize the body
rsp.serialize(bos, "connect");
if (!cnxn.isOldClient) {
bos.writeBool(
this instanceof ReadOnlyZooKeeperServer, "readOnly");
}
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
//Replace the -1 with the real length
bb.putInt(bb.remaining() - 4).rewind();
//Write it back through the channel
cnxn.sendBuffer(bb);
......
//Re-enable read events on the selector
cnxn.enableRecv();
......
}
  The actual write-back, in the transport layer (NIOServerCnxn):

public void sendBuffer(ByteBuffer bb) {
try {
if (bb != ServerCnxnFactory.closeConn) {
// We check if write interest here because if it is NOT set,
// nothing is queued, so we can try to send the buffer right
// away without waking up the selector
//If OP_WRITE interest is not set, nothing is queued, so try writing directly
if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
try {
//Write back to the client
sock.write(bb);
} catch (IOException e) {
// we are just doing best effort right now
}
}
// if there is nothing left to send, we are done
//Everything was written in one go
if (bb.remaining() == 0) {
packetSent();
return;
}
}
//Not fully written: add it to the output queue and let the selector finish the write later
synchronized(this.factory){
sk.selector().wakeup();
if (LOG.isTraceEnabled()) {
LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
+ " is valid: " + sk.isValid());
}
outgoingBuffers.add(bb);
if (sk.isValid()) {
sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
}
}
.......
}
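  sendBuffer() writes directly only when OP_WRITE interest is off, because then nothing is queued and a direct write cannot overtake earlier data; otherwise, or when the write is partial, the buffer is queued and the selector finishes it later. A model of that decision with a fake socket that accepts a fixed number of bytes per write (SendOrQueue is hypothetical):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of NIOServerCnxn.sendBuffer() and the later OP_WRITE handling.
final class SendOrQueue {
    private final int bytesPerWrite;           // what the fake socket accepts per write
    final Deque<ByteBuffer> outgoingBuffers = new ArrayDeque<>();
    boolean writeInterest = false;             // models OP_WRITE in interestOps
    int written = 0;

    SendOrQueue(int bytesPerWrite) {
        this.bytesPerWrite = bytesPerWrite;
    }

    // Stand-in for sock.write(bb): consumes up to bytesPerWrite bytes
    private void write(ByteBuffer bb) {
        int n = Math.min(bytesPerWrite, bb.remaining());
        bb.position(bb.position() + n);
        written += n;
    }

    void sendBuffer(ByteBuffer bb) {
        if (!writeInterest) {         // nothing queued: a direct write keeps ordering
            write(bb);
            if (!bb.hasRemaining()) {
                return;               // packetSent()
            }
        }
        outgoingBuffers.add(bb);      // finish later on the selector thread
        writeInterest = true;
    }

    // What the selector thread does when OP_WRITE fires
    void onWritable() {
        while (!outgoingBuffers.isEmpty()) {
            ByteBuffer bb = outgoingBuffers.peek();
            write(bb);
            if (bb.hasRemaining()) {
                return;               // socket full again; wait for the next OP_WRITE
            }
            outgoingBuffers.poll();
        }
        writeInterest = false;        // queue drained: stop watching OP_WRITE
    }
}
```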
  The server side is now done and has sent the client a ConnectResponse. The client's SendThread receives and handles the response:

void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {
//First read the packet length (an int)
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
//The buffer is full. Note that each packet is read in two steps, first the length and then the body, reusing incomingBuffer
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
//We just read the length
if (incomingBuffer == lenBuffer) {
recvCount++;
//Allocate incomingBuffer with room for the whole packet
readLength();
}
//Not initialized means the session is not established yet, so the server's reply must be a ConnectResponse
else if (!initialized) {
//Read the ConnectResponse: deserialize incomingBuffer into a ConnectResponse object
readConnectResult();
//Keep reading subsequent responses
enableRead();
//If requests are waiting to be sent, make sure write events are enabled
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
//Get ready to read the next response
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
//Session establishment is complete
initialized = true;
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
  The actual read:

void readConnectResult() throws IOException {
.....
//Deserialize incomingBuffer into a ConnectResponse
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
//The sessionId assigned by the server
this.sessionId = conRsp.getSessionId();
//Follow-up: initialize the client's session parameters and finally fire a WatchedEvent
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);
}
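  The try/catch around readBool() is what lets a new client talk to an old server that never writes the readOnly flag. A sketch of both sides of the ConnectResponse body (after the 4-byte length), using DataInput/DataOutput as stand-ins for jute's archives and assuming the field order protocolVersion, timeOut, sessionId, passwd; ConnectResponseCodec is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of the ConnectResponse body and its optional trailing flag.
final class ConnectResponseCodec {
    static final class Result {
        final int timeOut;
        final long sessionId;
        final byte[] passwd;
        final boolean readOnly;

        Result(int timeOut, long sessionId, byte[] passwd, boolean readOnly) {
            this.timeOut = timeOut;
            this.sessionId = sessionId;
            this.passwd = passwd;
            this.readOnly = readOnly;
        }
    }

    // Server side; appendReadOnly == false models an old server without the flag
    static byte[] encodeBody(int timeOut, long sessionId, byte[] passwd,
                             boolean readOnly, boolean appendReadOnly) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(0);             // protocolVersion
            out.writeInt(timeOut);       // negotiated session timeout
            out.writeLong(sessionId);
            out.writeInt(passwd.length); // jute buffer: int length + bytes
            out.write(passwd);
            if (appendReadOnly) {
                out.writeBoolean(readOnly);
            }
            out.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Client side (readConnectResult): a missing trailing flag means "not read-only"
    static Result decodeBody(byte[] body) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(body));
            in.readInt(); // protocolVersion, ignored here
            int timeOut = in.readInt();
            long sessionId = in.readLong();
            byte[] passwd = new byte[in.readInt()];
            in.readFully(passwd);
            boolean readOnly = false;
            try {
                readOnly = in.readBoolean();
            } catch (EOFException e) {
                // Old server: no readOnly field, so read-only mode is unavailable
            }
            return new Result(timeOut, sessionId, passwd, readOnly);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```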
  The follow-up:

void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
......
//Recompute the client's session parameters from the negotiated timeout
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
//Update the connection state
state = (isRO) ?
States.CONNECTEDREADONLY : States.CONNECTED;
seenRwServerBefore |= !isRO;
LOG.info("Session establishment complete on server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", sessionid = 0x" + Long.toHexString(sessionId)
+ ", negotiated timeout = " + negotiatedSessionTimeout
+ (isRO ? " (READ-ONLY mode)" : ""));
//Fire a SyncConnected (or ConnectedReadOnly) event; the dedicated EventThread asynchronously notifies the registered watchers
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));
}
  EventThread handling:

       public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
//Record the latest session state in the EventThread
sessionState = event.getState();
// materialize the watchers based on the event
//Find the watchers that need to be notified; the EventThread then simply calls each of them
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
//Queue it for asynchronous processing
waitingEvents.add(pair);
}
  The EventThread main loop

public void run() {
try {
isRunning = true;
while (true) {
//Take an event
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
//Process it
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down");
}
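  queueEvent() drops a state-only event that repeats the current session state, and a single EventThread calls the watchers in queue order, isolating each call in a try/catch. A hypothetical miniature: MiniEventThread is an illustrative name, events are modeled as state strings, and unlike the real loop it stops as soon as it takes the death marker instead of draining events queued after it.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical miniature of ClientCnxn.EventThread.
final class MiniEventThread extends Thread {
    private static final Object EVENT_OF_DEATH = new Object();
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final Consumer<String> watcher;
    // Last state passed to queueEvent(); assumed to start as Disconnected
    private String sessionState = "Disconnected";

    MiniEventThread(Consumer<String> watcher) {
        this.watcher = watcher;
        setDaemon(true);
    }

    // Called by the SendThread; a state event equal to the current state is dropped
    void queueEvent(String state) {
        if (state.equals(sessionState)) {
            return;
        }
        sessionState = state;
        waitingEvents.add(state);
    }

    void shutdown() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object event = waitingEvents.take();
                if (event == EVENT_OF_DEATH) {
                    return;
                }
                try {
                    watcher.accept((String) event);
                } catch (RuntimeException e) {
                    // A misbehaving watcher must not kill the event thread
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```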
  The actual processing (in processEvent):

if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
}
  In our example this calls the process method of the Executor watcher, which delegates to DataMonitor; for SyncConnected it does nothing:

case SyncConnected:
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
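  At this point the client and the server have established the session, and business requests can follow. This example showed how the client and the server exchange data; later requests such as create, get, set and delete follow a similar flow.
  Core steps of session establishment:
  1. Establish the TCP connection
  2. The client sends a ConnectRequest packet
  3. The server receives the ConnectRequest, creates the session, and returns the sessionId (together with the negotiated timeout and the password) to the client
  4. The client receives the server's response and fires an event for the SyncConnected state
  5. Client-side watchers consume the event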

Original post: https://www.yunweiku.com/thread-366356-1-1.html