191145685 发表于 2019-1-9 06:18:48

ZooKeeper3.4.6学习笔记(二)简单介绍

  

鲁春利的工作笔记,谁说程序员不能有文艺范?
  

  Zookeeper中的角色主要有以下三类,如下表所示:
http://s5.运维网.com/wyfs02/M00/77/74/wKiom1Zn_MiTM-hiAAJEqb1aXLw904.jpg
  

  系统模型如图所示:

http://s1.运维网.com/wyfs02/M02/77/73/wKioL1Zn_V_SUjDfAAIUilxAAG4386.jpg
  客户端通过2181端口与ZooKeeper Service进行连接,不同的ZooKeeper Server之间通过2888或3888端口进行通信。

  

  org.apache.zookeeper.ZooKeeper是ZooKeeper客户端类库中主要的类,所有使用Zookeeper服务的应用程序都必须首先初始化ZooKeeper类的对象。ZooKeeper类提供了对ZooKeeper操作的所有方法,并且该类中的方法都是线程安全的。Client连接成功后,会被自动分配一个Session ID,通过发送周期性的心跳信息来保持会话的有效性。
  

  ZooKeeper API有同步和异步两种方式。在ZooKeeper中连接到Server的客户端进程可以在ZooKeeper节点上设置watcher(实现Watcher接口),当节点发生变化时,客户端能够捕获到并触发相应的动作。但是,每个watcher只能被触发一次,为了保证每次变化都能被捕获到,需要多次进行watcher注册。
  

  ZooKeeper
package org.apache.zookeeper;
public class ZooKeeper {
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
    throws IOException {
      this(connectString, sessionTimeout, watcher, false);
    }
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly) throws IOException {
      watchManager.defaultWatcher = watcher;
      ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
      HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
      // 实现实际上是实例化了一个客户端对象
      cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
      cnxn.start();   
    }
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            long sessionId, byte[] sessionPasswd) throws IOException {
      this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false);
    }
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
      throws IOException {
         // 略   
    }
}  

  Watcher
package org.apache.zookeeper;
public interface Watcher {
    // 定义Event的可能状态
    public interface Event {   
      public enum KeeperState {
            @Deprecated                     @Deprecated
            Unknown (-1), Disconnected (0), NoSyncConnected (1),SyncConnected (3),
            AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated(6), Expired (-112);
      }
      // 定义Event的类型
      public enum EventType {
            None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4);
      }
    }
    // 抽象方法
    abstract public void process(WatchedEvent event);
}  

  java代码

