设为首页 收藏本站
查看: 693|回复: 0

[经验分享] ZooKeeper 基本API使用

[复制链接]

尚未签到

发表于 2017-4-19 09:35:22 | 显示全部楼层 |阅读模式
  ZooKeeper是一个分布式协调服务,在很多开源的分布式服务中都有使用!现在介绍一下ZK的基本API的使用
  ZK的主要特性:


  •  客户端如果对ZK的一个数据节点注册一个Watcher监听,那么当该数据节点的内容或子节点列表发生变更时zk服务器都会向所有订阅客户端发送变更通知
  • 对于在ZK上创建的临时节点,一旦客户端与服务器见的会话失败,那么该临时节点也会自动清除
  • ZK将会保证客户端无法重复创建一个已经存在的数据节点
  基于zk的这两个特性可以解决很多分布式问题。
  ZK作为一个分布式服务框架,主要用来解决分布式数据一致性问题,它提供了简单分布式元语,并且对多种语言提供了API,下面是介绍JAVA客户端API的使用。
  ZK提供的API接口一般有一个同步接口一个异步接口,使用方法基本相同。


  • 客户端可以通过创建一个org.apache.zookeeper.ZooKeeper的实例来连接服务器。主要的构造方法有:
  ZooKeeper(String connectString,int sessionTimeout,Watcher watcher)
  ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canReadOnly)
  ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd)
  ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd,boolean canReadOnly)
  connectString:值ZK服务器列表,如localhost:2181,127.0.0.1:2181
  sessionTimeout:指会话超时时间,以毫秒为单位的整数值;在一个会话周期内,zk客户端和服务器会通过心跳维持会话的有效性,在sessionTimeout时间内,如果连接断开,zk客户端会主动和服务器建立连接
  watcher:Watcher事件通知处理器
  canReadOnly:这是一个boolean值,用于标识当前会话是否支持“read-only”模式。默认情况下,在ZK集群中,一个机器如果和集群中的半数以及半数以上的机器失去连接,那么这个机器将不再处理客户端请求(读请求+写请求均不处理);但是有时候我们希望在发生此类故障时不影响读取请求的处理,这个就是zk的read-only 模式
  sessionId和sessionPasswd,分别代表会话Id和会话密钥。这连个参数可以唯一确定一个会话。



  • 创建节点常用的接口是
  String create(final String path,byte[] data,List<ACL>acl,CreateModel createModel);
  String create(final String path,byte[] data,List<ACL> acl,CreateModel,StringCallback cb,Object ctx);
  如果节点已经存在则抛出一个NodeExistsException异常
  回调接口
  StringCallback{
  public void (int rc,String path,Object ctx,String name)
  }
  rc是result code 服务端响应结果码。客户端可以从这个结果码中识别出API的调用结果,常见的结果码有:
  0(OK),接口调用成功
  -4(ConnectionLoss),客户端和服务器连接断开
  -110(NodeExists) 节点已存在
  -112(SessionExpired)会话已过期
  path: 接口调用传入的数据节点的节点路径
  ctx: 接口调用传入的ctx参数
  name: 实际在服务器端创建的节点名


  • 删除节点常用的接口:
  void delete(final String path,int version);
  void delete(final String path,int version,VoidCallback cb,Object ctx);
  如果删除的节点不存在则会抛出一个NoNodeException,如果删除数据的版本号不正确则抛出一个BadVersionException


  • 获取子节点列表的接口:
  List<String> getChildren(final String path,Watcher watcher);
  List<String> getChildren(final String path,boolean watcher);
  void  getChildren(final String path,Watcher watcher,ChildrenCallback cb,Object ctx);
  void  getChildren(final String path,boolean watcher,ChildrenCallback cb,Object ctx);
  List<String> getChildren(final String path,Watcher watcher,Stat stat);
  List<String> getChildren(final String path,boolean watcher,Stat stat);
  void getChildren(final String path,Watcher watcher,Stat stat,ChildrenCallback cb,Object ctx);
  void getChildren(final String path,boolean watcher,Stat stat,ChildrenCallback cb,Object ctx);


  • 获取节点数据的接口
  byte[] getData(final String path,Watcher watcher,Stat stat);
  byte[] getData(final String path,boolean watcher,Stat stat);
  void getData(final String path,Watcher watcher,Stat stat,DataCallback cb,Object ctx);
  void getData(final String path,boolean watcher,Stat stat,DataCallback cb,Object ctx);


  • 更新数据的接口
  Stat  setData(final String path,byte[] data,int version);
  void setData(final String path,byte[] data,int version,StatCallback cb,Object ctx);
  如果数据的版本不一致则抛出一个BadVersionExcepion
  zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1.


  • 检测节点是否存在的接口
  Stat exists(final String path,Watcher watcher);
  Stat exists(final String path,boolean watcher);
  void exists(final String path, Watcher watcher,StatCallback cb,Object ctx);
  void exists(final String path,boolean watcher,StatCallback cb,Object ctx);


  • version 说明
  version参数用于指定节点数据的版本,表明本次更新操作是针对指定数据版本进行的。这是ZK对 CAS(Compare and Swap)的实现,只有数据的版本和预期的版本一致时才会更新数据,这样可以有效避免分布式环境下并发更新的问题。如果多个客户端并发更新,则只有一个可 以更新成功,其他的因为版本不一致则不会更新数据。zk中version和传统意义上的软件版本概念上有很大的不同,在ZK中,version表示的是对 节点数据内容,子节点列表或是ACL信息修改的次数。在一个节点数据创建完毕后,其viersion=0,表示当前节点数据自创建以后被修改0次,修改一 次版本的值增加1;使用-1表示使用基于最新版本进行修改,即每次都会执行更新!version强调的是变更次数,即使前后两次变更的内容的值没有发生变化,version的值依然会变更。
  在创建zk的客户端时在构造方法中默认设置一个Watcher,这个Watcher作为zk会话期间的默认watcher.同时zk也可以通过 getData, getChildren,exist三个接口向zk注册watcher.
  Watcher是通知的处理器,WatchedEvent的属性有 KeeperState ,EventType

