|
分布式锁有很多,redis也可以实现分布式锁,
http://shangdc.blog.运维网.com/10093778/1914852(查看redis的分布式锁)
zookeeper分布式锁步骤:
1、zookeeper是一个带有节点的,类似于文件目录,所以我们把锁抽象成目录,zookeeper有一个EPHEMERAL_SEQUENTIAL类型的节点, 多个线程再zookeeper创建的节点的时候,它会帮我们安排好顺序进行创建,所以这个节点下的目录都是顺序的。
2、获取当前目录的最小的节点,判断最小节点是不是当前的自己的节点,如果是说明获取锁成功了,如果不是获取锁失败了。
3、当获取锁的时候失败了,为了避免惊群效应,你要做的就是获取当前自己的节点的上一个节点,然后对该节点进行监听,当上一个节点删除的时候,会触发这个监听,通知该节点。
4、这么做,释放锁的时候,也会通知下一个节点。
什么是惊群效应:理解为肉少狼多,当一个节点删除的时候,凡是订阅了此节点的watcha的监听都会重新获取锁,都要去争夺,如果数量少还好,当数量很大的时候这种设计就是不合理也是浪费资源。
zookeeper的状态和事件类型,提前了解一下。
状态 KeeperState.Disconnected (0) 断开
* KeeperState.SyncConnected (3) 同步连接状态
* KeeperState.AuthFailed (4) 认证失败状态
* KeeperState.ConnectedReadOnly (5) 只读连接状态
* KeeperState.SaslAuthenticated (6) SASL认证通过状态
* KeeperState.Expired (-112) 过期状态
*
* // EventType 是事件类型 主要关注 Create Delete DataChanged ChildrenChanged
* EventType.None (-1), 无
* EventType.NodeCreated (1),
* EventType.NodeDeleted (2),
* EventType.NodeDataChanged (3), 结点数据变化
* EventType.NodeChildrenChanged (4); 结点子节点变化 下面是代码,自己敲下,理解一下。
package com.lhcis.spider.system.annotation;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author sdc
*
*/
public class ZooDistributeLock implements Watcher {
private static final Logger LOG = LoggerFactory.getLogger(ZooDistributeLock.class);
private static final String LOCK_PATH = "/zkLock";
// 模拟开启的线程数
private static final int THREAD_NUM = 5;
// 用于等待所有线程都连接成功后再执行任务
private static CountDownLatch startFlag = new CountDownLatch(1);
// 用于确保所有线程执行完毕
private static CountDownLatch threadFlag = new CountDownLatch(THREAD_NUM);
private ZooKeeper zk = null;
private String currentPath;
private String lockPath;
public static void main(String[] args) {
for (int i = 0; i < THREAD_NUM; i++) {
final int j = i;
new Thread() {
@Override
public void run() {
ZooDistributeLock zooDistributeLock = new ZooDistributeLock();
try {
zooDistributeLock.connection();
System.out.println("连接" + j);
zooDistributeLock.createNode();
System.out.println("创建" + j);
zooDistributeLock.getLock();
System.out.println("获取锁" + j);
} catch (IOException | InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}.start();
}
try {
threadFlag.await();
LOG.info("所有线程执行完毕...");
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
/**
* Disconnected为网络闪断时触发的事件,当然其他的拔掉网线、kill zookeeper server ,kill zk
* connection也会触发该事件。 SyncConnected为client端重新选择下一个zk
* server连接触发的事件,此时watcher有效,也就是能正常感知
* Expired为客户端重新连server时,服务端发现该session超过了设定的时长,返回给client
* Expired,此时watcher失效,也就是不能正常感知
*/
@Override
public void process(WatchedEvent event) {
Event.KeeperState state = event.getState();
Event.EventType type = event.getType();
if (Event.KeeperState.SyncConnected == state) {
if (Event.EventType.None == type) {
// 标识连接成功
LOG.info("成功连接上ZK服务器");
startFlag.countDown();
}
if (Event.EventType.NodeDeleted == type && event.getPath().equals(this.lockPath)) {
LOG.info("node:" + this.lockPath + "的锁已经被释放");
try {
// 上一个节点释放了,当前节点去获取锁
getLock();
} catch (KeeperException | InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
}
}
/**
* 连接到 ZK
*
* @throws IOException
*/
private void connection() throws IOException, InterruptedException {
zk = new ZooKeeper("127.0.0.1:2181", 5000, this);
// 等待连接成功后再执行下一步操作
startFlag.await();
}
// 创建节点,并初始化当前路径
private void createNode() throws KeeperException, InterruptedException, UnsupportedEncodingException {
this.currentPath = this.zk.create(LOCK_PATH, "".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
private void getLock() throws KeeperException, InterruptedException {
if (minNode()) {
doSomething();
// 释放锁
releaseLock();
}
}
/**
* 当前是否为最小节点
*
* @return
*/
private boolean minNode() {
// 当前序号
try {
initLockPath();
// 判断前一个节点存在不存在,如果存在,则表示当前节点不是最小节点
// zk.getData(this.lockPath, this, new Stat());
zk.getData(this.lockPath, true, new Stat());
LOG.info(this.currentPath + " 不是最小值,没有获取锁,等待 " + this.lockPath + " 释放锁");
return false;
} catch (KeeperException e) {
LOG.info(this.currentPath + " 是最小值,获得锁");
return true;
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
return true;
}
private void doSomething() {
LOG.info("处理业务逻辑...");
}
/**
* 释放锁并关闭连接
*
* @throws KeeperException
* @throws InterruptedException
*/
private void releaseLock() throws KeeperException, InterruptedException {
Thread.sleep(2000);
if (this.zk != null) {
LOG.info(this.currentPath + " 业务处理完毕,释放锁...");
zk.delete(this.currentPath, -1);
this.zk.close();
LOG.info(Thread.currentThread().getName() + "关闭 zookeeper 连接");
}
threadFlag.countDown();
}
/**
* 初始化 lockpath
*/
private void initLockPath() {
int currentSeq = Integer.parseInt(this.currentPath.substring(LOCK_PATH.length()));
// 上一个序号
int preSeq = currentSeq - 1;
String preSeqStr = String.valueOf(preSeq);
while (preSeqStr.length() < 10) {
preSeqStr = "0" + preSeqStr;
}
this.lockPath = LOCK_PATH + preSeqStr;
}
} 参考代码:
https://juejin.im/entry/596438bc6fb9a06bb47495f1
|
|