uykh 发表于 2016-8-12 09:11:30

基于zookeeper的分布式锁实现

工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现
准备工作有几个帮助类,先把代码放上来ZKClient 对zk的操作做了一个简单的封装
Java代码
ZKUtil 针对zk路径的一个工具类Java代码
NetworkUtil 获取本机IP的工具方法Java代码

--------------------------- 正文开始-----------------------------------这种实现非常简单,具体的流程如下

对应的实现如下Java代码

总结网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更
[*]package zk.lock;

[*]

[*]

[*]import zk.util.NetworkUtil;

[*]import zk.util.ZKUtil;

[*]

[*]/**

[*] * User: zhenghui

[*] * Date: 14-3-26

[*] * Time: 下午8:37

[*] * 分布式锁实现.

[*] *

[*] * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得

[*] * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP

[*] */

[*]public class DistributedLock01 {

[*]

[*]    private ZKClient zkClient;

[*]

[*]

[*]    public static final String LOCK_ROOT = "/lock";

[*]    private String lockName;

[*]

[*]

[*]    public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {

[*]      //先创建zk链接.

[*]      this.createConnection(connectString,sessionTimeout);

[*]

[*]      this.lockName = lockName;

[*]    }

[*]

[*]    public boolean tryLock(){

[*]      String path = ZKUtil.contact(LOCK_ROOT,lockName);

[*]      String localIp = NetworkUtil.getNetworkAddress();

[*]      try {

[*]            if(zkClient.exists(path)){

[*]                String ownnerIp = zkClient.getData(path);

[*]                if(localIp.equals(ownnerIp)){

[*]                  return true;

[*]                }

[*]            } else {

[*]                zkClient.createPathIfAbsent(path,false);

[*]                if(zkClient.exists(path)){

[*]                  String ownnerIp = zkClient.getData(path);

[*]                  if(localIp.equals(ownnerIp)){

[*]                        return true;

[*]                  }

[*]                }

[*]            }

[*]      } catch (Exception e) {

[*]            e.printStackTrace();

[*]      }

[*]      return false;

[*]    }

[*]

[*]

[*]    /**

[*]   * 创建zk连接

[*]   *

[*]   */

[*]    protected void createConnection(String connectString, int sessionTimeout) throws Exception {

[*]      if(zkClient != null){

[*]            releaseConnection();

[*]      }

[*]      zkClient = new ZKClient(connectString,sessionTimeout);

[*]      zkClient.createPathIfAbsent(LOCK_ROOT,true);

[*]    }

[*]    /**

[*]   * 关闭ZK连接

[*]   */

[*]    protected void releaseConnection() throws InterruptedException {

[*]      if (zkClient != null) {

[*]            zkClient.close();

[*]      }

[*]    }

[*]

[*]}


[*]package zk.util;

[*]

[*]import java.net.InetAddress;

[*]import java.net.NetworkInterface;

[*]import java.util.Enumeration;

[*]

[*]/**

[*] * User: zhenghui

[*] * Date: 14-4-1

[*] * Time: 下午4:47

[*] */

[*]public class NetworkUtil {

[*]

[*]    static private final char COLON = ':';

[*]

[*]    /**

[*]   * 获取当前机器ip地址

[*]   * 据说多网卡的时候会有问题.

[*]   */

[*]    public static String getNetworkAddress() {

[*]      Enumeration<NetworkInterface> netInterfaces;

[*]      try {

[*]            netInterfaces = NetworkInterface.getNetworkInterfaces();

[*]            InetAddress ip;

[*]            while (netInterfaces.hasMoreElements()) {

[*]                NetworkInterface ni = netInterfaces

[*]                        .nextElement();

[*]                Enumeration<InetAddress> addresses=ni.getInetAddresses();

[*]                while(addresses.hasMoreElements()){

[*]                  ip = addresses.nextElement();

[*]                  if (!ip.isLoopbackAddress()

[*]                            && ip.getHostAddress().indexOf(COLON) == -1) {

[*]                        return ip.getHostAddress();

[*]                  }

[*]                }

[*]            }

[*]            return "";

[*]      } catch (Exception e) {

[*]            return "";

[*]      }

[*]    }

[*]}


[*]package zk.util;

[*]

[*]/**

[*] * User: zhenghui

[*] * Date: 14-3-26

[*] * Time: 下午9:56

[*] */

