设为首页 收藏本站
查看: 1054|回复: 0

[经验分享] 深入浅出Zookeeper之三Exists请求和处理

[复制链接]

尚未签到

发表于 2017-4-19 12:51:10 | 显示全部楼层 |阅读模式
  前一篇介绍了zookeeper的client和server之间session是如何建立的。在DataMonitor的例子中,DataMonitor通过exists异步接口和server端交互,本文将介绍exists操作是如何完成。
  dataMonitor开始exist操作

    public void exists(final String path, Watcher watcher,
StatCallback cb, Object ctx)
{
......
//exist请求头
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
//请求体
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
SetDataResponse response = new SetDataResponse();
//添加到发送队列
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
clientPath, serverPath, ctx, wcb);
}
  添加过程

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
//转换成Packet
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
//添加到发送队列
outgoingQueue.add(packet);
}
}
//唤醒下Selector,快点处理
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
  接下来还是回到SendThread的发送过程,之前Session建立时已经分析过,这里有一点要注意下:

//重要的业务请求,需要设置事务id
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
  接下来Server端IO线程拿到请求,处理,过程和之前session建立时一样,就不赘述了。变化的是后续的处理链。

PrepRequestProcessor预处理

//All the rest don't need to create a Txn - just verify session
//读请求,不需要创建事务,只是检查了下session是否还在,此时事务头和事务体都是null
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;


 SyncRequestProcessor处理逻辑之前已经分析过了,这里就挑重点说一下

//试图将其写log,由于ExistsRequest并不是一个事务型请求,所以这里直接返回false,也就是说ExistsRequest不会被记录到log文件中
zks.getZKDatabase().append(si)
 接下来FinalRequestProcessor处理,由于不是事务型请求,省了很多步骤,直接进入switch处理:

case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
//反序列化
ByteBufferInputStream.byteBuffer2Record(request.request,
existsRequest);
String path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
//拿对应node的状态,并设置是否watch
Stat stat = zks.getZKDatabase().statNode(path, existsRequest
.getWatch() ? cnxn : null);
//结果
rsp = new ExistsResponse(stat);
break;
}
......
//当前处理zxid
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
//构造相应头
ReplyHeader hdr =
new ReplyHeader(request.cxid, lastZxid, err.intValue());
zks.serverStats().updateLatency(request.createTime);
cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
request.createTime, System.currentTimeMillis());
try {
//写回响应
cnxn.sendResponse(hdr, rsp, "response");
  statNode过程

//此处的watcher就是对应ServerCnxn,代表是哪个client,server端需要notify的时候,直接往对应ServerCnxn写数据即可
public Stat statNode(String path, Watcher watcher)
throws KeeperException.NoNodeException {
Stat stat = new Stat();
DataNode n = nodes.get(path);
if (watcher != null) {
//对于exists请求,需要监听data变化事件,添加watcher
dataWatches.addWatch(path, watcher);
}
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
return stat;
}
}
  好了,以上server端就完成了ExistsRequest的处理了。接下来client端SendThread收到ExistsResponse进行处理

        if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
......            
else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
}
//此时client session已经建立,init完成
else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
  具体读取:

void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
//先读响应头,先对特殊的xid进行处理
replyHdr.deserialize(bbia, "header");
......
//从头拿之前入队的ExistsRequest请求Packet,由于client和server都是单线程处理,多队列处理,所以认为全局有序
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
//检查一下是否一样
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
//更新client端的最新zxid
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
//反序列化响应
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
//处理packet,主要是注册watcher,回调,触发事件
finishPacket(packet);
}
}
  finishPacket过程
  


    private void finishPacket(Packet p) {
//注册watcher,如果是exists请求,则注册到dataWatches中
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
//如果是同步接口,则唤醒等待的业务线程
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
}
//如果是异步请求,则发送异步事件
else {
p.finished = true;
eventThread.queuePacket(p);
}
}
  EventThread端
  


else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
StatCallback cb = (StatCallback) p.cb;
//如果成功,增加node stat的回调参数
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
cb.processResult(rc, clientPath, p.ctx,
((ExistsResponse) p.response)
.getStat());
} else if (p.response instanceof SetDataResponse) {
cb.processResult(rc, clientPath, p.ctx,
((SetDataResponse) p.response)
.getStat());
} else if (p.response instanceof SetACLResponse) {
cb.processResult(rc, clientPath, p.ctx,
((SetACLResponse) p.response)
.getStat());
}
}
//如果响应失败,stat为null
else {
cb.processResult(rc, clientPath, p.ctx, null);
}
}
  在DataMonitor例子中,它本身就是一个StatCallback

public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
}
  Exists过程大致就是上面描述的,主要注意点:
  1.客户端Request发送完之后会进入Pending队列,等待响应之后拿出来继续处理
  2.同步接口是使用Packet.wait()实现的
  3.server端exists操作不是事务型的操作,不会写入log
  4.server端的watcher就是一个客户端连接ServerCxcn,代表一个客户端,notify的时候直接往连接里写数据即可

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366435-1-1.html 上篇帖子: ZooKeeper学习之文件系统的布局和格式 下篇帖子: 深入浅出Zookeeper之六 Leader/Follower初始化
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表