设为首页 收藏本站
查看: 1282|回复: 0

[经验分享] ZooKeeper与Curator注册和监控

[复制链接]

尚未签到

发表于 2015-9-6 09:12:55 | 显示全部楼层 |阅读模式
  Curator提供了对zookeeper客户端的封装,并监控连接状态和会话session,特别是会话session过期后,curator能够重新连接zookeeper,并且创建一个新的session。
  对于zk的使用者来说,session的概念至关重要,如果想了解更多session的说明,请访问:http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html
  
  zk客户端和zk服务器间主要可能存在下面几种异常情况:
  1.     短暂失去连接:此时客户端检测到与服务端的连接已经断开,但是服务端维护的客户端session尚未过期,之后客户端和服务端重新建立了连接;当客户端重新连接后,由于session没有过期,zookeeper能够保证连接恢复后保持正常服务。
  2.     失去连接时间很长:此时服务器相对于客户端的session已经过期了,与先前session相关的watcher和ephemeral的路径和数据都会消失;当Curator重新创建了与zk的连接后,会获取到session expired异常,Curator会销毁先前的session,并且会创建一个新的session,需要注意的是,与之前session相关的watcher和ephemeral类型的路径和数据在新的session中也不会存在,需要开发者在CuratorFramework.getConnectionStateListenable().addListener()中添加状态监听事件,对ConnectionState.LOST事件进行监听,当session过期后,使得之前的session状态得以恢复。对于ephemeral类型,在客户端应该保持数据的状态,以便及时恢复。
  3.     客户端重新启动:不论先前的zk session是否已经过期,都需要重新创建临时节点、添加数据和watch事件,先前的session也会在稍后的一段时间内过期。
  4.     Zk服务器重新启动:由于zk将session信息存放到了硬盘上,因此重启后,先前未过期的session仍然存在,在zk服务器启动后,客户端与zk服务器创建新的连接,并使用先前的session,与1相同。
  5.     需要注意的是,当session过期了,在session过期期间另外的客户端修改了zk的值,那么这个修改在客户端重新连接到zk上时,zk客户端不会接收到这个修改的watch事件(尽管添加了watch),如果需要严格的watch逻辑,就需要在curator的状态监控中添加逻辑。
  
  特别提示:watcher仅仅是一次性的,zookeeper通知了watcher事件后,就会将这个watcher从session中删除,因此,如果想继续监控,就要添加新的watcher。
  
  下面提供了对persistent和ephemeral两种类型节点的监控方法,其中get方法说明了persistent节点如何监控,而register方法说明了ephemeral类型的节点如何监控。
public class CuratorTest {

    private CuratorFramework zkTools;

    private ConcurrentSkipListSet watchers = newConcurrentSkipListSet();

    private static Charset charset = Charset.forName("utf-8");

   

   

    public CuratorTest() {     

       zkTools = CuratorFrameworkFactory

              .builder()

              .connectString("10.11.21.78:12306")

              .namespace("zk/test")

              .retryPolicy(new RetryNTimes(2000,20000))

              .build();

       zkTools.start();

      

        }  

   

   

    public void addReconnectionWatcher(final String path,final ZookeeperWatcherType watcherType,final CuratorWatcher watcher){

       synchronized (this) {

           if(!watchers.contains(watcher.toString()))//不要添加重复的监听事件

           {

              watchers.add(watcher.toString());

              System.out.println("add new watcher " + watcher);

              zkTools.getConnectionStateListenable().addListener(newConnectionStateListener() {  

                  @Override

                  public void stateChanged(CuratorFramework client, ConnectionState newState) {

                     System.out.println(newState);

                     if(newState == ConnectionState.LOST){//处理session过期

                         try{

                            if(watcherType == ZookeeperWatcherType.EXITS){

                               zkTools.checkExists().usingWatcher(watcher).forPath(path);

                            }else if(watcherType == ZookeeperWatcherType.GET_CHILDREN){

                               zkTools.getChildren().usingWatcher(watcher).forPath(path);

                            }else if(watcherType == ZookeeperWatcherType.GET_DATA){

                                zkTools.getData().usingWatcher(watcher).forPath(path);

                            }else if(watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS){

                                //ephemeral类型的节点session过期了,需要重新创建节点,并且注册监听事件,之后监听事件中,

                                //会处理create事件,将路径值恢复到先前状态

                                Stat stat =zkTools.checkExists().usingWatcher(watcher).forPath(path);                             

                                if(stat == null){

                                   System.err.println("to create");

                                   zkTools.create()

                                   .creatingParentsIfNeeded()

                                   .withMode(CreateMode.EPHEMERAL)

                                   .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

                                   .forPath(path);                                    

                                }

                            }

                         }catch (Exception e) {

                            e.printStackTrace();

                         }

                     }

                  }

              });         

           }

       }

    }

   