package com.invic.zk;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
*
* @author lucl
*
*/
public class CusZooKeeperClient {
    //
    private static String connectString = "nnode:2181,dnode1:2181,dnode2:2181";
    private static int SESSION_TIMEOUT = 30 * 1000;
    /**
   * @param args
   */
    public static void main(String[] args) {
      CusZooKeeperClient client = new CusZooKeeperClient();
      // 最基本的会话实例
      // client.zkSessionInstance();
      // 复用sessionID和sessionPasswd来创建会话
      // client.zkSessionInstanceWithSidAndPwd();
      // 创建节点
      // client.createZNode();
      // 获取子节点
      // client.getChildren();
      // 获取数据
      // client.getData();
      // 更新数据
      // client.setData();
      // 是否存在
      client.exists();
    }
    /**
   * 最基本的会话实例
   */
    private void zkSessionInstance () {
      CountDownLatch latch = new CountDownLatch(1);
      ZooKeeper zk = null;
      try {
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));
            latch.await();
            System.out.println("ZK sessionId " + zk.getSessionId() + "\t and state " + zk.getState());
      } catch (Exception e) {
            e.printStackTrace();
      } finally {
            if (null != zk) {
               try {
                  zk.close();
                } catch (InterruptedException e) {
                  e.printStackTrace();
                }
            }
      }
    }
    /**
   * 复用sessionID和sessionPasswd创建一个ZooKeeper对象实例(维持之前会话的有效性)
   */
    private void zkSessionInstanceWithSidAndPwd () {
      CountDownLatch latch = new CountDownLatch(1);
      ZooKeeper zk = null;
      long sessionID = 0;
      byte [] sessionPwd = "".getBytes();
      try {
            System.out.println("============================001==============================");
            {
                zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));
                latch.await();
                sessionID = zk.getSessionId();
                sessionPwd = zk.getSessionPasswd();
                System.out.println("ZK sessionId is " + sessionID + "\t and sessionPwd " + new String(sessionPwd));
            }
            System.out.println("=============================002==============================");
            {
                zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch), sessionID, sessionPwd);
            }
            System.out.println("=============================003==============================");
            {
                // with wrong sessionID and sessionPasswd
                long wrongSessionID = 1L;
                byte [] wrongSessionPwd = "wrong".getBytes();
                zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch), wrongSessionID, wrongSessionPwd);
            }
      } catch (Exception e) {
            e.printStackTrace();
      } finally {
            try {
                if (null != zk) {
                  zk.close();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      }
    }
    /**
   * 创建节点
   */
    private void createZNode () {
      CountDownLatch latch = new CountDownLatch(1);
      ZooKeeper zk = null;
      try {
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));
            latch.await();
            /**
             *同步API
             *create(final String path, byte data[], List acl, CreateMode createMode)
             */
            {
                String path = "/zk-sync";
                // 内容只支持字节数组,字符串可以直接getBytes(),其他内容需要Hessian或Kryo等序列化工具
                byte [] datas = "luchunli".getBytes();   
                // Ids.OPEN_ACL_UNSAFE表示任何操作都不受权限控制
                String basePath = zk.create(path + "-001", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("Success create znode : " + basePath);    // `/zk-test-001`
                basePath = zk.create(path + "-002", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println("Success create znode : " + basePath);    // `/zk-test-0020000000008`会在path后自动加个数字
            }
            /**
             *异步API
             */
            System.out.println("================================for async===================================");
            String path = "/zk-async";
            byte [] datas = "luchunli".getBytes();
            // 异步中接口不会抛异常,异常都是在回调函数中体现
            zk.create(path + "-001", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() {
                @Override
                public void proce***esult(int rc, String path, Object ctx, String name) {
                  switch (rc) {
                        case 0 :
                            System.out.println("Create result code is " + rc + ", " + path + ", " + ctx + ", real name " + name);
                            break;
                        case -4 :
                            System.out.println("Connection abort.");
                            break;
                        case -110 :
                            System.out.println("Node is already exists.");
                            break;
                        case -112 :
                            System.out.println("Session expired.");
                            break;
                        default:
                            System.out.println("Unknown error.");
                  }
                }
            }, "I am Context.");
            zk.create(path + "-002", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
                @Override
                public void proce***esult(int rc, String path, Object ctx, String name) {
                  switch (rc) {
                        case 0 :
                            System.out.println("Create result code is " + rc + ", " + path + ", " + ctx + ", real name " + name);
                            break;
                        case -4 :
                            System.out.println("Connection abort.");
                            break;
                        case -110 :
                            System.out.println("Node is already exists.");
                            break;
                        case -112 :
                            System.out.println("Session expired.");
                            break;
                        default:
                            System.out.println("Unknown error.");
                  }
                }
            }, "I am Context.");
            
            Thread.sleep(30 * 1000);      // 为了看到创建的临时节点的效果,zk.close()之后节点就不存在了
      } catch (Exception e) {
            e.printStackTrace();
      } finally{
            try{
                if (null != zk) {
                  zk.close();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      }
    }
    /**
   * 获取子节点及节点数据
   */
    private void getChildren () {
      CountDownLatch latch = new CountDownLatch(1);
      ZooKeeper zk = null;
      try {
            CusZKWatcher watcher = new CusZKWatcher(latch);
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
            latch.await();
            watcher.setZk(zk);
            String path = "/zk-book";
            byte [] datas = "luchunli".getBytes();
            {
                String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                System.out.println("Success create znode : " + basePath);    //
            }
            {
                String basePath = zk.create(path + "/c1", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("Success create znode : " + basePath);    //
            }
            /**
             * 同步API获取子节点
             */
            {
                List childs = zk.getChildren(path, true);
                System.out.println("In client get children is : " + childs);
            }
            /**
             * 异步API获取子节点
             */
            {
                System.out.println("================================for async===================================");
                zk.getChildren(path, true, new AsyncCallback.Children2Callback() {
                  @Override
                  public void proce***esult(int rc, String path, Object ctx, List children, Stat stat) {
                        System.out.println("Get children znode result " + rc + ", " + path + ", " + children + ", " + stat);
                  }
                }, "I am Context.");
            }
            {
                String basePath = zk.create(path + "/c2", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("Success create znode : " + basePath);    //
            }
            Thread.sleep(30 * 1000);
      } catch (Exception e) {
            e.printStackTrace();
      } finally{
            try{
                if (null != zk) {
                  zk.close();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      }
    }
    /**
   * 获取值
   */
    private void getData () {
      CountDownLatch latch = new CountDownLatch(1);
      ZooKeeper zk = null;
      try {
            CusZKWatcher watcher = new CusZKWatcher(latch);
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
            latch.await();
            watcher.setZk(zk);
            String path = "/zk-test";
            byte [] datas = "luchunli".getBytes();
            {
                String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("ZK create node " + basePath + " success.");
            }
            /**
             * 同步API获取数据
             */
            {
                Stat stat = new Stat();
                byte [] bytess = zk.getData(path, true, stat);
                System.out.println("Get data from zk : " + new String(bytess));
                System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
            }
            {
                Stat sstat = zk.setData(path, datas, -1);      // znode节点数据或内容的变化都会引起NodeDataChanged
                System.out.println("after setData stat info is : " + sstat.getCzxid() + ", " + sstat.getMzxid() + ", " + sstat.getVersion());
            }
            Thread.sleep(30 * 1000);
            /**
             * 异步API获取数据
             */
            System.out.println("================================for async===================================");
            zk.getData(path, true, new AsyncCallback.DataCallback() {
                @Override
                public void proce***esult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                  System.out.println("Get data result " + rc + ", " + path + ", " + new String(data) + ", " + stat);
                }
            }, "I am Context.");
            {
                Stat sstat = zk.setData(path, datas, -1);      // znode节点数据或内容的变化都会引起NodeDataChanged
                System.out.println("after setData stat info is : " + sstat.getCzxid() + ", " + sstat.getMzxid() + ", " + sstat.getVersion());
            }
            Thread.sleep(30 * 1000);
      } catch (Exception e) {
            e.printStackTrace();
      } finally{
            try{
                if (null != zk) {
                  zk.close();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      }
    }
    /**
   * 更新值
   */
    private void setData () {
      /**
         * setData(final String path, byte data[], int version)
         * 或
         * setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
         *
         * version说明:getData中无version,ZooKeeper的setData接口中的version参数是由CAS理论演化而来
         *
         */
      CountDownLatch latch = new CountDownLatch(1);
      ZooKeeper zk = null;
      try {
            CusZKWatcher watcher = new CusZKWatcher(latch);
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
            latch.await();
            watcher.setZk(zk);
            String path = "/zk-test";
            byte [] datas = "luchunli".getBytes();
            {
                String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("ZK create node " + basePath + " success.");
            }
            /**
             * 同步API更新数据
             */
            Stat stat = zk.setData(path, datas, -1);      // 传入参数-1,表示客户端基于数据的最新版本进行更新
            System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
            Stat stat2 = zk.setData(path, datas, stat.getVersion());    // 基于上一次的version更新数据,更新成功
            System.out.println("stat2 info is : " + stat2.getCzxid() + ", " + stat2.getMzxid() + ", " + stat2.getVersion());
            Stat stat3 = zk.setData(path, datas, stat.getVersion());    // 仍然基于之前的version,更新数据失败(BadVersion for /zk-test)
            System.out.println("stat3 info is : " + stat3.getCzxid() + ", " + stat3.getMzxid() + ", " + stat3.getVersion());
            /**
             * 异步API更新数据
             */
            System.out.println("================================for async===================================");
            zk.setData(path, datas, -1, new AsyncCallback.StatCallback() {
                @Override
                public void proce***esult(int rc, String path, Object ctx, Stat stat) {
                  System.out.println("Get data result " + rc + ", " + path + ", " + ctx + ", " + stat);
                }
            }, "I am Context");
            Thread.sleep(30 * 1000);
      } catch (Exception e) {
            e.printStackTrace();
      }
    }
    /**
   * 判断指定的znode是否存在
   * @param path
   * @return
   */
    private void exists () {
      CountDownLatch latch = new CountDownLatch(1);
      ZooKeeper zk = null;
      try {
            CusZKWatcher watcher = new CusZKWatcher(latch);
            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
            latch.await();
            watcher.setZk(zk);
            String path = "/zk-test";
            {
                Stat stat = zk.exists(path, true);
                if (null == stat) {
                  System.out.println("Node " + path + " not exists.");
                } else {
                  System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
                }
            }
            byte [] datas = "luchunli".getBytes();
            {
                zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);      // 需要为PERSISTENT节点,否则无法创建子节点
                zk.exists(path, true);
                zk.setData(path, "test".getBytes(), -1);
                zk.create(path + "/c1", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);    // 子节点的变化客户端未watch到
                zk.delete(path + "/c1", -1);
                zk.delete(path, -1);
            }
            Thread.sleep(30 * 1000);
      } catch (Exception e) {
            e.printStackTrace();
      } finally {
            if (null != zk) {
                try {
                  zk.close();
                } catch (InterruptedException e) {
                  e.printStackTrace();
                }
            }
      }
    }
    /**
   * 删除节点-无法删除非空节点
   */
    private void delZNode () {
      /**
         *delete(final String path, int version)
         *或
         *delete(final String path, int version, VoidCallback cb, Object ctx)
         *
         *若version传入-1则直接删除
         */
    }
}  观察者
package com.invic.zk;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
/**
*
* @author lucl
* @description 自定义观察者
*
*/
public class CusZKWatcher implements Watcher {
    private CountDownLatch latch = null;
    private ZooKeeper zk = null;
    private CusZKWatcher() {
      // TODO Auto-generated constructor stub
    }
    public CusZKWatcher (CountDownLatch latch) {
      this.latch = latch;
    }
    public ZooKeeper getZk() {
      return zk;
    }
    public void setZk(ZooKeeper zk) {
      this.zk = zk;
    }
    @Override
    public void process(WatchedEvent event) {
      if (null == latch) {
            return;
      }
      KeeperState state = event.getState();
      int stateValue = state.getIntValue();
      EventType eventType = event.getType();
      int typeValue = eventType.getIntValue();
      System.out.println("KeeperState is : " + stateValue + "\tEventType : " + typeValue);
      try {
            if (KeeperState.SyncConnected == state) {
                System.out.println("zk session established in mode of SyncConnected.");
                if (EventType.None == eventType && null == event.getPath()) {
                  this.latch.countDown();
                } else if (EventType.NodeCreated == eventType) {
                  System.out.println("Node " + event.getPath() + " created.");
                  Stat stat = zk.exists(event.getPath(), true);
                  System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
                } else if (EventType.NodeDeleted == eventType) {
                  System.out.println("Node " + event.getPath() + " deleted.");
                  Stat stat = zk.exists(event.getPath(), true);
                  if (null == stat) {
                        System.out.println("Znod " + event.getPath() + " not exists.");
                  } else {
                        System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
                  }
                } else if (EventType.NodeChildrenChanged == eventType) {      
                  if (null != this.getZk()) {
                        List result = this.getZk().getChildren(event.getPath(), true);
                        System.out.println("Reget children in watcher, result : " + result);
                  }
                } else if (EventType.NodeDataChanged == eventType) {            
                  if (null != this.getZk()) {
                        Stat stat = new Stat();
                        byte[] datas = this.getZk().getData(event.getPath(), true, stat);
                        System.out.println("Reget data from zk : " + new String(datas));
                        System.out.println("Restat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
                  }
                } else {
                  System.out.println("Unknown type : " + eventType);
                }
            } else {
                System.out.println("Unknown error : " + state);
            }
      } catch (Exception e) {
            e.printStackTrace();
      }
    }
}  3、其他客户端API
  ZkClient
  

  Curator




页: [1]
查看完整版本: ZooKeeper3.4.6学习笔记(二)简单介绍