设为首页 收藏本站
查看: 1548|回复: 0

[经验分享] ZooKeeper3.4.6学习笔记(二)简单介绍

[复制链接]

尚未签到

发表于 2019-1-9 06:18:48 | 显示全部楼层 |阅读模式
  

鲁春利的工作笔记,谁说程序员不能有文艺范?
  

  Zookeeper中的角色主要有以下三类,如下表所示:

  

  系统模型如图所示:


  客户端通过2181端口与ZooKeeper Service进行连接,不同的ZooKeeper Server之间通过2888或3888端口进行通信。

  

  org.apache.zookeeper.ZooKeeper是ZooKeeper客户端类库中主要的类,所有使用Zookeeper服务的应用程序都必须首先初始化ZooKeeper类的对象。ZooKeeper类提供了对ZooKeeper操作的所有方法,并且该类中的方法都是线程安全的。Client连接成功后,会被自动分配一个Session ID,通过发送周期性的心跳信息来保持会话的有效性。
  

  ZooKeeper API有同步和异步两种方式。在ZooKeeper中连接到Server的客户端进程可以在ZooKeeper节点上设置watcher(实现Watcher接口),当节点发生变化时,客户端能够捕获到并触发相应的动作。但是,每个watcher只能被触发一次,为了保证每次变化都能被捕获到,需要多次进行watcher注册。
  

  ZooKeeper
package org.apache.zookeeper;
public class ZooKeeper {
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
    throws IOException {
        this(connectString, sessionTimeout, watcher, false);
    }
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly) throws IOException {
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        // 实现实际上是实例化了一个客户端对象
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();   
    }
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            long sessionId, byte[] sessionPasswd) throws IOException {
        this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false);
    }
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
        throws IOException {
         // 略   
    }
}  

  Watcher
package org.apache.zookeeper;
public interface Watcher {
    // 定义Event的可能状态
    public interface Event {   
        public enum KeeperState {
            @Deprecated                     @Deprecated
            Unknown (-1), Disconnected (0), NoSyncConnected (1),SyncConnected (3),
            AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated(6), Expired (-112);
        }
        // 定义Event的类型
        public enum EventType {
            None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4);
        }
    }
    // 抽象方法
    abstract public void process(WatchedEvent event);
}  

  java代码