    public void create() throws Exception{

       zkTools.create()//创建一个路径

       .creatingParentsIfNeeded()//如果指定的节点的父节点不存在,递归创建父节点

       .withMode(CreateMode.PERSISTENT)//存储类型(临时的还是持久的)

       .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//访问权限

       .forPath("zk/test");//创建的路径

    }

   

    public void put() throws Exception{

       zkTools.//对路径节点赋值

       setData().

       forPath("zk/test","hello world".getBytes(Charset.forName("utf-8")));

    }

   

    public void get() throws Exception{

       String path = "zk/test";

       ZKWatch watch = new ZKWatch(path);

       byte[] buffer = zkTools.

                         getData().

                         usingWatcher(watch).forPath(path);

       System.out.println(new String(buffer,charset));

       //添加session过期的监控

       addReconnectionWatcher(path, ZookeeperWatcherType.GET_DATA, watch);

    }  

   

   

    public void register() throws Exception{

      

       String ip = InetAddress.getLocalHost().getHostAddress();

       String registeNode = "zk/register/"+ip;//节点路径

      

       byte[] data = "disable".getBytes(charset);//节点值


       CuratorWatcher watcher = new ZKWatchRegister(registeNode,data);    //创建一个register watcher

      

       Stat stat = zkTools.checkExists().forPath(registeNode);

       if(stat != null){

           zkTools.delete().forPath(registeNode);

       }

       zkTools.create()

       .creatingParentsIfNeeded()          .withMode(CreateMode.EPHEMERAL)

       .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

       .forPath(registeNode,data);//创建的路径和值

      

       //添加到session过期监控事件中

       addReconnectionWatcher(registeNode, ZookeeperWatcherType.CREATE_ON_NO_EXITS,watcher);               

       data = zkTools.getData().usingWatcher(watcher).forPath(registeNode);

       System.out.println("get path form zk : "+registeNode+":"+new String(data,charset));

    }

   

    public static void main(String[] args) throws Exception {

       CuratorTest test = new CuratorTest();

       test.get();

       test.register();

       Thread.sleep(10000000000L);


    }

   

    public class ZKWatch implements CuratorWatcher{

       private final String path;

      

       public String getPath() {

           return path;

       }

       public ZKWatch(String path) {

           this.path = path;

       }

       @Override

       public void process(WatchedEvent event) throws Exception {

           System.out.println(event.getType());

           if(event.getType() == EventType.NodeDataChanged){

              byte[] data = zkTools.

                     getData().

                     usingWatcher(this).forPath(path);

              System.out.println(path+":"+new String(data,Charset.forName("utf-8")));

           }

       }

      

    }

   

   

    public class ZKWatchRegister implements CuratorWatcher{

       private final String path;

       private byte[] value;

       public String getPath() {

           return path;

       }

       public ZKWatchRegister(String path,byte[] value) {

           this.path = path;

           this.value = value;

       }

       @Override

       public void process(WatchedEvent event) throws Exception {

           System.out.println(event.getType());

           if(event.getType() == EventType.NodeDataChanged){

              //节点数据改变了,需要记录下来,以便session过期后,能够恢复到先前的数据状态

              byte[] data = zkTools.

                     getData().

                     usingWatcher(this).forPath(path);

              value = data;

              System.out.println(path+":"+new String(data,charset));

           }else if(event.getType() == EventType.NodeDeleted){

              //节点被删除了,需要创建新的节点

              System.out.println(path + ":" + path +" has been deleted.");

              Stat stat = zkTools.checkExists().usingWatcher(this).forPath(path);

              if(stat == null){

                  zkTools.create()

                  .creatingParentsIfNeeded()

                  .withMode(CreateMode.EPHEMERAL)

                  .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

                  .forPath(path);

              }

           }else if(event.getType() == EventType.NodeCreated){

              //节点被创建时,需要添加监听事件(创建可能是由于session过期后,curator的状态监听部分触发的)

              System.out.println(path + ":" +" has been created!" + "the current data is " +new String(value));

              zkTools.setData().forPath(path, value);

              zkTools.getData().usingWatcher(this).forPath(path);

           }

       }     

    }

   

    public enum ZookeeperWatcherType{

       GET_DATA,GET_CHILDREN,EXITS,CREATE_ON_NO_EXITS

    }

  }

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-109960-1-1.html 上篇帖子: HBase client访问ZooKeeper获取root-region-server DeadLock问题(zookeeper.ClientCnxn Unab 下篇帖子: zookeeper启动错误 transaction type: 2 error: KeeperErrorCode = NoNode for /hbase
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表