设为首页 收藏本站
查看: 887|回复: 0

[经验分享] Zookeeper开源客户端框架Curator简介与示例

[复制链接]

尚未签到

发表于 2017-4-19 12:02:51 | 显示全部楼层 |阅读模式
简介

        Curator最初由Netflix的Jordan Zimmerman开发, Curator提供了一套Java类库, 可以更容易的使用ZooKeeper.
        所谓ZooKeeper技巧(ZooKeeper Recipes),也可以称之为解决方案, 或者叫实现方案, 是指ZooKeeper的使用方法, 比如分布式的配置管理, Leader选举等
        Curator作为Apache ZooKeeper天生配套的组件。ZooKeeper的Java开发者自然而然的会选择它在项目中使用。
        官网链接:http://curator.apache.org/
DSC0000.png

提供的功能组件



  • Framework 提供了一套高级的API, 简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制


  • Client是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法


  • Recipes实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上


  • Utilities各种工具类


  • Errors异常处理, 连接, 恢复等.


  • Extensionscurator-recipes包实现了通用的技巧,这些技巧在ZooKeeper文档中有介绍。为了避免是这个包(package)变得巨大, recipes/applications将会放入一个独立的extension包下。并使用命名规则curator-x-name.


        Curator 编译好的类库被发布到Maven Center中。Curator包含几个artifact. 你可以根据你的需要在你的项目中加入相应的依赖。对于大多数开发者来说,引入curator-recipes这一个就足够了

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.6.0</version>
</dependency> 
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.6.0</version>
</dependency>
 

代码示例

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
 * DateTime: 2015年1月9日 上午9:14:08
 *
 */
public class CuratorTest {
    private static String zkAddress = "hadoop02:2181,hadoop03:2181,hadoop04:2181";

    public static void main(String[] args) throws Exception {
        CuratorUtil curator = new CuratorUtil(zkAddress);
        curator.createNode("/zkroot/test1", "你好abc11");
        curator.createNode("/zkroot/test2", "你好abc22");
        curator.updateNode("/zkroot/test2", "你好abc22");
        List<String> list = curator.listChildren("/zkroot");
        Map<String, String> map = curator.listChildrenDetail("/zkroot");
        // curator.deleteNode("/zkroot");
        // curator.destory();
        System.out.println("=========================================");
        for (String str : list) {
            System.out.println(str);
        }
        System.out.println("=========================================");
        for (Map.Entry<String, String> entry : map.entrySet()) {
            System.out.println(entry.getKey() + "=>" + entry.getValue());
        }
        // 增加监听
        curator.addWatch("/zkroot", false);
        TimeUnit.SECONDS.sleep(600);
    }
}

class CuratorUtil {
    private CuratorFramework client;

    public CuratorUtil(String zkAddress) {
        client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
        client.getCuratorListenable().addListener(new NodeEventListener());
        client.start();
    }

