设为首页 收藏本站
查看: 1289|回复: 0

[经验分享] ZooKeeper客户端事件串行化处理

[复制链接]

尚未签到

发表于 2019-1-9 07:15:14 | 显示全部楼层 |阅读模式
  为了提升系统的性能,进一步提高系统的吞吐能力,最近公司很多系统都在进行异步化改造。在异步化改造的过程中,肯定会比以前碰到更多的多线程问题,上周就碰到ZooKeeper客户端异步化过程中的一个死锁问题,这里说明下。
  通常ZooKeeper对于同一个API,提供了同步和异步两种调用方式。
同步接口很容易理解,使用方法如下:
ZooKeeper zk = new ZooKeeper(...);
List children = zk.getChildren( path, true );  异步接口就相对复杂一点,使用方法如下:
ZooKeeper zk = new ZooKeeper(...);
zk.getChildren( path, true, new AsyncCallback.Children2Callback() {
@Override
public void proce***esult( int rc, String path, Object ctx, List children, Stat stat ) {
System.out.println( "Recive the response." );
}
}, null);  我们可以看到,异步调用中,需要注册一个Children2Callback,并实现回调方法:proce***esult。
  上周碰到这样的问题:应用注册了对某znode子节点列表变化的监听,逻辑是在接受到ZooKeeper服务器节点列表变更通知(EventType.NodeChildrenChanged)的时候,会重新获取一次子节点列表。之前,他们是使用同步接口,整个应用可以正常运行,但是这次异步化改造后,出现了诡异现象,能够收到子节点的变更通知,但是无法重新获取子节点列表了。
  下面,我首先把应用之前使用同步接口的逻辑代码,用一个简单的demo来演示下,如下:
package book.chapter05;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
/**
* ZooKeeper API 获取子节点列表,使用同步(sync)接口。
* @author 银时
*/
public class ZooKeeper_GetChildren_API_Sync_Usage implements Watcher {
private CountDownLatch connectedSemaphore = new CountDownLatch( 1 );
private static CountDownLatch _semaphore = new CountDownLatch( 1 );
private ZooKeeper zk;
ZooKeeper createSession( String connectString, int sessionTimeout, Watcher watcher ) throws IOException {
ZooKeeper zookeeper = new ZooKeeper( connectString, sessionTimeout, watcher );
try {
connectedSemaphore.await();
} catch ( InterruptedException e ) {
}
return zookeeper;
}
/** create path by sync */
void createPath_sync( String path, String data, CreateMode createMode ) throws IOException, KeeperException, InterruptedException {
if ( zk == null ) {
zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this );
}
zk.create( path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, createMode );
}
/** Get children znodes of path and set watches */
List getChildren( String path ) throws KeeperException, InterruptedException, IOException{
System.out.println( "===Start to get children znodes.===" );
if ( zk == null ) {
zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this );
}
return zk.getChildren( path, true );
}
public static void main( String[] args ) throws IOException, InterruptedException {
ZooKeeper_GetChildren_API_Sync_Usage sample = new ZooKeeper_GetChildren_API_Sync_Usage();
String path = "/get_children_test";
try {
sample.createPath_sync( path, "", CreateMode.PERSISTENT );
sample.createPath_sync( path + "/c1", "", CreateMode.PERSISTENT );
List childrenList = sample.getChildren( path );
System.out.println( childrenList );
//Add a new child znode to test watches event notify.
sample.createPath_sync( path + "/c2", "", CreateMode.PERSISTENT );
_semaphore.await();
} catch ( KeeperException e ) {
System.err.println( "error: " + e.getMessage() );
e.printStackTrace();
}
}
/**
* Process when receive watched event
*/
@Override
public void process( WatchedEvent event ) {
System.out.println( "Receive watched event:" + event );
if ( KeeperState.SyncConnected == event.getState() ) {
if( EventType.None == event.getType() && null == event.getPath() ){
connectedSemaphore.countDown();
}else if( event.getType() == EventType.NodeChildrenChanged ){
//children list changed
try {
System.out.println( this.getChildren( event.getPath() ) );
_semaphore.countDown();
} catch ( Exception e ) {}
}
}
}
}  输出结果如下:
Receive watched event:WatchedEvent state:SyncConnected type:None path:null
===Start to get children znodes.===
[c1]
Receive watched event:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/get_children_test
===Start to get children znodes.===
[c1, c2]  在上面这个程序中,我们首先创建了一个父节点:/get_children_test,以及一个子节点:/get_children_test/c1。然后调用getChildren的同步接口来获取/get_children_test节点下的所有子节点,调用的同时注册一个watches。之后,我们继续向/get_children_test节点创建子节点:/get_children_test/c2,这个时候,因为我们之前我们注册了一个watches,因此,一旦此时有子节点被创建,ZooKeeperServer就会向客户端发出“子节点变更”的通知,于是,客户端可以再次调用getChildren方法来获取新的子节点列表。
  这个例子当然是能够正常运行的。现在,我们进行异步化改造,如下:
