设为首页 收藏本站
查看: 660|回复: 0

[经验分享] ZooKeeper示例 分布式锁思路及示例代码

[复制链接]

尚未签到

发表于 2015-11-22 08:30:19 | 显示全部楼层 |阅读模式
1.zookeeper实现分布式锁的作用是什么?
2.ZooKeeper分布式锁的思路是什么?
3.ZooKeeper锁不安全,该如何解决?
4.你是否有实现ZooKeeper锁的思路?
场景描述在分布式应用, 往往存在多个进程提供同一服务. 这些进程有可能在相同的机器上, 也有可能分布在不同的机器上. 如果这些进程共享了一些资源, 可能就需要分布式锁来锁定对这些资源的访问.
本文将介绍如何利用zookeeper实现分布式锁.思路进程需要访问共享数据时,就在"/locks"节点下创建一个sequence类型的子节点, 称为thisPath. 当thisPath在所有子节点中最小时, 说明该进程获得了锁. 进程获得锁之后, 就可以访问共享资源了. 访问完成后, 需要将thisPath删除. 锁由新的最小的子节点获得.
有了清晰的思路之后, 还需要补充一些细节. 进程如何知道thisPath是所有子节点中最小的呢? 可以在创建的时候, 通过getChildren方法获取子节点列表, 然后在列表中找到排名比thisPath前1位的节点, 称为waitPath, 然后在waitPath上注册监听, 当waitPath被删除后, 进程获得通知, 此时说明该进程获得了锁.实现以一个DistributedClient对象模拟一个进程的形式, 演示zookeeper分布式锁的实现.

  • public class DistributedClient {
  •     // 超时时间
  •     private static final int SESSION_TIMEOUT = 5000;
  •     // zookeeper server列表
  •     private String hosts = "localhost:4180,localhost:4181,localhost:4182";
  •     private String groupNode = "locks";
  •     private String subNode = "sub";

  •     private ZooKeeper zk;
  •     // 当前client创建的子节点
  •     private String thisPath;
  •     // 当前client等待的子节点
  •     private String waitPath;

  •     private CountDownLatch latch = new CountDownLatch(1);

  •     /**
  •      * 连接zookeeper
  •      */
  •     public void connectZookeeper() throws Exception {
  •         zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
  •             public void process(WatchedEvent event) {
  •                 try {
  •                     // 连接建立时, 打开latch, 唤醒wait在该latch上的线程
  •                     if (event.getState() == KeeperState.SyncConnected) {
  •                         latch.countDown();
  •                     }

  •                     // 发生了waitPath的删除事件
  •                     if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
  •                         doSomething();
  •                     }
  •                 } catch (Exception e) {
  •                     e.printStackTrace();
  •                 }
  •             }
  •         });

  •         // 等待连接建立
  •         latch.await();

  •         // 创建子节点
  •         thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
  •                 CreateMode.EPHEMERAL_SEQUENTIAL);

  •         // wait一小会, 让结果更清晰一些
  •         Thread.sleep(10);

  •         // 注意, 没有必要监听"/locks"的子节点的变化情况
  •         List<String> childrenNodes = zk.getChildren(&quot;/&quot; &#43; groupNode, false);

  •         // 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁
  •         if (childrenNodes.size() == 1) {
  •             doSomething();
  •         } else {
  •             String thisNode = thisPath.substring((&quot;/&quot; &#43; groupNode &#43; &quot;/&quot;).length());
  •             // 排序
  •             Collections.sort(childrenNodes);
  •             int index = childrenNodes.indexOf(thisNode);
  •             if (index == -1) {
  •                 // never happened
  •             } else if (index == 0) {
  •                 // inddx == 0, 说明thisNode在列表中最小, 当前client获得锁
  •                 doSomething();
  •             } else {
  •                 // 获得排名比thisPath前1位的节点
  •                 this.waitPath = &quot;/&quot; &#43; groupNode &#43; &quot;/&quot; &#43; childrenNodes.get(index - 1);
  •                 // 在waitPath上注册监听器, 当waitPath被删除时, zookeeper会回调监听器的process方法
  •                 zk.getData(waitPath, true, new Stat());
  •             }
  •         }
  •     }

  •     private void doSomething() throws Exception {
  •         try {
  •             System.out.println(&quot;gain lock: &quot; &#43; thisPath);
  •             Thread.sleep(2000);
  •             // do something
  •         } finally {
  •             System.out.println(&quot;finished: &quot; &#43; thisPath);
  •             // 将thisPath删除, 监听thisPath的client将获得通知
  •             // 相当于释放锁
  •             zk.delete(this.thisPath, -1);
  •         }
  •     }

  •     public static void main(String[] args) throws Exception {
  •         for (int i = 0; i < 10; i&#43;&#43;) {
  •             new Thread() {
  •                 public void run() {
  •                     try {
  •                         DistributedClient dl = new DistributedClient();
  •                         dl.connectZookeeper();
  •                     } catch (Exception e) {
  •                         e.printStackTrace();
  •                     }
  •                 }
  •             }.start();
  •         }

  •         Thread.sleep(Long.MAX_VALUE);
  •     }
  • }
[color=#336699!important]复制代码
思考思维缜密的朋友可能会想到, 上述的方案并不安全. 假设某个client在获得锁之前挂掉了, 由于client创建的节点是ephemeral类型的, 因此这个节点也会被删除, 从而导致排在这个client之后的client提前获得了锁. 此时会存在多个client同时访问共享资源.
如何解决这个问题呢? 可以在接到waitPath的删除通知的时候, 进行一次确认, 确认当前的thisPath是否真的是列表中最小的节点.

  • // 发生了waitPath的删除事件
  • if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
  •         // 确认thisPath是否真的是列表中的最小节点
  •         List<String> childrenNodes = zk.getChildren(&quot;/&quot; &#43; groupNode, false);
  •         String thisNode = thisPath.substring((&quot;/&quot; &#43; groupNode &#43; &quot;/&quot;).length());
  •         // 排序
  •         Collections.sort(childrenNodes);
  •         int index = childrenNodes.indexOf(thisNode);
  •         if (index == 0) {
  •                 // 确实是最小节点
  •                 doSomething();
  •         } else {
  •                 // 说明waitPath是由于出现异常而挂掉的
  •                 // 更新waitPath
  •                 waitPath = &quot;/&quot; &#43; groupNode &#43; &quot;/&quot; &#43; childrenNodes.get(index - 1);
  •                 // 重新注册监听, 并判断此时waitPath是否已删除
  •                 if (zk.exists(waitPath, true) == null) {
  •                         doSomething();
  •                 }
  •         }
  • }
[color=#336699!important]复制代码
另外, 由于thisPath和waitPath这2个成员变量会在多个线程中访问, 最好将他们声明为volatile, 以防止出现线程可见性问题.另一种思路下面介绍一种更简单, 但是不怎么推荐的解决方案.
每个client在getChildren的时候, 注册监听子节点的变化. 当子节点的变化通知到来时, 再一次通过getChildren获取子节点列表, 判断thisPath是否是列表中的最小节点, 如果是, 则执行资源访问逻辑.

  • public class DistributedClient2 {
  •         // 超时时间
  •         private static final int SESSION_TIMEOUT = 5000;
  •         // zookeeper server列表
  •         private String hosts = &quot;localhost:4180,localhost:4181,localhost:4182&quot;;
  •         private String groupNode = &quot;locks&quot;;
  •         private String subNode = &quot;sub&quot;;

  •         private ZooKeeper zk;
  •         // 当前client创建的子节点
  •         private volatile String thisPath;

  •         private CountDownLatch latch = new CountDownLatch(1);

  •         /**
  •          * 连接zookeeper
  •          */
  •         public void connectZookeeper() throws Exception {
  •                 zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
  •                         public void process(WatchedEvent event) {
  •                                 try {
  •                                         // 连接建立时, 打开latch, 唤醒wait在该latch上的线程
  •                                         if (event.getState() == KeeperState.SyncConnected) {
  •                                                 latch.countDown();
  •                                         }

  •                                         // 子节点发生变化
  •                                         if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(&quot;/&quot; &#43; groupNode)) {
  •                                                 // thisPath是否是列表中的最小节点
  •                                                 List<String> childrenNodes = zk.getChildren(&quot;/&quot; &#43; groupNode, true);
  •                                                 String thisNode = thisPath.substring((&quot;/&quot; &#43; groupNode &#43; &quot;/&quot;).length());
  •                                                 // 排序
  •                                                 Collections.sort(childrenNodes);
  •                                                 if (childrenNodes.indexOf(thisNode) == 0) {
  •                                                         doSomething();
  •                                                 }
  •                                         }
  •                                 } catch (Exception e) {
  •                                         e.printStackTrace();
  •                                 }
  •                         }
  •                 });

  •                 // 等待连接建立
  •                 latch.await();

  •                 // 创建子节点
  •                 thisPath = zk.create(&quot;/&quot; &#43; groupNode &#43; &quot;/&quot; &#43; subNode, null, Ids.OPEN_ACL_UNSAFE,
  •                                 CreateMode.EPHEMERAL_SEQUENTIAL);

  •                 // wait一小会, 让结果更清晰一些
  •                 Thread.sleep(10);

  •                 // 监听子节点的变化
  •                 List<String> childrenNodes = zk.getChildren(&quot;/&quot; &#43; groupNode, true);

  •                 // 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁
  •                 if (childrenNodes.size() == 1) {
  •                         doSomething();
  •                 }
  •         }

  •         /**
  •          * 共享资源的访问逻辑写在这个方法中
  •          */
  •         private void doSomething() throws Exception {
  •                 try {
  •                         System.out.println(&quot;gain lock: &quot; &#43; thisPath);
  •                         Thread.sleep(2000);
  •                         // do something
  •                 } finally {
  •                         System.out.println(&quot;finished: &quot; &#43; thisPath);
  •                         // 将thisPath删除, 监听thisPath的client将获得通知
  •                         // 相当于释放锁
  •                         zk.delete(this.thisPath, -1);
  •                 }
  •         }

  •         public static void main(String[] args) throws Exception {
  •                 for (int i = 0; i < 10; i&#43;&#43;) {
  •                         new Thread() {
  •                                 public void run() {
  •                                         try {
  •                                                 DistributedClient2 dl = new DistributedClient2();
  •                                                 dl.connectZookeeper();
  •                                         } catch (Exception e) {
  •                                                 e.printStackTrace();
  •                                         }
  •                                 }
  •                         }.start();
  •                 }

  •                 Thread.sleep(Long.MAX_VALUE);
  •         }
  • }
[color=#336699!important]复制代码
为什么不推荐这个方案呢? 是因为每次子节点的增加和删除都要广播给所有client,client数量不多时还看不出问题. 如果存在很多client, 那么就可能导致广播风暴--过多的广播通知阻塞了网络. 使用第一个方案, 会使得通知的数量大大下降. 当然第一个方案更复杂一些, 复杂的方案同时也意味着更容易引进bug.

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-141979-1-1.html 上篇帖子: Java实现与ZooKeeper的连接 下篇帖子: Zookeeper集群环境搭建实践
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表