设为首页 收藏本站
查看: 1049|回复: 0

[经验分享] 基于zookeeper的分布式锁实现

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2016-8-12 09:11:30 | 显示全部楼层 |阅读模式
工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现

准备工作
有几个帮助类,先把代码放上来
ZKClient 对zk的操作做了一个简单的封装

Java代码  
ZKUtil 针对zk路径的一个工具类
Java代码  
NetworkUtil 获取本机IP的工具方法
Java代码  

--------------------------- 正文开始  -----------------------------------
这种实现非常简单,具体的流程如下

e6a72aca-1ee6-3431-a299-340fbce71410.jpg
对应的实现如下
Java代码  

总结
网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更
  • package zk.lock;  


  • import zk.util.NetworkUtil;  
  • import zk.util.ZKUtil;  

  • /**
  • * User: zhenghui
  • * Date: 14-3-26
  • * Time: 下午8:37
  • * 分布式锁实现.
  • *
  • * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得
  • * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP
  • */  
  • public class DistributedLock01 {  

  •     private ZKClient zkClient;  


  •     public static final String LOCK_ROOT = "/lock";  
  •     private String lockName;  


  •     public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {  
  •         //先创建zk链接.  
  •         this.createConnection(connectString,sessionTimeout);  

  •         this.lockName = lockName;  
  •     }  

  •     public boolean tryLock(){  
  •         String path = ZKUtil.contact(LOCK_ROOT,lockName);  
  •         String localIp = NetworkUtil.getNetworkAddress();  
  •         try {  
  •             if(zkClient.exists(path)){  
  •                 String ownnerIp = zkClient.getData(path);  
  •                 if(localIp.equals(ownnerIp)){  
  •                     return true;  
  •                 }  
  •             } else {  
  •                 zkClient.createPathIfAbsent(path,false);  
  •                 if(zkClient.exists(path)){  
  •                     String ownnerIp = zkClient.getData(path);  
  •                     if(localIp.equals(ownnerIp)){  
  •                         return true;  
  •                     }  
  •                 }  
  •             }  
  •         } catch (Exception e) {  
  •             e.printStackTrace();  
  •         }  
  •         return false;  
  •     }  


  •     /**
  •      * 创建zk连接
  •      *
  •      */  
  •     protected void createConnection(String connectString, int sessionTimeout) throws Exception {  
  •         if(zkClient != null){  
  •             releaseConnection();  
  •         }  
  •         zkClient = new ZKClient(connectString,sessionTimeout);  
  •         zkClient.createPathIfAbsent(LOCK_ROOT,true);  
  •     }  
  •     /**
  •      * 关闭ZK连接
  •      */  
  •     protected void releaseConnection() throws InterruptedException {  
  •         if (zkClient != null) {  
  •             zkClient.close();  
  •         }  
  •     }  

  • }  

  • package zk.util;  

  • import java.net.InetAddress;  
  • import java.net.NetworkInterface;  
  • import java.util.Enumeration;  

  • /**
  • * User: zhenghui
  • * Date: 14-4-1
  • * Time: 下午4:47
  • */  
  • public class NetworkUtil {  

  •     static private final char COLON = ':';  

  •     /**
  •      * 获取当前机器ip地址
  •      * 据说多网卡的时候会有问题.
  •      */  
  •     public static String getNetworkAddress() {  
  •         Enumeration<NetworkInterface> netInterfaces;  
  •         try {  
  •             netInterfaces = NetworkInterface.getNetworkInterfaces();  
  •             InetAddress ip;  
  •             while (netInterfaces.hasMoreElements()) {  
  •                 NetworkInterface ni = netInterfaces  
  •                         .nextElement();  
  •                 Enumeration<InetAddress> addresses=ni.getInetAddresses();  
  •                 while(addresses.hasMoreElements()){  
  •                     ip = addresses.nextElement();  
  •                     if (!ip.isLoopbackAddress()  
  •                             && ip.getHostAddress().indexOf(COLON) == -1) {  
  •                         return ip.getHostAddress();  
  •                     }  
  •                 }  
  •             }  
  •             return "";  
  •         } catch (Exception e) {  
  •             return "";  
  •         }  
  •     }  
  • }  

  • package zk.util;  

  • /**
  • * User: zhenghui
  • * Date: 14-3-26
  • * Time: 下午9:56
  • */  
  • public class ZKUtil {  

  •     public static final String SEPARATOR = "/";  

  •     /**
  •      * 转换path为zk的标准路径 以/开头,最后不带/
  •      */  
  •     public static String normalize(String path) {  
  •         String temp = path;  
  •         if(!path.startsWith(SEPARATOR)) {  
  •             temp = SEPARATOR + path;  
  •         }  
  •         if(path.endsWith(SEPARATOR)) {  
  •             temp = temp.substring(0, temp.length()-1);  
  •             return normalize(temp);  
  •         }else {  
  •             return temp;  
  •         }  
  •     }  

  •     /**
  •      * 链接两个path,并转化为zk的标准路径
  •      */  
  •     public static String contact(String path1,String path2){  
  •         if(path2.startsWith(SEPARATOR)) {  
  •             path2 = path2.substring(1);  
  •         }  
  •         if(path1.endsWith(SEPARATOR)) {  
  •             return normalize(path1 + path2);  
  •         } else {  
  •             return normalize(path1 + SEPARATOR + path2);  
  •         }  
  •     }  

  •     /**
  •      * 字符串转化成byte类型
  •      */  
  •     public static byte[] toBytes(String data) {  
  •         if(data == null || data.trim().equals("")) return null;  
  •         return data.getBytes();  
  •     }  
  • }  

  • package zk.lock;  

  • import org.apache.zookeeper.*;  
  • import org.apache.zookeeper.data.Stat;  
  • import zk.util.ZKUtil;  

  • import java.util.concurrent.CountDownLatch;  
  • import java.util.concurrent.TimeUnit;  

  • /**
  • * User: zhenghui
  • * Date: 14-3-26
  • * Time: 下午8:50
  • * 封装一个zookeeper实例.
  • */  
  • public class ZKClient implements Watcher {  

  •     private ZooKeeper zookeeper;  

  •     private CountDownLatch connectedSemaphore = new CountDownLatch(1);  


  •     public ZKClient(String connectString, int sessionTimeout) throws Exception {  
  •         zookeeper = new ZooKeeper(connectString, sessionTimeout, this);  
  •         System.out.println("connecting zk server");  
  •         if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {  
  •             System.out.println("connect zk server success");  
  •         } else {  
  •             System.out.println("connect zk server error.");  
  •             throw new Exception("connect zk server error.");  
  •         }  
  •     }  

  •     public void close() throws InterruptedException {  
  •         if (zookeeper != null) {  
  •             zookeeper.close();  
  •         }  
  •     }  

  •     public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {  
  •         CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;  
  •         path = ZKUtil.normalize(path);  
  •         if (!this.exists(path)) {  
  •             zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);  
  •         }  
  •     }  

  •     public boolean exists(String path) throws Exception {  
  •         path = ZKUtil.normalize(path);  
  •         Stat stat = zookeeper.exists(path, null);  
  •         return stat != null;  
  •     }  

  •     public String getData(String path) throws Exception {  
  •         path = ZKUtil.normalize(path);  
  •         try {  
  •             byte[] data = zookeeper.getData(path, null, null);  
  •             return new String(data);  
  •         } catch (KeeperException e) {  
  •             if (e instanceof KeeperException.NoNodeException) {  
  •                 throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);  
  •             } else {  
  •                 throw new Exception(e);  
  •             }  
  •         } catch (InterruptedException e) {  
  •             Thread.currentThread().interrupt();  
  •             throw new Exception(e);  
  •         }  
  •     }  

  •     @Override  
  •     public void process(WatchedEvent event) {  
  •         if (event == null) return;  

  •         // 连接状态  
  •         Watcher.Event.KeeperState keeperState = event.getState();  
  •         // 事件类型  
  •         Watcher.Event.EventType eventType = event.getType();  
  •         // 受影响的path  
  • //        String path = event.getPath();  
  •         if (Watcher.Event.KeeperState.SyncConnected == keeperState) {  
  •             // 成功连接上ZK服务器  
  •             if (Watcher.Event.EventType.None == eventType) {  
  •                 System.out.println("zookeeper connect success");  
  •                 connectedSemaphore.countDown();  
  •             }  
  •         }  
  •         //下面可以做一些重连的工作.  
  •         else if (Watcher.Event.KeeperState.Disconnected == keeperState) {  
  •             System.out.println("zookeeper Disconnected");  
  •         } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {  
  •             System.out.println("zookeeper AuthFailed");  
  •         } else if (Watcher.Event.KeeperState.Expired == keeperState) {  
  •             System.out.println("zookeeper Expired");  
  •         }  
  •     }  
  • }  

核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx 1.     项目核心代码结构截图   项目模块依赖特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化2.    项目依赖介绍   2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图:        2.2 Dubbo独立服务项目依赖如下图: 3.  项目功能部分截图:      zookeeper、dubbo服务启动  dubbo管控台        REST服务平台   


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-256731-1-1.html 上篇帖子: zookeeper意外关闭后启动不了 下篇帖子: zookeeper+redies 系统序列号生成
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表