package book.chapter05;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
/**
* ZooKeeper API 获取子节点列表,使用异步(ASync)接口。
* @author 银时
*/
public class ZooKeeper_GetChildren_API_ASync_Usage_Deadlock implements Watcher {
private CountDownLatch connectedSemaphore = new CountDownLatch( 1 );
private static CountDownLatch _semaphore = new CountDownLatch( 1 );
private ZooKeeper zk;
ZooKeeper createSession( String connectString, int sessionTimeout, Watcher watcher ) throws IOException {
ZooKeeper zookeeper = new ZooKeeper( connectString, sessionTimeout, watcher );
try {
connectedSemaphore.await();
} catch ( InterruptedException e ) {
}
return zookeeper;
}
/** create path by sync */
void createPath_sync( String path, String data, CreateMode createMode ) throws IOException, KeeperException, InterruptedException {
if ( zk == null ) {
zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this );
}
zk.create( path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, createMode );
}
/** Get children znodes of path and set watches */
void getChildren( String path ) throws KeeperException, InterruptedException, IOException{
System.out.println( "===Start to get children znodes.===" );
if ( zk == null ) {
zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this );
}
final CountDownLatch _semaphore_get_children = new CountDownLatch( 1 );
zk.getChildren( path, true, new AsyncCallback.Children2Callback() {
@Override
public void proce***esult( int rc, String path, Object ctx, List children, Stat stat ) {
System.out.println( "Get Children znode result: [response code: " + rc + ", param path: " + path + ", ctx: " + ctx + ", children list: "
+ children + ", stat: " + stat );
_semaphore_get_children.countDown();
}
}, null);
_semaphore_get_children.await();
}
public static void main( String[] args ) throws IOException, InterruptedException {
ZooKeeper_GetChildren_API_ASync_Usage_Deadlock sample = new ZooKeeper_GetChildren_API_ASync_Usage_Deadlock();
String path = "/get_children_test";
try {
sample.createPath_sync( path, "", CreateMode.PERSISTENT );
sample.createPath_sync( path + "/c1", "", CreateMode.PERSISTENT );
//Get children and register watches.
sample.getChildren( path );
//Add a new child znode to test watches event notify.
sample.createPath_sync( path + "/c2", "", CreateMode.PERSISTENT );
_semaphore.await();
} catch ( KeeperException e ) {
System.err.println( "error: " + e.getMessage() );
e.printStackTrace();
}
}
/**
* Process when receive watched event
*/
@Override
public void process( WatchedEvent event ) {
System.out.println( "Receive watched event:" + event );
if ( KeeperState.SyncConnected == event.getState() ) {
if( EventType.None == event.getType() && null == event.getPath() ){
connectedSemaphore.countDown();
}else if( event.getType() == EventType.NodeChildrenChanged ){
//children list changed
try {
this.getChildren( event.getPath() );
_semaphore.countDown();
} catch ( Exception e ) {
e.printStackTrace();
}
}
}
}
}  输出结果如下:
Receive watched event:WatchedEvent state:SyncConnected type:None path:null
===Start to get children znodes.===
Get Children znode result: [response code: 0, param path: /get_children_test, ctx: null, children list: [c1], stat: 555,555,1373931727380,1373931727380,0,1,0,0,0,1,556
Receive watched event:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/get_children_test
===Start to get children znodes.===  在上面这个demo中,执行逻辑和之前的同步版本基本一致,唯一有区别的地方在于获取子节点列表的过程异步化了。这样一改造,问题就出来了,整个程序在进行第二次获取节点列表的时候,卡住了。和应用方确认了,之前同步版本从来没有出现过这个现象的,所以开始排查这个异步化中哪里会阻塞。
  这里,我们重点讲解在ZooKeeper客户端中,需要处理来自服务端的两类事件通知:一类是Watches时间通知,另一类则是异步接口调用的响应。值得一提的是,在ZooKeeper的客户端线程模型中,这两个事件由同一个线程处理,并且是串行处理。具体可以自己查看事件处理的核心类:org.apache.zookeeper.ClientCnxn.EventThread。




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-660915-1-1.html 上篇帖子: zookeeper集群搭建(个人非详细文档) 下篇帖子: zookeeper安装以及配置和注意事项
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表