[*]public class ZKUtil {

[*]

[*]    public static final String SEPARATOR = "/";

[*]

[*]    /**

[*]   * 转换path为zk的标准路径 以/开头,最后不带/

[*]   */

[*]    public static String normalize(String path) {

[*]      String temp = path;

[*]      if(!path.startsWith(SEPARATOR)) {

[*]            temp = SEPARATOR + path;

[*]      }

[*]      if(path.endsWith(SEPARATOR)) {

[*]            temp = temp.substring(0, temp.length()-1);

[*]            return normalize(temp);

[*]      }else {

[*]            return temp;

[*]      }

[*]    }

[*]

[*]    /**

[*]   * 链接两个path,并转化为zk的标准路径

[*]   */

[*]    public static String contact(String path1,String path2){

[*]      if(path2.startsWith(SEPARATOR)) {

[*]            path2 = path2.substring(1);

[*]      }

[*]      if(path1.endsWith(SEPARATOR)) {

[*]            return normalize(path1 + path2);

[*]      } else {

[*]            return normalize(path1 + SEPARATOR + path2);

[*]      }

[*]    }

[*]

[*]    /**

[*]   * 字符串转化成byte类型

[*]   */

[*]    public static byte[] toBytes(String data) {

[*]      if(data == null || data.trim().equals("")) return null;

[*]      return data.getBytes();

[*]    }

[*]}


[*]package zk.lock;

[*]

[*]import org.apache.zookeeper.*;

[*]import org.apache.zookeeper.data.Stat;

[*]import zk.util.ZKUtil;

[*]

[*]import java.util.concurrent.CountDownLatch;

[*]import java.util.concurrent.TimeUnit;

[*]

[*]/**

[*] * User: zhenghui

[*] * Date: 14-3-26

[*] * Time: 下午8:50

[*] * 封装一个zookeeper实例.

[*] */

[*]public class ZKClient implements Watcher {

[*]

[*]    private ZooKeeper zookeeper;

[*]

[*]    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

[*]

[*]

[*]    public ZKClient(String connectString, int sessionTimeout) throws Exception {

[*]      zookeeper = new ZooKeeper(connectString, sessionTimeout, this);

[*]      System.out.println("connecting zk server");

[*]      if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {

[*]            System.out.println("connect zk server success");

[*]      } else {

[*]            System.out.println("connect zk server error.");

[*]            throw new Exception("connect zk server error.");

[*]      }

[*]    }

[*]

[*]    public void close() throws InterruptedException {

[*]      if (zookeeper != null) {

[*]            zookeeper.close();

[*]      }

[*]    }

[*]

[*]    public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {

[*]      CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;

[*]      path = ZKUtil.normalize(path);

[*]      if (!this.exists(path)) {

[*]            zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);

[*]      }

[*]    }

[*]

[*]    public boolean exists(String path) throws Exception {

[*]      path = ZKUtil.normalize(path);

[*]      Stat stat = zookeeper.exists(path, null);

[*]      return stat != null;

[*]    }

[*]

[*]    public String getData(String path) throws Exception {

[*]      path = ZKUtil.normalize(path);

[*]      try {

[*]            byte[] data = zookeeper.getData(path, null, null);

[*]            return new String(data);

[*]      } catch (KeeperException e) {

[*]            if (e instanceof KeeperException.NoNodeException) {

[*]                throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);

[*]            } else {

[*]                throw new Exception(e);

[*]            }

[*]      } catch (InterruptedException e) {

[*]            Thread.currentThread().interrupt();

[*]            throw new Exception(e);

[*]      }

[*]    }

[*]

[*]    @Override

[*]    public void process(WatchedEvent event) {

[*]      if (event == null) return;

[*]

[*]      // 连接状态

[*]      Watcher.Event.KeeperState keeperState = event.getState();

[*]      // 事件类型

[*]      Watcher.Event.EventType eventType = event.getType();

[*]      // 受影响的path

[*]//      String path = event.getPath();

[*]      if (Watcher.Event.KeeperState.SyncConnected == keeperState) {

[*]            // 成功连接上ZK服务器

[*]            if (Watcher.Event.EventType.None == eventType) {

[*]                System.out.println("zookeeper connect success");

[*]                connectedSemaphore.countDown();

[*]            }

[*]      }

[*]      //下面可以做一些重连的工作.

[*]      else if (Watcher.Event.KeeperState.Disconnected == keeperState) {

[*]            System.out.println("zookeeper Disconnected");

[*]      } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {

[*]            System.out.println("zookeeper AuthFailed");

[*]      } else if (Watcher.Event.KeeperState.Expired == keeperState) {

[*]            System.out.println("zookeeper Expired");

[*]      }

[*]    }

[*]}

核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx 1.   项目核心代码结构截图   项目模块依赖特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化2.    项目依赖介绍   2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图:      2.2 Dubbo独立服务项目依赖如下图: 3.项目功能部分截图:      zookeeper、dubbo服务启动dubbo管控台      REST服务平台   

页: [1]
查看完整版本: 基于zookeeper的分布式锁实现