|
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
 * Small demonstration of Apache Curator against a ZooKeeper ensemble.
 *
 * <p>The program creates a child node, lists the children of its parent, reads and
 * updates the child's data, checks that the parent exists, and then installs a
 * {@link NodeCache} on the parent and a {@link PathChildrenCache} on its children
 * so that subsequent changes are printed to standard output.
 */
public class CuratorClient {

    /** Comma-separated ZooKeeper connect string. */
    private static final String HOST = "192.168.100.108:2181,192.168.100.110:2181,192.168.100.111:2181";

    /** Parent node that is listed, existence-checked and watched. */
    private static final String PATH = "/curzk";

    /** Child node that is created, read and updated. */
    private static final String CHILDPATH = "/curzk/1";

    public CuratorClient() {
    }

    /**
     * Connects to ZooKeeper, runs the create/read/update steps, then watches the
     * parent node and its children until the final sleep ends or is interrupted.
     *
     * @param args ignored
     * @throws Exception if any ZooKeeper operation other than the initial create fails,
     *                   or if the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // try-with-resources guarantees the client (and its ZooKeeper session) is closed
        // on every exit path, including failures half-way through the demo.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(HOST)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build()) {
            client.start();
            runBasicOperations(client);
            watchNodes(client);
        }
    }

    /** Creates the child node, lists children, reads/updates data and checks the parent exists. */
    private static void runBasicOperations(CuratorFramework client) throws Exception {
        // Create the child node (and missing parents).
        try {
            String createdPath = client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(CHILDPATH, "123".getBytes(StandardCharsets.UTF_8));
            System.out.println(createdPath);
        } catch (Exception e) {
            // Best effort on purpose: creation fails when the node is left over from an
            // earlier run, and the remaining steps are still meaningful in that case.
            e.printStackTrace();
        }

        // List the children of the parent node.
        List<String> children = client.getChildren().forPath(PATH);
        for (String child : children) {
            System.out.println(child);
        }

        // Read the child node's content.
        System.out.println(toText(client.getData().forPath(CHILDPATH)));

        // Update the child node's content.
        client.setData().forPath(CHILDPATH, "abc".getBytes(StandardCharsets.UTF_8));

        // Check whether the parent node exists.
        Stat stat = client.checkExists().forPath(PATH);
        if (stat != null) {
            System.out.println("存在");
        } else {
            System.out.println("不存在");
        }
    }

    /**
     * Installs the node and children watchers, triggers one update on the child node,
     * and then blocks so the listeners keep receiving events.
     */
    private static void watchNodes(CuratorFramework client) throws Exception {
        try (NodeCache nodeCache = new NodeCache(client, PATH);
             PathChildrenCache childrenCache = new PathChildrenCache(client, PATH, true)) {

            // Listeners are registered BEFORE start() so no initial event can be missed.

            // Watch the parent node itself.
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    // Read once: the cached value becomes null when the node is deleted.
                    ChildData current = nodeCache.getCurrentData();
                    if (current == null) {
                        System.out.println("主节点已删除");
                        return;
                    }
                    System.out.println("主节点内容变化:" + toText(current.getData()));
                }
            });
            nodeCache.start();

            // Watch the children of the parent node.
            childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                        throws Exception {
                    Type type = event.getType();
                    // Only the three child events are reported; other event types are ignored.
                    if (type == Type.CHILD_ADDED
                            || type == Type.CHILD_UPDATED
                            || type == Type.CHILD_REMOVED) {
                        System.out.println(type.name() + ":" + toText(event.getData().getData()));
                    }
                }
            });
            childrenCache.start();

            // Trigger a change so the children watcher has something to report.
            client.setData().forPath(CHILDPATH, "abcDE".getBytes(StandardCharsets.UTF_8));

            // Keep the JVM alive so the watchers can keep reporting events.
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

    /** Decodes node data as UTF-8, treating a missing payload as an empty string. */
    private static String toText(byte[] data) {
        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
    }
}
|
|
|