基于zookeeper的分布式锁实现
工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现准备工作有几个帮助类,先把代码放上来ZKClient 对zk的操作做了一个简单的封装
Java代码
ZKUtil 针对zk路径的一个工具类Java代码
NetworkUtil 获取本机IP的工具方法Java代码
--------------------------- 正文开始-----------------------------------这种实现非常简单,具体的流程如下
对应的实现如下Java代码
总结网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更
[*]package zk.lock;
[*]
[*]
[*]import zk.util.NetworkUtil;
[*]import zk.util.ZKUtil;
[*]
[*]/**
[*] * User: zhenghui
[*] * Date: 14-3-26
[*] * Time: 下午8:37
[*] * 分布式锁实现.
[*] *
[*] * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得
[*] * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP
[*] */
[*]public class DistributedLock01 {
[*]
[*] private ZKClient zkClient;
[*]
[*]
[*] public static final String LOCK_ROOT = "/lock";
[*] private String lockName;
[*]
[*]
[*] public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {
[*] //先创建zk链接.
[*] this.createConnection(connectString,sessionTimeout);
[*]
[*] this.lockName = lockName;
[*] }
[*]
[*] public boolean tryLock(){
[*] String path = ZKUtil.contact(LOCK_ROOT,lockName);
[*] String localIp = NetworkUtil.getNetworkAddress();
[*] try {
[*] if(zkClient.exists(path)){
[*] String ownnerIp = zkClient.getData(path);
[*] if(localIp.equals(ownnerIp)){
[*] return true;
[*] }
[*] } else {
[*] zkClient.createPathIfAbsent(path,false);
[*] if(zkClient.exists(path)){
[*] String ownnerIp = zkClient.getData(path);
[*] if(localIp.equals(ownnerIp)){
[*] return true;
[*] }
[*] }
[*] }
[*] } catch (Exception e) {
[*] e.printStackTrace();
[*] }
[*] return false;
[*] }
[*]
[*]
[*] /**
[*] * 创建zk连接
[*] *
[*] */
[*] protected void createConnection(String connectString, int sessionTimeout) throws Exception {
[*] if(zkClient != null){
[*] releaseConnection();
[*] }
[*] zkClient = new ZKClient(connectString,sessionTimeout);
[*] zkClient.createPathIfAbsent(LOCK_ROOT,true);
[*] }
[*] /**
[*] * 关闭ZK连接
[*] */
[*] protected void releaseConnection() throws InterruptedException {
[*] if (zkClient != null) {
[*] zkClient.close();
[*] }
[*] }
[*]
[*]}
[*]package zk.util;
[*]
[*]import java.net.InetAddress;
[*]import java.net.NetworkInterface;
[*]import java.util.Enumeration;
[*]
[*]/**
[*] * User: zhenghui
[*] * Date: 14-4-1
[*] * Time: 下午4:47
[*] */
[*]public class NetworkUtil {
[*]
[*] static private final char COLON = ':';
[*]
[*] /**
[*] * 获取当前机器ip地址
[*] * 据说多网卡的时候会有问题.
[*] */
[*] public static String getNetworkAddress() {
[*] Enumeration<NetworkInterface> netInterfaces;
[*] try {
[*] netInterfaces = NetworkInterface.getNetworkInterfaces();
[*] InetAddress ip;
[*] while (netInterfaces.hasMoreElements()) {
[*] NetworkInterface ni = netInterfaces
[*] .nextElement();
[*] Enumeration<InetAddress> addresses=ni.getInetAddresses();
[*] while(addresses.hasMoreElements()){
[*] ip = addresses.nextElement();
[*] if (!ip.isLoopbackAddress()
[*] && ip.getHostAddress().indexOf(COLON) == -1) {
[*] return ip.getHostAddress();
[*] }
[*] }
[*] }
[*] return "";
[*] } catch (Exception e) {
[*] return "";
[*] }
[*] }
[*]}
[*]package zk.util;
[*]
[*]/**
[*] * User: zhenghui
[*] * Date: 14-3-26
[*] * Time: 下午9:56
[*] */
[*]public class ZKUtil {
[*]
[*] public static final String SEPARATOR = "/";
[*]
[*] /**
[*] * 转换path为zk的标准路径 以/开头,最后不带/
[*] */
[*] public static String normalize(String path) {
[*] String temp = path;
[*] if(!path.startsWith(SEPARATOR)) {
[*] temp = SEPARATOR + path;
[*] }
[*] if(path.endsWith(SEPARATOR)) {
[*] temp = temp.substring(0, temp.length()-1);
[*] return normalize(temp);
[*] }else {
[*] return temp;
[*] }
[*] }
[*]
[*] /**
[*] * 链接两个path,并转化为zk的标准路径
[*] */
[*] public static String contact(String path1,String path2){
[*] if(path2.startsWith(SEPARATOR)) {
[*] path2 = path2.substring(1);
[*] }
[*] if(path1.endsWith(SEPARATOR)) {
[*] return normalize(path1 + path2);
[*] } else {
[*] return normalize(path1 + SEPARATOR + path2);
[*] }
[*] }
[*]
[*] /**
[*] * 字符串转化成byte类型
[*] */
[*] public static byte[] toBytes(String data) {
[*] if(data == null || data.trim().equals("")) return null;
[*] return data.getBytes();
[*] }
[*]}
[*]package zk.lock;
[*]
[*]import org.apache.zookeeper.*;
[*]import org.apache.zookeeper.data.Stat;
[*]import zk.util.ZKUtil;
[*]
[*]import java.util.concurrent.CountDownLatch;
[*]import java.util.concurrent.TimeUnit;
[*]
[*]/**
[*] * User: zhenghui
[*] * Date: 14-3-26
[*] * Time: 下午8:50
[*] * 封装一个zookeeper实例.
[*] */
[*]public class ZKClient implements Watcher {
[*]
[*] private ZooKeeper zookeeper;
[*]
[*] private CountDownLatch connectedSemaphore = new CountDownLatch(1);
[*]
[*]
[*] public ZKClient(String connectString, int sessionTimeout) throws Exception {
[*] zookeeper = new ZooKeeper(connectString, sessionTimeout, this);
[*] System.out.println("connecting zk server");
[*] if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {
[*] System.out.println("connect zk server success");
[*] } else {
[*] System.out.println("connect zk server error.");
[*] throw new Exception("connect zk server error.");
[*] }
[*] }
[*]
[*] public void close() throws InterruptedException {
[*] if (zookeeper != null) {
[*] zookeeper.close();
[*] }
[*] }
[*]
[*] public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {
[*] CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
[*] path = ZKUtil.normalize(path);
[*] if (!this.exists(path)) {
[*] zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
[*] }
[*] }
[*]
[*] public boolean exists(String path) throws Exception {
[*] path = ZKUtil.normalize(path);
[*] Stat stat = zookeeper.exists(path, null);
[*] return stat != null;
[*] }
[*]
[*] public String getData(String path) throws Exception {
[*] path = ZKUtil.normalize(path);
[*] try {
[*] byte[] data = zookeeper.getData(path, null, null);
[*] return new String(data);
[*] } catch (KeeperException e) {
[*] if (e instanceof KeeperException.NoNodeException) {
[*] throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);
[*] } else {
[*] throw new Exception(e);
[*] }
[*] } catch (InterruptedException e) {
[*] Thread.currentThread().interrupt();
[*] throw new Exception(e);
[*] }
[*] }
[*]
[*] @Override
[*] public void process(WatchedEvent event) {
[*] if (event == null) return;
[*]
[*] // 连接状态
[*] Watcher.Event.KeeperState keeperState = event.getState();
[*] // 事件类型
[*] Watcher.Event.EventType eventType = event.getType();
[*] // 受影响的path
[*]// String path = event.getPath();
[*] if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
[*] // 成功连接上ZK服务器
[*] if (Watcher.Event.EventType.None == eventType) {
[*] System.out.println("zookeeper connect success");
[*] connectedSemaphore.countDown();
[*] }
[*] }
[*] //下面可以做一些重连的工作.
[*] else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
[*] System.out.println("zookeeper Disconnected");
[*] } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
[*] System.out.println("zookeeper AuthFailed");
[*] } else if (Watcher.Event.KeeperState.Expired == keeperState) {
[*] System.out.println("zookeeper Expired");
[*] }
[*] }
[*]}
核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx 1. 项目核心代码结构截图 项目模块依赖特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化2. 项目依赖介绍 2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图: 2.2 Dubbo独立服务项目依赖如下图: 3.项目功能部分截图: zookeeper、dubbo服务启动dubbo管控台 REST服务平台
页:
[1]