public class WatchedEvent {
final private KeeperState keeperState;
final private EventType eventType;
private String path;
/**
* Create a WatchedEvent with specified type, state and path
*/
public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
this.keeperState = keeperState;
this.eventType = eventType;
this.path = path;
}
  WatchedEvent的KeeperStat和EventType对应关系

KeeperStateEventType触发条件备注
SyncConnectedNone客户端与服务器成功建立会话会话是连接状态
NodeCreatedWatcher监听的节点创建成功
NodeDeletedWatcher监听的节点被删除
NodeDataChangedWatcher监听的节点数据内容发生变更
NodeChildrenChangedWatcher监听的节点子列表发生变更
DisconnectedNone客户端与服务器断开连接 
ExpiredNone会话超时 
AuthFailedNone  


  • Watcher特性说明

一次性   一旦watcher被触发,ZK都会从相应的存储中移除。因此在使用Watcher时需要谨记使用前一定要注册
  客户端串行执行
  客户端Watcher回调的过程是一个串行同步的过程,这是为了保证顺序。同时需要谨记千万不要因为一个Watcher的处理逻辑影响了整个客户端的Watcher回调
  轻量
  WatchedEvent是ZK整个Watcher通知机制的最小通知单元。从上文已经介绍了这个数据结构中只包含三部分:通知状态,事件类型,节点路径。也就是说,Watcher通知仅仅告诉客户端发生了什么事情,而不会说明事件的具体内容。

