徐冬丽 发表于 2019-1-9 06:08:54

深入浅出Zookeeper之二Session建立

  
public>  

  
.......
  

  
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
  
DataMonitorListener listener) {
  
......
  
// Get things started by checking if the node exists. We are going
  
// to be completely event driven,异步exist,注册watcher,设置回调
  
zk.exists(znode, true, this, null);
  
}
  

  
......
  
//处理watcher通知事件
  
public void process(WatchedEvent event) {
  
String path = event.getPath();
  
//如果exist操作的对应的事件触发(create.delete,setdata),则再次注册watcher(watcher是单次的),业务处理都在回调里处理
  
} else {
  
if (path != null && path.equals(znode)) {
  
// Something has changed on the node, let's find out
  
zk.exists(znode, true, this, null);
  
}
  
}
  
if (chainedWatcher != null) {
  
chainedWatcher.process(event);
  
}
  
}
  
//处理exist操作的回掉结果
  
public void proce***esult(int rc, String path, Object ctx, Stat stat) {
  
boolean exists;
  
switch (rc) {
  
case Code.Ok:
  
exists = true;
  
break;
  
case Code.NoNode:
  
exists = false;
  
break;
  
case Code.SessionExpired:
  
case Code.NoAuth:
  
dead = true;
  
listener.closing(rc);
  
return;
  
default:
  
// Retry errors
  
zk.exists(znode, true, this, null);
  
return;
  
}
  
//如果节点存在,则同步获取节点数据
  
byte b[] = null;
  
if (exists) {
  
try {
  
b = zk.getData(znode, false, null);
  
} catch (KeeperException e) {
  
// We don't need to worry about recovering now. The watch
  
// callbacks will kick off any exception handling
  
e.printStackTrace();
  
} catch (InterruptedException e) {
  
return;
  
}
  
}
  
//如果数据有变化,则处理之
  
if ((b == null && b != prevData)
  
|| (b != null && !Arrays.equals(prevData, b))) {
  
listener.exists(b);
  
prevData = b;
  
}
  
}
  
}


页: [1]
查看完整版本: 深入浅出Zookeeper之二Session建立