[Experience Sharing] ZooKeeper Incident Summary

Posted on 2017-4-19 06:00:52
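Background:
On October 29, during a system switchover, the master information changed, but one of the systems that watches the master information did not receive a notification from ZooKeeper.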
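Analysis:
The original code is shown below. The watching system used it to watch for changes to the data at a path: when the data changes, the process callback is invoked and handles the business logic. (We use the Curator library.)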

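```java
// Runs inside the listener class; `this` is the listener that implements process(WatchedEvent).
// "*:2181" is the connection string as masked in the original post.
client = CuratorFrameworkFactory.newClient("*:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
try {
    // Reads /id/master asynchronously and leaves a watch; process() is called on the next change.
    client.getData().usingWatcher(this).inBackground().forPath("/id/master");
} catch (Exception e) {
    throw new RuntimeException(e);
}
```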
Problem:
With the code above, the first change is delivered by ZooKeeper, but the ZooKeeper server deletes the watcher when it fires, so subsequent changes produce no notification.
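To make the one-shot behaviour concrete without a running ensemble, here is a toy in-memory model. All names are hypothetical and it is not the ZooKeeper or Curator API; it only reproduces the rule that a watch is removed from the table before it is delivered, so a single registration yields a single notification.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Toy model of one-shot data watches. Hypothetical names; not the ZooKeeper API.
class OneShotWatchModel {
    private final Map<String, byte[]> data = new HashMap<>();
    private final Map<String, Set<Consumer<String>>> watches = new HashMap<>();

    // Like getData(path, watcher): returns the value and leaves a watch if one is given.
    byte[] getData(String path, Consumer<String> watcher) {
        if (watcher != null) {
            watches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        }
        return data.get(path);
    }

    // Like setData: update the value, then fire and forget every watch on the path.
    void setData(String path, byte[] value) {
        data.put(path, value);
        Set<Consumer<String>> fired = watches.remove(path); // removed before anyone is notified
        if (fired != null) {
            for (Consumer<String> w : fired) {
                w.accept(path);
            }
        }
    }

    public static void main(String[] args) {
        OneShotWatchModel zk = new OneShotWatchModel();
        List<String> events = new ArrayList<>();
        zk.getData("/id/master", events::add);         // registered once, as in the original code
        zk.setData("/id/master", "node-a".getBytes()); // delivered
        zk.setData("/id/master", "node-b".getBytes()); // no watch left: not delivered
        System.out.println(events.size());             // prints 1
    }
}
```

Calling getData with a watcher again after each event restores delivery, which is exactly what the fix at the end of this post relies on.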
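Source code analysis:
Calling forPath invokes GetDataBuilderImpl.forPath. Because inBackground() was used, this method hands the operation off for asynchronous processing: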

```java
@Override
public byte[] forPath(String path) throws Exception
{
    path = client.fixForNamespace(path);
    byte[] responseData = null;
    if ( backgrounding.inBackground() )
    {
        client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
    }
    else
    {
        responseData = pathInForeground(path);
    }
    return responseData;
}
```
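The branch above means that in background mode forPath returns null immediately and the data only arrives later, on another thread. A minimal sketch of that split, assuming a single-thread executor and a plain function in place of the real ZooKeeper round trip (class and method names are hypothetical, not Curator's):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical sketch of the foreground/background branch in forPath.
class BackgroundForPathSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Function<String, byte[]> read; // stands in for the actual ZooKeeper read

    BackgroundForPathSketch(Function<String, byte[]> read) {
        this.read = read;
    }

    // In this sketch a non-null callback means "background".
    byte[] forPath(String path, BiConsumer<String, byte[]> callback) {
        if (callback != null) {
            executor.execute(() -> callback.accept(path, read.apply(path)));
            return null; // the caller gets nothing back directly
        }
        return read.apply(path); // foreground: block and return the data
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        BackgroundForPathSketch builder = new BackgroundForPathSketch(p -> "master-a".getBytes());
        System.out.println(new String(builder.forPath("/id/master", null))); // prints master-a
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        byte[] direct = builder.forPath("/id/master", (p, d) -> result.complete(d));
        System.out.println(direct == null);            // prints true
        System.out.println(new String(result.join())); // prints master-a
        builder.shutdown();
    }
}
```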
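The background task then calls GetDataBuilderImpl.performBackgroundOperation, which calls into the native Apache ZooKeeper client jar. Because usingWatcher(this) supplies an explicit watcher (rather than watched(), which selects the default watcher), watching.isWatched() is false and the else branch passes that watcher to ZooKeeper.getData.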

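```java
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    // `callback` is a DataCallback created earlier in this method (omitted from this excerpt)
    if ( watching.isWatched() )
    {
        // watched(): use the default watcher of the ZooKeeper handle
        client.getZooKeeper().getData(operationAndData.getData(), true, callback, backgrounding.getContext());
    }
    else
    {
        // usingWatcher(...): pass the caller's watcher (null when no watch was requested)
        client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext());
    }
}
```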
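In the ZooKeeper client, the boolean overload of getData simply substitutes the default watcher given to the ZooKeeper constructor, so both branches end up with a watcher and a watch flag of true. The sketch below models just that choice; SimpleWatcher and the recording lists are hypothetical stand-ins, not ZooKeeper types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two getData overloads used by performBackgroundOperation.
class WatcherSelectionSketch {
    interface SimpleWatcher { void process(String path); }

    private final SimpleWatcher defaultWatcher;
    final List<SimpleWatcher> registered = new ArrayList<>(); // watchers a read would register
    final List<Boolean> watchFlags = new ArrayList<>();       // what each request would carry

    WatcherSelectionSketch(SimpleWatcher defaultWatcher) {
        this.defaultWatcher = defaultWatcher;
    }

    // Boolean overload (the isWatched() branch): true means "use the default watcher".
    void getData(String path, boolean watch) {
        getData(path, watch ? defaultWatcher : null);
    }

    // Explicit-watcher overload (the usingWatcher(...) branch).
    void getData(String path, SimpleWatcher watcher) {
        if (watcher != null) {
            registered.add(watcher);
        }
        watchFlags.add(watcher != null); // mirrors request.setWatch(watcher != null)
    }

    public static void main(String[] args) {
        SimpleWatcher def = p -> System.out.println("default watcher: " + p);
        SimpleWatcher mine = p -> System.out.println("my watcher: " + p);
        WatcherSelectionSketch zk = new WatcherSelectionSketch(def);
        zk.getData("/id/master", true);                   // like watched()
        zk.getData("/id/master", mine);                   // like usingWatcher(mine)
        System.out.println(zk.registered.get(0) == def);  // prints true
        System.out.println(zk.registered.get(1) == mine); // prints true
    }
}
```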

This reaches getData in ZooKeeper.java, which sends the request to the ZooKeeper server. Because a watcher was supplied, setWatch is true.

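```java
public void getData(final String path, Watcher watcher,
        DataCallback cb, Object ctx)
{
    final String clientPath = path;
    PathUtils.validatePath(clientPath);
    // The watcher itself stays on the client; it is recorded when the reply arrives
    WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new DataWatchRegistration(watcher, clientPath);
    }
    final String serverPath = prependChroot(clientPath);
    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.getData);
    GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    // Only a boolean flag travels to the server
    request.setWatch(watcher != null);
    GetDataResponse response = new GetDataResponse();
    cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
            clientPath, serverPath, ctx, wcb);
}
```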
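The split between `wcb` (kept locally) and `setWatch` (sent to the server) can be modelled as follows. This is a sketch under stated assumptions: the packet queue and Runnable registration are hypothetical simplifications, and the rule that a getData watch is only recorded when the reply has no error reflects the 3.4-era client as we understand it. The error code -101 is ZooKeeper's NONODE.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical model of the client side of getData: watcher kept locally, flag sent remotely.
class ClientRegistrationSketch {
    static final int OK = 0;
    static final int NO_NODE = -101; // value of KeeperException.Code.NONODE

    static final class Packet {
        final String path;
        final boolean watchFlag;     // the only watch information sent to the server
        final Runnable registration; // kept on the client; null when no watcher was given

        Packet(String path, boolean watchFlag, Runnable registration) {
            this.path = path;
            this.watchFlag = watchFlag;
            this.registration = registration;
        }
    }

    final Map<String, Set<Object>> dataWatches = new HashMap<>();
    final Queue<Packet> pending = new ArrayDeque<>();

    void getData(String path, Object watcher) {
        Runnable wcb = (watcher == null) ? null
                : () -> dataWatches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        pending.add(new Packet(path, watcher != null, wcb));
    }

    // Replies come back in request order, so the oldest pending packet is the one answered.
    void onReply(int rc) {
        Packet p = pending.remove();
        if (p.registration != null && rc == OK) { // a getData watch is kept only on success
            p.registration.run();
        }
    }

    public static void main(String[] args) {
        ClientRegistrationSketch client = new ClientRegistrationSketch();
        Object listener = "listener";
        client.getData("/id/master", listener);
        System.out.println(client.pending.peek().watchFlag); // prints true
        client.onReply(OK);
        System.out.println(client.dataWatches);              // prints {/id/master=[listener]}
    }
}
```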
When the ZooKeeper server receives the client's getData request, it handles it with the following code in FinalRequestProcessor:

```java
case OpCode.getData: {
    lastOp = "GETD";
    GetDataRequest getDataRequest = new GetDataRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request,
            getDataRequest);
    Stat stat = new Stat();
    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
            getDataRequest.getWatch() ? cnxn : null);
    rsp = new GetDataResponse(b, stat);
    break;
}
```
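Note the `getDataRequest.getWatch() ? cnxn : null` argument: on the server the watcher is the client's connection, not any client-side Watcher object. Several watched reads of the same path from one session therefore collapse into a single server-side entry, and the client fans the one notification out to its local watchers. A toy model with hypothetical names:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: the server stores the connection as the watcher for a watched read.
class ServerSideWatchSketch {
    static final class Connection {
        final long sessionId;
        int notificationsSent;

        Connection(long sessionId) {
            this.sessionId = sessionId;
        }

        void process(String path) {
            notificationsSent++; // the real connection would write a notification packet here
        }
    }

    final Map<String, Set<Connection>> watchTable = new HashMap<>();

    // Mirrors getData(path, stat, getDataRequest.getWatch() ? cnxn : null)
    void handleGetData(String path, boolean watch, Connection cnxn) {
        if (watch) {
            watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(cnxn);
        }
    }

    void handleSetData(String path) {
        Set<Connection> targets = watchTable.remove(path);
        if (targets != null) {
            for (Connection c : targets) {
                c.process(path);
            }
        }
    }

    public static void main(String[] args) {
        ServerSideWatchSketch server = new ServerSideWatchSketch();
        Connection cnxn = new Connection(0x15a3L);
        server.handleGetData("/id/master", true, cnxn); // first watched read from this session
        server.handleGetData("/id/master", true, cnxn); // second watched read, same session
        server.handleSetData("/id/master");
        System.out.println(cnxn.notificationsSent);      // prints 1
    }
}
```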
Next, DataTree.getData is called. It adds the watched path to the WatchManager (dataWatches), which is how the watch on that path is established:

```java
public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        return n.data;
    }
}
```
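WatchManager keeps two indexes: watchTable (path to watchers) and watch2Paths (watcher to paths). The inverse index is what allows every watch owned by one connection to be dropped when that connection goes away. A simplified sketch of that bookkeeping, with a generic W standing in for the connection (class name hypothetical):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of WatchManager's two indexes. Hypothetical class name.
class WatchIndexSketch<W> {
    final Map<String, Set<W>> watchTable = new HashMap<>();
    final Map<W, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, W watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, w -> new HashSet<>()).add(path);
    }

    // Drop every watch owned by one watcher, e.g. when its connection closes.
    synchronized void removeWatcher(W watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String p : paths) {
            Set<W> ws = watchTable.get(p);
            if (ws != null) {
                ws.remove(watcher);
                if (ws.isEmpty()) {
                    watchTable.remove(p);
                }
            }
        }
    }

    public static void main(String[] args) {
        WatchIndexSketch<String> wm = new WatchIndexSketch<>();
        wm.addWatch("/id/master", "cnxn-1");
        wm.addWatch("/id/config", "cnxn-1");
        wm.removeWatcher("cnxn-1");        // e.g. the session's connection closed
        System.out.println(wm.watchTable); // prints {}
    }
}
```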

When another client calls setData, the ZooKeeper server applies the change through DataTree.setData. This method updates the data and then calls triggerWatch to send change notifications:

```java
public Stat setData(String path, byte data[], int version, long zxid,
        long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n.data;
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix;
    if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
        this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
                - (lastdata == null ? 0 : lastdata.length));
    }
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}
```
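Because a watch fires once and the event only says that the node changed (its type, state and path, but not the new data), the client must read the node again to learn the new value and re-register at the same time. Writes that land in between are not reported separately. The toy model below (hypothetical names; a single pending watch for simplicity) shows two writes producing one event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: one-shot watch, data-free events, and coalesced writes.
class CoalescingSketch {
    private String data = "";
    private int version = 0;  // bumped on every write, like Stat.version
    private Runnable watch;   // a single pending one-shot watch, for simplicity

    String getData(Runnable watcher) {
        watch = watcher;
        return data;
    }

    int getVersion() {
        return version;
    }

    void setData(String value) {
        data = value;
        version++;
        Runnable w = watch;
        watch = null;         // one-shot: cleared before notifying
        if (w != null) {
            w.run();          // the event says "changed", not "changed to what"
        }
    }

    public static void main(String[] args) {
        CoalescingSketch node = new CoalescingSketch();
        List<String> events = new ArrayList<>();
        node.getData(() -> events.add("NodeDataChanged"));
        node.setData("master-a"); // fires the watch
        node.setData("master-b"); // lands before the client re-registers: no event
        String seen = node.getData(() -> events.add("NodeDataChanged")); // re-read and re-arm
        System.out.println(events.size() + " event(s) for version " + node.getVersion() + ", value " + seen);
        // prints: 1 event(s) for version 2, value master-b
    }
}
```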
WatchManager.triggerWatch then removes the watch for the path (watchTable.remove(path)) and afterwards calls each watcher's process method:

```java
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}
```
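Two details of triggerWatch matter for re-watching: the entry is detached under the lock before anyone is notified, and notification happens after the lock is released. A watcher that registers again from inside its callback therefore lands in a fresh entry instead of being lost or re-fired by the same event. The sketch below restates that logic generically; the BiConsumer notifier and class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Simplified restatement of triggerWatch with suppression. Hypothetical names.
class TriggerSketch<W> {
    final Map<String, Set<W>> watchTable = new HashMap<>();
    final Map<W, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, W w) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(w);
        watch2Paths.computeIfAbsent(w, k -> new HashSet<>()).add(path);
    }

    Set<W> triggerWatch(String path, Set<W> suppress, BiConsumer<W, String> notifier) {
        Set<W> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path); // detach first: the watch is now gone
            if (watchers == null || watchers.isEmpty()) {
                return null;
            }
            for (W w : watchers) {
                Set<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (W w : watchers) {                  // notify outside the lock
            if (suppress != null && suppress.contains(w)) {
                continue;
            }
            notifier.accept(w, path);
        }
        return watchers;
    }

    public static void main(String[] args) {
        TriggerSketch<String> wm = new TriggerSketch<>();
        wm.addWatch("/id/master", "cnxn-1");
        wm.addWatch("/id/master", "cnxn-2");
        Set<String> suppress = new HashSet<>(Collections.singleton("cnxn-2"));
        List<String> notified = new ArrayList<>();
        wm.triggerWatch("/id/master", suppress, (w, p) -> {
            notified.add(w);
            wm.addWatch(p, w); // re-arm from inside the callback: goes into a fresh entry
        });
        System.out.println(notified);                        // prints [cnxn-1]
        System.out.println(wm.watchTable.get("/id/master")); // prints [cnxn-1]
    }
}
```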
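On the server, the watcher registered for a client is its connection, so NIOServerCnxn.process is called, which sends the notification to the client: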

```java
synchronized public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                "Deliver event " + event + " to 0x"
                + Long.toHexString(this.sessionId)
                + " through " + this);
    }
    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();
    sendResponse(h, e, "notification");
}
```
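When the client receives the notification (a reply whose xid is -1), ClientCnxn handles it with the code below. queueEvent calls watcher.materialize to determine which watchers will process the WatchedEvent: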

```java
// In ClientCnxn.SendThread.readResponse: xid -1 marks a watch notification
if (replyHdr.getXid() == -1) {
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");
    WatchedEvent we = new WatchedEvent(event);
    eventThread.queueEvent( we );
    return;
}

// In ClientCnxn.EventThread
public void queueEvent(WatchedEvent event) {
    // materialize the watchers based on the event
    WatcherSetEventPair pair = new WatcherSetEventPair(
            watcher.materialize(event.getState(), event.getType(),
                    event.getPath()),
            event);
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair);
}
```
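The routing rule above can be reduced to a few lines: a reply whose xid is -1 is queued for the event thread, while any other reply completes the pending request with that xid. The sketch below models only that rule, using strings for payloads and a plain queue in place of the event thread (all names hypothetical); draining the queue in one place reflects that watcher callbacks run one at a time on a single event thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of separating watch notifications from ordinary replies.
class NotificationRoutingSketch {
    static final int NOTIFICATION_XID = -1;

    final Queue<String> waitingEvents = new ArrayDeque<>(); // stands in for EventThread's queue
    final List<Integer> completedXids = new ArrayList<>();

    void readResponse(int xid, String payload) {
        if (xid == NOTIFICATION_XID) {
            waitingEvents.add(payload); // watch event: handled later by the event thread
            return;
        }
        completedXids.add(xid);         // ordinary reply to request number xid
    }

    // The event thread takes events one at a time, so callbacks never run concurrently.
    List<String> drainEvents() {
        List<String> out = new ArrayList<>();
        String e;
        while ((e = waitingEvents.poll()) != null) {
            out.add(e);
        }
        return out;
    }

    public static void main(String[] args) {
        NotificationRoutingSketch client = new NotificationRoutingSketch();
        client.readResponse(7, "reply to request 7");
        client.readResponse(NOTIFICATION_XID, "NodeDataChanged /id/master");
        System.out.println(client.completedXids); // prints [7]
        System.out.println(client.drainEvents()); // prints [NodeDataChanged /id/master]
    }
}
```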
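ZKWatchManager.materialize (in ZooKeeper.java) collects the Watcher objects for the event and, for a data-change event, removes them from the client-side watch lists, so the client forgets the watch as well: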

```java
@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                Watcher.Event.EventType type,
                                String clientPath)
{
    Set<Watcher> result = new HashSet<Watcher>();
    switch (type) {
    case None:
        result.add(defaultWatcher);
        boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                state != Watcher.Event.KeeperState.SyncConnected;
        synchronized(dataWatches) {
            for(Set<Watcher> ws: dataWatches.values()) {
                result.addAll(ws);
            }
            if (clear) {
                dataWatches.clear();
            }
        }
        synchronized(existWatches) {
            for(Set<Watcher> ws: existWatches.values()) {
                result.addAll(ws);
            }
            if (clear) {
                existWatches.clear();
            }
        }
        synchronized(childWatches) {
            for(Set<Watcher> ws: childWatches.values()) {
                result.addAll(ws);
            }
            if (clear) {
                childWatches.clear();
            }
        }
        return result;
    case NodeDataChanged:
    case NodeCreated:
        // Data events remove this path's watches from the client-side lists
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), result);
        }
        synchronized (existWatches) {
            addTo(existWatches.remove(clientPath), result);
        }
        break;
    // ... remaining event types omitted
    }
    return result;
}
```
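The asymmetry between the two cases is the key point: a connection-state event (type None) is delivered to every watcher and, with auto-reset enabled, removes nothing, while a data event takes the watchers for that path out of the maps. A reduced model of just those two cases (hypothetical names, Object as the watcher type):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Reduced model of ZKWatchManager.materialize for two event types. Hypothetical names.
class MaterializeSketch {
    enum EventType { NONE, NODE_DATA_CHANGED }

    final Object defaultWatcher = "default";
    final Map<String, Set<Object>> dataWatches = new HashMap<>();
    final Map<String, Set<Object>> existWatches = new HashMap<>();

    Set<Object> materialize(EventType type, String path) {
        Set<Object> result = new HashSet<>();
        switch (type) {
            case NONE:
                // connection-state event: everyone is told, nothing is removed
                result.add(defaultWatcher);
                dataWatches.values().forEach(result::addAll);
                existWatches.values().forEach(result::addAll);
                return result;
            case NODE_DATA_CHANGED:
                // data event: the watchers for this path are taken out of the maps
                addTo(dataWatches.remove(path), result);
                addTo(existWatches.remove(path), result);
                break;
        }
        return result;
    }

    private static void addTo(Set<Object> from, Set<Object> to) {
        if (from != null) {
            to.addAll(from);
        }
    }

    public static void main(String[] args) {
        MaterializeSketch zkwm = new MaterializeSketch();
        Object listener = "listener";
        zkwm.dataWatches.computeIfAbsent("/id/master", p -> new HashSet<>()).add(listener);
        System.out.println(zkwm.materialize(EventType.NODE_DATA_CHANGED, "/id/master")); // prints [listener]
        System.out.println(zkwm.materialize(EventType.NODE_DATA_CHANGED, "/id/master")); // prints []
    }
}
```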
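Summary:
The source code above shows that a ZooKeeper watch is one-shot: when it fires, the server removes it (WatchManager.triggerWatch) and the client removes it too (ZKWatchManager.materialize), so each registration reports at most one change. To keep watching a node, the watch must be registered again after every notification.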
Solution:
Curator provides a class that handles repeated watching for us: NodeCache (NodeCache.java). How is it implemented?

```java
NodeCache nc = new NodeCache(zkclient.getClient(), path);
nc.start(true);
```
The start method fetches the initial value and then calls reset:

```java
public void start(boolean buildInitial) throws Exception
{
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
    ensurePath.ensure(client.getZookeeperClient());
    client.getConnectionStateListenable().addListener(connectionStateListener);
    if ( buildInitial )
    {
        internalRebuild();
    }
    reset();
}
```
The reset method calls forPath again, this time registering NodeCache's own wrapped watcher:

```java
private void reset() throws Exception
{
    if ( (state.get() == State.STARTED) && isConnected.get() )
    {
        client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
    }
}
```
When the data changes, this watcher calls reset again, which registers a new watch:

```java
private final CuratorWatcher watcher = new CuratorWatcher()
{
    @Override
    public void process(WatchedEvent event) throws Exception
    {
        reset();
    }
};
```
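The same re-arming pattern can be reproduced without Curator to see why it keeps delivering changes. The sketch below is a simplified, in-memory stand-in (hypothetical names throughout): the watcher's callback calls reset(), and reset() re-reads the node with the same watcher, but only while the cache is started and connected. Unlike the real NodeCache, it uses a single getData call instead of checkExists followed by a background read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of NodeCache's pattern: the watcher re-registers itself via reset().
class RearmingWatchSketch {
    // Minimal one-shot store standing in for ZooKeeper.
    static final class Store {
        final Map<String, String> data = new HashMap<>();
        final Map<String, Set<Consumer<String>>> watches = new HashMap<>();

        String getData(String path, Consumer<String> watcher) {
            watches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
            return data.get(path);
        }

        void setData(String path, String value) {
            data.put(path, value);
            Set<Consumer<String>> fired = watches.remove(path); // one-shot
            if (fired != null) {
                fired.forEach(w -> w.accept(path));
            }
        }
    }

    private final Store store;
    private final String path;
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean connected = new AtomicBoolean(true);
    final List<String> seen = new ArrayList<>();             // every value observed, in order
    private final Consumer<String> watcher = p -> reset();   // re-arm on every event

    RearmingWatchSketch(Store store, String path) {
        this.store = store;
        this.path = path;
    }

    void start() {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        reset();
    }

    void close() {
        started.set(false);
    }

    // Loosely mirrors NodeCache's connection listener: re-arm after a reconnect.
    void onConnectionChange(boolean nowConnected) {
        if (!nowConnected) {
            connected.set(false);
        } else if (connected.compareAndSet(false, true)) {
            reset();
        }
    }

    private void reset() {
        if (started.get() && connected.get()) {
            String value = store.getData(path, watcher); // read and register again
            if (value != null) {
                seen.add(value);
            }
        }
    }
}
```

Starting the cache on a node holding "a" and then writing "b" and "c" records all three values, whereas the original one-shot registration would only have reported the first change; after close(), the next event finds the cache stopped and nothing is re-registered.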