package com.invic.zk;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
*
* @author lucl
*
*/
public class CusZooKeeperClient {
    //
    private static String connectString = "nnode:2181,dnode1:2181,dnode2:2181";
    private static int SESSION_TIMEOUT = 30 * 1000;
    /**
     * @param args
     */
    public static void main(String[] args) {
        CusZooKeeperClient client = new CusZooKeeperClient();
        // 最基本的会话实例
        // client.zkSessionInstance();
        // 复用sessionID和sessionPasswd来创建会话
        // client.zkSessionInstanceWithSidAndPwd();
        // 创建节点
        // client.createZNode();
        // 获取子节点
        // client.getChildren();
        // 获取数据
        // client.getData();
        // 更新数据
        // client.setData();
        // 是否存在
        client.exists();
    }
    /**
     * 最基本的会话实例
     */
    private void zkSessionInstance () {
        CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));
            latch.await();
            System.out.println("ZK sessionId " + zk.getSessionId() + "\t and state " + zk.getState());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != zk) {
                 try {
                    zk.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 复用sessionID和sessionPasswd创建一个ZooKeeper对象实例(维持之前会话的有效性)
     */
    private void zkSessionInstanceWithSidAndPwd () {
        CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zk = null;
        long sessionID = 0;
        byte [] sessionPwd = "".getBytes();
        try {
            System.out.println("============================001==============================");
            {
                zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));
                latch.await();
                sessionID = zk.getSessionId();
                sessionPwd = zk.getSessionPasswd();
                System.out.println("ZK sessionId is " + sessionID + "\t and sessionPwd " + new String(sessionPwd));
            }
            System.out.println("=============================002==============================");
            {
                zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch), sessionID, sessionPwd);
            }
            System.out.println("=============================003==============================");
            {
                // with wrong sessionID and sessionPasswd
                long wrongSessionID = 1L;
                byte [] wrongSessionPwd = "wrong".getBytes();
                zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch), wrongSessionID, wrongSessionPwd);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != zk) {
                    zk.close();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 创建节点
     */
    private void createZNode () {
        CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));
            latch.await();
            /**
             *  同步API
             *  create(final String path, byte data[], List acl, CreateMode createMode)
             */
            {
                String path = "/zk-sync";
                // 内容只支持字节数组,字符串可以直接getBytes(),其他内容需要Hessian或Kryo等序列化工具
                byte [] datas = "luchunli".getBytes();   
                // Ids.OPEN_ACL_UNSAFE表示任何操作都不受权限控制
                String basePath = zk.create(path + "-001", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("Success create znode : " + basePath);    // `/zk-test-001`
                basePath = zk.create(path + "-002", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println("Success create znode : " + basePath);    // `/zk-test-0020000000008`会在path后自动加个数字
            }
            /**
             *  异步API
             */
            System.out.println("================================for async===================================");
            String path = "/zk-async";
            byte [] datas = "luchunli".getBytes();
            // 异步中接口不会抛异常,异常都是在回调函数中体现
            zk.create(path + "-001", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() {
                @Override
                public void proce***esult(int rc, String path, Object ctx, String name) {
                    switch (rc) {
                        case 0 :
                            System.out.println("Create result code is " + rc + ", " + path + ", " + ctx + ", real name " + name);
                            break;
                        case -4 :
                            System.out.println("Connection abort.");
                            break;
                        case -110 :
                            System.out.println("Node is already exists.");
                            break;
                        case -112 :
                            System.out.println("Session expired.");
                            break;
                        default:
                            System.out.println("Unknown error.");
                    }
                }
            }, "I am Context.");
            zk.create(path + "-002", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
                @Override
                public void proce***esult(int rc, String path, Object ctx, String name) {
                    switch (rc) {
                        case 0 :
                            System.out.println("Create result code is " + rc + ", " + path + ", " + ctx + ", real name " + name);
                            break;
                        case -4 :
                            System.out.println("Connection abort.");
                            break;
                        case -110 :
                            System.out.println("Node is already exists.");
                            break;
                        case -112 :
                            System.out.println("Session expired.");
                            break;
                        default:
                            System.out.println("Unknown error.");
                    }
                }
            }, "I am Context.");
            
            Thread.sleep(30 * 1000);        // 为了看到创建的临时节点的效果,zk.close()之后节点就不存在了
        } catch (Exception e) {
            e.printStackTrace();
        } finally  {
            try{
                if (null != zk) {
                    zk.close();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 获取子节点及节点数据
     */
    private void getChildren () {
        CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zk = null;
        try {
            CusZKWatcher watcher = new CusZKWatcher(latch);
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
            latch.await();
            watcher.setZk(zk);
            String path = "/zk-book";
            byte [] datas = "luchunli".getBytes();
            {
                String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                System.out.println("Success create znode : " + basePath);    //
            }
            {
                String basePath = zk.create(path + "/c1", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("Success create znode : " + basePath);    //
            }
            /**
             * 同步API获取子节点
             */
            {
                List childs = zk.getChildren(path, true);
                System.out.println("In client get children is : " + childs);
            }
            /**
             * 异步API获取子节点
             */
            {
                System.out.println("================================for async===================================");
                zk.getChildren(path, true, new AsyncCallback.Children2Callback() {
                    @Override
                    public void proce***esult(int rc, String path, Object ctx, List children, Stat stat) {
                        System.out.println("Get children znode result " + rc + ", " + path + ", " + children + ", " + stat);
                    }
                }, "I am Context.");
            }
            {
                String basePath = zk.create(path + "/c2", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("Success create znode : " + basePath);    //
            }
            Thread.sleep(30 * 1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally  {
            try{
                if (null != zk) {
                    zk.close();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 获取值
     */
    private void getData () {
        CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zk = null;
        try {
            CusZKWatcher watcher = new CusZKWatcher(latch);
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
            latch.await();
            watcher.setZk(zk);
            String path = "/zk-test";
            byte [] datas = "luchunli".getBytes();
            {
                String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("ZK create node " + basePath + " success.");
            }
            /**
             * 同步API获取数据
             */
            {
                Stat stat = new Stat();
                byte [] bytess = zk.getData(path, true, stat);
                System.out.println("Get data from zk : " + new String(bytess));
                System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
            }
            {
                Stat sstat = zk.setData(path, datas, -1);        // znode节点数据或内容的变化都会引起NodeDataChanged
                System.out.println("after setData stat info is : " + sstat.getCzxid() + ", " + sstat.getMzxid() + ", " + sstat.getVersion());
            }
            Thread.sleep(30 * 1000);
            /**
             * 异步API获取数据
             */
            System.out.println("================================for async===================================");
            zk.getData(path, true, new AsyncCallback.DataCallback() {
                @Override
                public void proce***esult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                    System.out.println("Get data result " + rc + ", " + path + ", " + new String(data) + ", " + stat);
                }
            }, "I am Context.");
            {
                Stat sstat = zk.setData(path, datas, -1);        // znode节点数据或内容的变化都会引起NodeDataChanged
                System.out.println("after setData stat info is : " + sstat.getCzxid() + ", " + sstat.getMzxid() + ", " + sstat.getVersion());
            }
            Thread.sleep(30 * 1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally  {
            try{
                if (null != zk) {
                    zk.close();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 更新值
     */
    private void setData () {
        /**
         * setData(final String path, byte data[], int version)
         * 或
         * setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
         *
         * version说明:getData中无version,ZooKeeper的setData接口中的version参数是由CAS理论演化而来
         *
         */
        CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zk = null;
        try {
            CusZKWatcher watcher = new CusZKWatcher(latch);
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
            latch.await();
            watcher.setZk(zk);
            String path = "/zk-test";
            byte [] datas = "luchunli".getBytes();
            {
                String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("ZK create node " + basePath + " success.");
            }
            /**
             * 同步API更新数据
             */
            Stat stat = zk.setData(path, datas, -1);        // 传入参数-1,表示客户端基于数据的最新版本进行更新
            System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
            Stat stat2 = zk.setData(path, datas, stat.getVersion());    // 基于上一次的version更新数据,更新成功
            System.out.println("stat2 info is : " + stat2.getCzxid() + ", " + stat2.getMzxid() + ", " + stat2.getVersion());
            Stat stat3 = zk.setData(path, datas, stat.getVersion());    // 仍然基于之前的version,更新数据失败(BadVersion for /zk-test)
            System.out.println("stat3 info is : " + stat3.getCzxid() + ", " + stat3.getMzxid() + ", " + stat3.getVersion());
            /**
             * 异步API更新数据
             */
            System.out.println("================================for async===================================");
            zk.setData(path, datas, -1, new AsyncCallback.StatCallback() {
                @Override
                public void proce***esult(int rc, String path, Object ctx, Stat stat) {
                    System.out.println("Get data result " + rc + ", " + path + ", " + ctx + ", " + stat);
                }
            }, "I am Context");
            Thread.sleep(30 * 1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 判断指定的znode是否存在
     * @param path
     * @return
     */
    private void exists () {
        CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zk = null;
        try {
            CusZKWatcher watcher = new CusZKWatcher(latch);
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
            latch.await();
            watcher.setZk(zk);
            String path = "/zk-test";
            {
                Stat stat = zk.exists(path, true);
                if (null == stat) {
                    System.out.println("Node " + path + " not exists.");
                } else {
                    System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
                }
            }
            byte [] datas = "luchunli".getBytes();
            {
                zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        // 需要为PERSISTENT节点,否则无法创建子节点
                zk.exists(path, true);
                zk.setData(path, "test".getBytes(), -1);
                zk.create(path + "/c1", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);    // 子节点的变化客户端未watch到
                zk.delete(path + "/c1", -1);
                zk.delete(path, -1);
            }
            Thread.sleep(30 * 1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != zk) {
                try {
                    zk.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 删除节点-无法删除非空节点
     */
    private void delZNode () {
        /**
         *  delete(final String path, int version)
         *  或
         *  delete(final String path, int version, VoidCallback cb, Object ctx)
         *  
         *  若version传入-1则直接删除
         */
    }
}  观察者
package com.invic.zk;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
/**
*
* @author lucl
* @description 自定义观察者
*
*/
public class CusZKWatcher implements Watcher {
    private CountDownLatch latch = null;
    private ZooKeeper zk = null;
    private CusZKWatcher() {
        // TODO Auto-generated constructor stub
    }
    public CusZKWatcher (CountDownLatch latch) {
        this.latch = latch;
    }
    public ZooKeeper getZk() {
        return zk;
    }
    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }
    @Override
    public void process(WatchedEvent event) {
        if (null == latch) {
            return;
        }
        KeeperState state = event.getState();
        int stateValue = state.getIntValue();
        EventType eventType = event.getType();
        int typeValue = eventType.getIntValue();
        System.out.println("KeeperState is : " + stateValue + "\tEventType : " + typeValue);
        try {
            if (KeeperState.SyncConnected == state) {
                System.out.println("zk session established in mode of SyncConnected.");
                if (EventType.None == eventType && null == event.getPath()) {
                    this.latch.countDown();
                } else if (EventType.NodeCreated == eventType) {
                    System.out.println("Node " + event.getPath() + " created.");
                    Stat stat = zk.exists(event.getPath(), true);
                    System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
                } else if (EventType.NodeDeleted == eventType) {
                    System.out.println("Node " + event.getPath() + " deleted.");
                    Stat stat = zk.exists(event.getPath(), true);
                    if (null == stat) {
                        System.out.println("Znod " + event.getPath() + " not exists.");
                    } else {
                        System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
                    }
                } else if (EventType.NodeChildrenChanged == eventType) {        
                    if (null != this.getZk()) {
                        List result = this.getZk().getChildren(event.getPath(), true);
                        System.out.println("Reget children in watcher, result : " + result);
                    }
                } else if (EventType.NodeDataChanged == eventType) {            
                    if (null != this.getZk()) {
                        Stat stat = new Stat();
                        byte[] datas = this.getZk().getData(event.getPath(), true, stat);
                        System.out.println("Reget data from zk : " + new String(datas));
                        System.out.println("Restat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
                    }
                } else {
                    System.out.println("Unknown type : " + eventType);
                }
            } else {
                System.out.println("Unknown error : " + state);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}  3、其他客户端API
  ZkClient
  

  Curator





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-660889-1-1.html 上篇帖子: ZooKeeper3.4.6学习笔记(四)权限控制ACL 下篇帖子: zookeeper基本命令,增删改查
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表