设为首页 收藏本站
查看: 906|回复: 0

[经验分享] zookeeper 共享锁代码的实现

[复制链接]

尚未签到

发表于 2017-4-19 10:18:33 | 显示全部楼层 |阅读模式
zookeeper存储是树状结构,如/path1/path2(这里path1必须是PERSISTENT,否则path2不能创建),
节点有三种状态:
1、EPHEMERAL临时节点,server删除后该节点也会删除,但是我测试的时候,如果将server强制删除,EPHEMERAL节点会延迟删除;
2、PERSISTENT节点,这个是持久保存的节点。
3、SEQUENTIAL节点,自动标号的节点
如果父节点有子节点,父节点将不能删除。


watch是zookeeper最核心的一个功能;
watch = true;
监听某path1节点目录下是否创建删除子节点
zookeeper.getChildren("/path1", watch );
监听path2是否删除、修改
zookeeper.exists(path2,true);
zookeeper.getData(path2,true, null);




zookeeper能很好的协调分布式应用的任务执行,比如当多个应用同时处理分布式存储内存里面KEY VALUE、MQ时。锁只是其中一种表现。还有zookeeper 选举,集群管理,队列管理等;



public abstract class ZkLock implements Watcher {
private ZooKeeper zk;
private String hosts = "192.168.1.202:2181";
private String lockName = null;
private String nodePath = null;
private String nodeName = null;
private boolean extra = false;
private Integer SESSION_TIMEOUT = 5000;
private String waitPath = null;
public CountDownLatch latch = new CountDownLatch(1);

public ZkLock(String lockName,ZooKeeper zk) throws Exception{
this.lockName = StringUtils.contains(lockName, "/")?lockName:"/"+lockName;
if(zk==null){
  zk = new ZooKeeper(hosts, SESSION_TIMEOUT,this);
}
this.zk = zk;
if(zk.exists(this.lockName, false)==null){
zk.create(this.lockName, this.lockName.getBytes()
, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
nodePath = zk.create(this.lockName + "/lock-", new byte[0],Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
nodeName = StringUtils.substringAfter(nodePath, this.lockName+"/");
}

public ZkLock(String lockName) throws Exception{
this(lockName,null);
}
@Override
public void process(WatchedEvent event) {
latch.countDown();
if (extra&&event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
action();
        }
}

private Object exec() throws Exception{
List<String> list = zk.getChildren(lockName, false);
String[] nodes = list.toArray(new String[list.size()]);
Arrays.sort(nodes);
if(StringUtils.equals(nodeName, nodes[0])){
Object obj = action();
return obj;
}else{
return waitLock(nodes[0]);
}
}

private Object waitLock(String lower) throws Exception{
Stat stat = zk.exists(lockName + "/" + lower,true);
if(stat!=null){
latch.await();
}
return this.exec();
}

//这里的锁,主线程后面要加Thread.sleep()
private void extraExec() throws Exception{
        latch.await();
        List<String> childrenNodes = zk.getChildren(lockName, false);
        if (childrenNodes.size() == 1) {
        action();
        } else {
          Collections.sort(childrenNodes);
              int index = childrenNodes.indexOf(nodeName);
            if (index == -1) {
                // never happened
            } else if (index == 0) {
            action();
            } else {
//            System.out.println("waitPath:"+childrenNodes.get(index - 1));
            this.waitPath = this.lockName + "/" + childrenNodes.get(index - 1);
                // 在waitPath上注册监听器, 当waitPath被删除时, zookeeper会回调监听器的process方法
             if(zk.exists(waitPath, true)==null){
            action();
                 }
            }
        }
}

private Object action() {
try{
return doAction();
}finally{
try {
zk.delete(nodePath, -1);
//zk.close();
} catch (Exception e) {
throw new RuntimeException(e);
}

}
}

/**
* 主程必须守候,因为是与脱离主线程,所以无返回值。
* @return
* @throws Exception
*/
public void extraExecute() throws Exception{
this.extra = true;
this.extraExec();
}

public Object execute() throws Exception{
return exec();
}

public abstract Object doAction() ;


public static void main(String[] args) throws Exception {
Object obj = null;
final CountDownLatch startSignal = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {//zookeeper默认是一个应用10个连接
            new Thread() {
                public void run() {
                ZkLock lock;
try {
lock = new ZkLock("test") {
@Override
public Object doAction() {
return null;
}
};
//startSignal.await();
lock.execute();
                                              //  lock.extraExecute();
                                      //Thread.sleep(Long.MAX_VALUE);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
                }
            }.start();
            startSignal.countDown();
        }
}

}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366246-1-1.html 上篇帖子: 记zookeeper 扰动导致HBase的一次不可用 下篇帖子: 利用zookeeper统计管理配置文件
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表