/**
*
* @author zhangwei_david
* @version $Id: ZKDemo.java, v 0.1 2015年5月2日 上午9:10:56 zhangwei_david Exp $
*/
public class ZKDemo {
private static ZooKeeper authZK = null;
private static String    path   = "/zk-demo";
/**
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
sync();
TimeUnit.SECONDS.sleep(300);
createZKWithSessionIdAndSessionPasswd();
authCreate();
}
/**
* 指定权限
*
* @throws Exception
*/
private static void authCreate() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
authZK = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
System.out.println("监听器,监听到的事件时:" + event);
if (KeeperState.SyncConnected == event.getState()) {
//如果客户端已经建立连接闭锁减一
System.out.println("建立连接");
latch.countDown();
}
}
});
// 等待连接建立
latch.await();
// 增加权限
authZK.addAuthInfo("digest", "foo:true".getBytes());
// 判断path 是否已经存在
if (authZK.exists(path, true) == null) {
authZK.create(path, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
}
String str = authZK.create(path + "/auth", "test".getBytes(), Ids.CREATOR_ALL_ACL,
CreateMode.EPHEMERAL);
System.out.println(str);
}
private static void createZKWithSessionIdAndSessionPasswd() throws Exception {
// 初始化一个闭锁
final CountDownLatch latch = new CountDownLatch(1);
/**
* 创建一个ZooKeeper对象,localhost:2181是zk服务器的主机名和端口号
*  50000  sessionTimeout session超时时间
*  Watcher 默认的WatchedEvent事件处理器
*/
ZooKeeper originalZk = new ZooKeeper("localhost:2181", 50000, new Watcher() {
public void process(WatchedEvent event) {
/**
* 如果zk客户端和服务器已经建立连接,闭锁减一
*/
if (KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
// 等待闭锁为0,即一直等待zk客户端和服务器建立连接
latch.await();
// 通过sessionId,和sessionPasswd创建一个复用的ZK客户端
ZooKeeper zkCopy = new ZooKeeper(path, 50000, new Watcher() {
public void process(WatchedEvent event) {
}
}, originalZk.getSessionId(), originalZk.getSessionPasswd());
System.out.println("复用zk" + zkCopy + " original zk=" + originalZk);
}
/**
*
*
* @return
* @throws Exception
*/
private static ZooKeeper sync() throws Exception {
// 初始化一个闭锁
final CountDownLatch latch = new CountDownLatch(1);
/**
* 创建一个ZooKeeper对象,localhost:2181是zk服务器的主机名和端口号
*  50000  sessionTimeout session超时时间
*  Watcher 默认的WatchedEvent事件处理器
*/
ZooKeeper zk = new ZooKeeper("localhost:2181", 50000, new Watcher() {
public void process(WatchedEvent event) {
System.out.println("默认监听器,KeeperStat:" + event.getState() + ", EventType:"
+ event.getType() + ", path:" + event.getPath());
/**
* 如果zk客户端和服务器已经建立连接,闭锁减一
*/
if (KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
// 等待闭锁为0,即一直等待zk客户端和服务器建立连接
latch.await();
zk.getData(path, true, new DataCallback() {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("rc=" + rc + ",path=" + path + "data=" + data + " Stat=" + stat
+ ", data=" + (data == null ? "null" : new String(data)));
}
}, null);
// 创建一个临时节点,非安全的权限
zk.create(path + "/linshi", "ephemeral".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL, new StringCallback() {
public void processResult(int rc, String path, Object ctx, String name) {
if (rc == 0) {
System.out.println("成功创建一个临时节点");
}
}
}, null);
// 更新path节点数据
zk.setData(path, String.valueOf(new Date().getTime()).getBytes(), -1, new StatCallback() {
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (rc == 0) {
System.out.println("成功更新节点数据, rc=0, path=" + path + ", stat=" + stat);
}
}
}, null);
/**
* 如果该节点不存在则创建持久化的节点
* ZK的节点有三位,持久化节点(PERSISTENT),临时节点(EPHEMERAL),顺序节点(SEQUENTIAL)
* 具体的组合有 /**
PERSISTENT,
//持久化序列节点
PERSISTENT_SEQUENTIAL,
EPHEMERAL ,
//临时序列节点
EPHEMERAL_SEQUENTIAL
*
*/
// 创建一个持久化节点,如果该节点已经存在则不需要再次创建
if (zk.exists(path, true) == null) {
zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 删除持久化序列节点
try {
zk.delete(path + "/seq ", 1, new VoidCallback() {
public void processResult(int rc, String path, Object ctx) {
System.out.println("删除" + ZKDemo.path + "/seq 的结果是:rc=" + rc + " path=" + path
+ ",context=" + ctx);
}
}, null);
} catch (Exception e) {
// 示例代码创建新的持久节点前,如果以前存在则删除,如果不存在则或抛出一个NoNodeException
}
// 创建一个持久化序列节点
String seqPath = zk
.create(path + "/seq ", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//读取子节点列表,并注册一个监听器
zk.getChildren(path, new Watcher() {
public void process(WatchedEvent event) {
System.out.println(event);
}
});
//更新节点数据
zk.setData(seqPath, "seqDemo".getBytes(), -1);
//创建临时节点
zk.create(path + "/test", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 删除节点
zk.delete(path + "/test", 0);
// 异步创建临时序列节点
zk.create(path + "/test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("创建临时节点的结果是:result code=" + rc + ", path=" + path
+ " context=" + ctx + ", name=" + name);
}
}, "Test Context");
TimeUnit.SECONDS.sleep(5);
// 更新数据
if (zk.exists(path + "/test", true) != null) {
zk.setData(path + "/test", "testData".getBytes(), -1);
}
zk.create(path + "/test2", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 使用默认监听器,获取子节点
List<String> znodes = zk.getChildren(path, true);
// 获取所有节点下的数据
Stat stat = new Stat();
for (String str : znodes) {
byte[] data = zk.getData(path + "/" + str, true, stat);
System.out.println("获取节点:" + str + " 的数据是:"
+ (data == null ? "null" : new String(data)));
}
return zk;
}
}

 运行的结果是:

2015-06-26 07:37:29  [ main:0 ] - [ INFO ]  Initiating client connection, connectString=localhost:2181 sessionTimeout=50000 watcher=com.cathy.demo.zk.ZKDemo$4@8102c8
2015-06-26 07:37:29  [ main-SendThread(0:0:0:0:0:0:0:1:2181):32 ] - [ INFO ]  Opening socket connection to server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
2015-06-26 07:37:29  [ main-SendThread(0:0:0:0:0:0:0:1:2181):32 ] - [ INFO ]  Socket connection established to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181, initiating session
2015-06-26 07:37:29  [ main-SendThread(0:0:0:0:0:0:0:1:2181):47 ] - [ INFO ]  Session establishment complete on server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181, sessionid = 0x14e2ceda3d6000c, negotiated timeout = 40000
默认监听器,KeeperStat:SyncConnected, EventType:None, path:null
rc=0,path=/zk-demodata=[B@dca3b1 Stat=128,1002,1430525410663,1435275420998,31,344,0,0,13,8,1008
, data=1435275420998
默认监听器,KeeperStat:SyncConnected, EventType:NodeDataChanged, path:/zk-demo
成功更新节点数据, rc=0, path=/zk-demo, stat=128,1012,1430525410663,1435275449577,32,344,0,0,13,8,1008
删除/zk-demo/seq 的结果是:rc=0 path=/zk-demo/seq ,context=null
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/zk-demo
创建临时节点的结果是:result code=0, path=/zk-demo/test context=Test Context, name=/zk-demo/test0000000178
获取节点:test0000000175 的数据是:123
获取节点:curator 的数据是:192.168.1.102
获取节点:testData 的数据是:��
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366192-1-1.html 上篇帖子: 如何使用Curator监听zookeeper事件变化 下篇帖子: 分布式助手Zookeeper(七)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表