    /**
     * 创建node
     * 
     * @param nodeName
     * @param value
     * @return
     */
    public boolean createNode(String nodeName, String value) {
        boolean suc = false;
        try {
            Stat stat = getClient().checkExists().forPath(nodeName);
            if (stat == null) {
                String opResult = null;
                if (Strings.isNullOrEmpty(value)) {
                    opResult = getClient().create().creatingParentsIfNeeded().forPath(nodeName);
                }
                else {
                    opResult =
                            getClient().create().creatingParentsIfNeeded()
                                .forPath(nodeName, value.getBytes(Charsets.UTF_8));
                }
                suc = Objects.equal(nodeName, opResult);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return suc;
    }

    /**
     * 更新节点
     * 
     * @param nodeName
     * @param value
     * @return
     */
    public boolean updateNode(String nodeName, String value) {
        boolean suc = false;
        try {
            Stat stat = getClient().checkExists().forPath(nodeName);
            if (stat != null) {
                Stat opResult = getClient().setData().forPath(nodeName, value.getBytes(Charsets.UTF_8));
                suc = opResult != null;
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return suc;
    }

    /**
     * 删除节点
     * 
     * @param nodeName
     */
    public void deleteNode(String nodeName) {
        try {
            getClient().delete().deletingChildrenIfNeeded().forPath(nodeName);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 找到指定节点下所有子节点的名称与值
     * 
     * @param node
     * @return
     */
    public Map<String, String> listChildrenDetail(String node) {
        Map<String, String> map = Maps.newHashMap();
        try {
            GetChildrenBuilder childrenBuilder = getClient().getChildren();
            List<String> children = childrenBuilder.forPath(node);
            GetDataBuilder dataBuilder = getClient().getData();
            if (children != null) {
                for (String child : children) {
                    String propPath = ZKPaths.makePath(node, child);
                    map.put(child, new String(dataBuilder.forPath(propPath), Charsets.UTF_8));
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return map;
    }

    /**
     * 列出子节点的名称
     * 
     * @param node
     * @return
     */
    public List<String> listChildren(String node) {
        List<String> children = Lists.newArrayList();
        try {
            GetChildrenBuilder childrenBuilder = getClient().getChildren();
            children = childrenBuilder.forPath(node);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return children;
    }

    /**
     * 增加监听
     * 
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf) throws Exception {
        if (isSelf) {
            getClient().getData().watched().forPath(node);
        }
        else {
            getClient().getChildren().watched().forPath(node);
        }
    }

    /**
     * 增加监听
     * 
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @param watcher
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf, Watcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        }
        else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }

    /**
     * 增加监听
     * 
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @param watcher
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf, CuratorWatcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        }
        else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }

    /**
     * 销毁资源
     */
    public void destory() {
        if (client != null) {
            client.close();
        }
    }

    /**
     * 获取client
     * 
     * @return
     */
    public CuratorFramework getClient() {
        return client;
    }
}

// 监听器
final class NodeEventListener implements CuratorListener {
    @Override
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
        System.out.println(event.toString() + ".......................");
        final WatchedEvent watchedEvent = event.getWatchedEvent();
        if (watchedEvent != null) {
            System.out.println(watchedEvent.getState() + "=======================" + watchedEvent.getType());
            if (watchedEvent.getState() == KeeperState.SyncConnected) {
                switch (watchedEvent.getType()) {
                case NodeChildrenChanged:
                    // TODO
                    break;
                case NodeDataChanged:
                    // TODO
                    break;
                default:
                    break;
                }
            }
        }
    }
  http://my.oschina.net/cloudcoder/blog?disp=2&p=1&catalog=418649
  http://supben.iteye.com/blog/2094077

package com.cyyun;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
public class CrudExamples {
private static CuratorFramework client = get();
private static final String PATH = "/crud";
public static void main(String[] args) {
try {
client.start();
client.create().forPath(PATH, "I love messi".getBytes());
byte[] bs = client.getData().forPath(PATH);
System.out.println("新建的节点,data为:" + new String(bs));
// 注册观察者,当节点变动时触发
client.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("node is changed");
}
}).inBackground().forPath(PATH);
client.setData().forPath(PATH, "I love football".getBytes());
// 注册观察者,当节点变动时触发
client.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("node is changed");
}
}).inBackground().forPath(PATH);
client.setData().forPath(PATH, "I love foot2222ball".getBytes());
System.out.println(new String(client.getData().forPath(PATH)));
// 由于是在background模式下获取的data,此时的bs可能为null
byte[] bs2 = client.getData().watched().inBackground()
.forPath(PATH);
System.out.println("修改后的data为"
+ new String(bs2 != null ? bs2 : new byte[0]));
client.delete().forPath(PATH);
Stat stat = client.checkExists().forPath(PATH);
// Stat就是对zonde所有属性的一个映射, stat=null表示节点不存在!
System.out.println(stat);
} catch (Exception e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
}
public static CuratorFramework get() {
String zookeeperConnectionString = "127.0.0.1:4180,127.0.0.1:5180,127.0.0.1:6180";
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString(zookeeperConnectionString)
.sessionTimeoutMs(30000)
.connectionTimeoutMs(30000)
.canBeReadOnly(false)
.retryPolicy(
new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
.namespace("zk").defaultData(null).build();
return client;
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366389-1-1.html 上篇帖子: 跟着实例学习ZooKeeper的用法: 临时节点 下篇帖子: SolrCloud之分布式索引及与Zookeeper的集成
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表