|
鲁春利的工作笔记,谁说程序员不能有文艺范?
Zookeeper中的角色主要有以下三类,如下表所示:
系统模型如图所示:
客户端通过2181端口与ZooKeeper Service进行连接,不同的ZooKeeper Server之间通过2888或3888端口进行通信。
org.apache.zookeeper.ZooKeeper是ZooKeeper客户端类库中主要的类,所有使用Zookeeper服务的应用程序都必须首先初始化ZooKeeper类的对象。ZooKeeper类提供了对ZooKeeper操作的所有方法,并且该类中的方法都是线程安全的。Client连接成功后,会被自动分配一个Session ID,通过发送周期性的心跳信息来保持会话的有效性。
ZooKeeper API有同步和异步两种方式。在ZooKeeper中连接到Server的客户端进程可以在ZooKeeper节点上设置watcher(实现Watcher接口),当节点发生变化时,客户端能够捕获到并触发相应的动作。但是,每个watcher只能被触发一次,为了保证每次变化都能被捕获到,需要多次进行watcher注册。
ZooKeeper
package org.apache.zookeeper;
public class ZooKeeper {
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
throws IOException {
this(connectString, sessionTimeout, watcher, false);
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly) throws IOException {
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
// 实现实际上是实例化了一个客户端对象
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
long sessionId, byte[] sessionPasswd) throws IOException {
this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false);
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
throws IOException {
// 略
}
}
Watcher
package org.apache.zookeeper;
public interface Watcher {
// 定义Event的可能状态
public interface Event {
public enum KeeperState {
@Deprecated @Deprecated
Unknown (-1), Disconnected (0), NoSyncConnected (1),SyncConnected (3),
AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated(6), Expired (-112);
}
// 定义Event的类型
public enum EventType {
None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4);
}
}
// 抽象方法
abstract public void process(WatchedEvent event);
}
java代码
package com.invic.zk;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
*
* @author lucl
*
*/
public class CusZooKeeperClient {
//
private static String connectString = "nnode:2181,dnode1:2181,dnode2:2181";
private static int SESSION_TIMEOUT = 30 * 1000;
/**
* @param args
*/
public static void main(String[] args) {
CusZooKeeperClient client = new CusZooKeeperClient();
// 最基本的会话实例
// client.zkSessionInstance();
// 复用sessionID和sessionPasswd来创建会话
// client.zkSessionInstanceWithSidAndPwd();
// 创建节点
// client.createZNode();
// 获取子节点
// client.getChildren();
// 获取数据
// client.getData();
// 更新数据
// client.setData();
// 是否存在
client.exists();
}
/**
* 最基本的会话实例
*/
private void zkSessionInstance () {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = null;
try {
zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));
latch.await();
System.out.println("ZK sessionId " + zk.getSessionId() + "\t and state " + zk.getState());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != zk) {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 复用sessionID和sessionPasswd创建一个ZooKeeper对象实例(维持之前会话的有效性)
*/
private void zkSessionInstanceWithSidAndPwd () {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = null;
long sessionID = 0;
byte [] sessionPwd = "".getBytes();
try {
System.out.println("============================001==============================");
{
zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));
latch.await();
sessionID = zk.getSessionId();
sessionPwd = zk.getSessionPasswd();
System.out.println("ZK sessionId is " + sessionID + "\t and sessionPwd " + new String(sessionPwd));
}
System.out.println("=============================002==============================");
{
zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch), sessionID, sessionPwd);
}
System.out.println("=============================003==============================");
{
// with wrong sessionID and sessionPasswd
long wrongSessionID = 1L;
byte [] wrongSessionPwd = "wrong".getBytes();
zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch), wrongSessionID, wrongSessionPwd);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != zk) {
zk.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 创建节点
*/
private void createZNode () {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = null;
try {
zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));
latch.await();
/**
* 同步API
* create(final String path, byte data[], List acl, CreateMode createMode)
*/
{
String path = "/zk-sync";
// 内容只支持字节数组,字符串可以直接getBytes(),其他内容需要Hessian或Kryo等序列化工具
byte [] datas = "luchunli".getBytes();
// Ids.OPEN_ACL_UNSAFE表示任何操作都不受权限控制
String basePath = zk.create(path + "-001", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("Success create znode : " + basePath); // `/zk-test-001`
basePath = zk.create(path + "-002", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Success create znode : " + basePath); // `/zk-test-0020000000008`会在path后自动加个数字
}
/**
* 异步API
*/
System.out.println("================================for async===================================");
String path = "/zk-async";
byte [] datas = "luchunli".getBytes();
// 异步中接口不会抛异常,异常都是在回调函数中体现
zk.create(path + "-001", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() {
@Override
public void proce***esult(int rc, String path, Object ctx, String name) {
switch (rc) {
case 0 :
System.out.println("Create result code is " + rc + ", " + path + ", " + ctx + ", real name " + name);
break;
case -4 :
System.out.println("Connection abort.");
break;
case -110 :
System.out.println("Node is already exists.");
break;
case -112 :
System.out.println("Session expired.");
break;
default:
System.out.println("Unknown error.");
}
}
}, "I am Context.");
zk.create(path + "-002", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
@Override
public void proce***esult(int rc, String path, Object ctx, String name) {
switch (rc) {
case 0 :
System.out.println("Create result code is " + rc + ", " + path + ", " + ctx + ", real name " + name);
break;
case -4 :
System.out.println("Connection abort.");
break;
case -110 :
System.out.println("Node is already exists.");
break;
case -112 :
System.out.println("Session expired.");
break;
default:
System.out.println("Unknown error.");
}
}
}, "I am Context.");
Thread.sleep(30 * 1000); // 为了看到创建的临时节点的效果,zk.close()之后节点就不存在了
} catch (Exception e) {
e.printStackTrace();
} finally {
try{
if (null != zk) {
zk.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 获取子节点及节点数据
*/
private void getChildren () {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = null;
try {
CusZKWatcher watcher = new CusZKWatcher(latch);
zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
latch.await();
watcher.setZk(zk);
String path = "/zk-book";
byte [] datas = "luchunli".getBytes();
{
String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("Success create znode : " + basePath); //
}
{
String basePath = zk.create(path + "/c1", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("Success create znode : " + basePath); //
}
/**
* 同步API获取子节点
*/
{
List childs = zk.getChildren(path, true);
System.out.println("In client get children is : " + childs);
}
/**
* 异步API获取子节点
*/
{
System.out.println("================================for async===================================");
zk.getChildren(path, true, new AsyncCallback.Children2Callback() {
@Override
public void proce***esult(int rc, String path, Object ctx, List children, Stat stat) {
System.out.println("Get children znode result " + rc + ", " + path + ", " + children + ", " + stat);
}
}, "I am Context.");
}
{
String basePath = zk.create(path + "/c2", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("Success create znode : " + basePath); //
}
Thread.sleep(30 * 1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try{
if (null != zk) {
zk.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 获取值
*/
private void getData () {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = null;
try {
CusZKWatcher watcher = new CusZKWatcher(latch);
zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
latch.await();
watcher.setZk(zk);
String path = "/zk-test";
byte [] datas = "luchunli".getBytes();
{
String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("ZK create node " + basePath + " success.");
}
/**
* 同步API获取数据
*/
{
Stat stat = new Stat();
byte [] bytess = zk.getData(path, true, stat);
System.out.println("Get data from zk : " + new String(bytess));
System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
}
{
Stat sstat = zk.setData(path, datas, -1); // znode节点数据或内容的变化都会引起NodeDataChanged
System.out.println("after setData stat info is : " + sstat.getCzxid() + ", " + sstat.getMzxid() + ", " + sstat.getVersion());
}
Thread.sleep(30 * 1000);
/**
* 异步API获取数据
*/
System.out.println("================================for async===================================");
zk.getData(path, true, new AsyncCallback.DataCallback() {
@Override
public void proce***esult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("Get data result " + rc + ", " + path + ", " + new String(data) + ", " + stat);
}
}, "I am Context.");
{
Stat sstat = zk.setData(path, datas, -1); // znode节点数据或内容的变化都会引起NodeDataChanged
System.out.println("after setData stat info is : " + sstat.getCzxid() + ", " + sstat.getMzxid() + ", " + sstat.getVersion());
}
Thread.sleep(30 * 1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try{
if (null != zk) {
zk.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 更新值
*/
private void setData () {
/**
* setData(final String path, byte data[], int version)
* 或
* setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
*
* version说明:getData中无version,ZooKeeper的setData接口中的version参数是由CAS理论演化而来
*
*/
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = null;
try {
CusZKWatcher watcher = new CusZKWatcher(latch);
zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
latch.await();
watcher.setZk(zk);
String path = "/zk-test";
byte [] datas = "luchunli".getBytes();
{
String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("ZK create node " + basePath + " success.");
}
/**
* 同步API更新数据
*/
Stat stat = zk.setData(path, datas, -1); // 传入参数-1,表示客户端基于数据的最新版本进行更新
System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
Stat stat2 = zk.setData(path, datas, stat.getVersion()); // 基于上一次的version更新数据,更新成功
System.out.println("stat2 info is : " + stat2.getCzxid() + ", " + stat2.getMzxid() + ", " + stat2.getVersion());
Stat stat3 = zk.setData(path, datas, stat.getVersion()); // 仍然基于之前的version,更新数据失败(BadVersion for /zk-test)
System.out.println("stat3 info is : " + stat3.getCzxid() + ", " + stat3.getMzxid() + ", " + stat3.getVersion());
/**
* 异步API更新数据
*/
System.out.println("================================for async===================================");
zk.setData(path, datas, -1, new AsyncCallback.StatCallback() {
@Override
public void proce***esult(int rc, String path, Object ctx, Stat stat) {
System.out.println("Get data result " + rc + ", " + path + ", " + ctx + ", " + stat);
}
}, "I am Context");
Thread.sleep(30 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 判断指定的znode是否存在
* @param path
* @return
*/
private void exists () {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = null;
try {
CusZKWatcher watcher = new CusZKWatcher(latch);
zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
latch.await();
watcher.setZk(zk);
String path = "/zk-test";
{
Stat stat = zk.exists(path, true);
if (null == stat) {
System.out.println("Node " + path + " not exists.");
} else {
System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
}
}
byte [] datas = "luchunli".getBytes();
{
zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 需要为PERSISTENT节点,否则无法创建子节点
zk.exists(path, true);
zk.setData(path, "test".getBytes(), -1);
zk.create(path + "/c1", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // 子节点的变化客户端未watch到
zk.delete(path + "/c1", -1);
zk.delete(path, -1);
}
Thread.sleep(30 * 1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != zk) {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 删除节点-无法删除非空节点
*/
private void delZNode () {
/**
* delete(final String path, int version)
* 或
* delete(final String path, int version, VoidCallback cb, Object ctx)
*
* 若version传入-1则直接删除
*/
}
} 观察者
package com.invic.zk;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
/**
*
* @author lucl
* @description 自定义观察者
*
*/
public class CusZKWatcher implements Watcher {
private CountDownLatch latch = null;
private ZooKeeper zk = null;
private CusZKWatcher() {
// TODO Auto-generated constructor stub
}
public CusZKWatcher (CountDownLatch latch) {
this.latch = latch;
}
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
@Override
public void process(WatchedEvent event) {
if (null == latch) {
return;
}
KeeperState state = event.getState();
int stateValue = state.getIntValue();
EventType eventType = event.getType();
int typeValue = eventType.getIntValue();
System.out.println("KeeperState is : " + stateValue + "\tEventType : " + typeValue);
try {
if (KeeperState.SyncConnected == state) {
System.out.println("zk session established in mode of SyncConnected.");
if (EventType.None == eventType && null == event.getPath()) {
this.latch.countDown();
} else if (EventType.NodeCreated == eventType) {
System.out.println("Node " + event.getPath() + " created.");
Stat stat = zk.exists(event.getPath(), true);
System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
} else if (EventType.NodeDeleted == eventType) {
System.out.println("Node " + event.getPath() + " deleted.");
Stat stat = zk.exists(event.getPath(), true);
if (null == stat) {
System.out.println("Znod " + event.getPath() + " not exists.");
} else {
System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
}
} else if (EventType.NodeChildrenChanged == eventType) {
if (null != this.getZk()) {
List result = this.getZk().getChildren(event.getPath(), true);
System.out.println("Reget children in watcher, result : " + result);
}
} else if (EventType.NodeDataChanged == eventType) {
if (null != this.getZk()) {
Stat stat = new Stat();
byte[] datas = this.getZk().getData(event.getPath(), true, stat);
System.out.println("Reget data from zk : " + new String(datas));
System.out.println("Restat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
}
} else {
System.out.println("Unknown type : " + eventType);
}
} else {
System.out.println("Unknown error : " + state);
}
} catch (Exception e) {
e.printStackTrace();
}
}
} 3、其他客户端API
ZkClient
Curator
|
|