设为首页 收藏本站
查看: 539|回复: 0

[经验分享] zookeeper 分布式锁

[复制链接]

尚未签到

发表于 2019-1-8 07:06:37 | 显示全部楼层 |阅读模式
  分布式锁有很多,redis也可以实现分布式锁,
  http://shangdc.blog.运维网.com/10093778/1914852(查看redis的分布式锁)
  zookeeper分布式锁步骤:
  1、zookeeper是一个带有节点的,类似于文件目录,所以我们把锁抽象成目录,zookeeper有一个EPHEMERAL_SEQUENTIAL类型的节点, 多个线程再zookeeper创建的节点的时候,它会帮我们安排好顺序进行创建,所以这个节点下的目录都是顺序的。
  2、获取当前目录的最小的节点,判断最小节点是不是当前的自己的节点,如果是说明获取锁成功了,如果不是获取锁失败了。
  3、当获取锁的时候失败了,为了避免惊群效应,你要做的就是获取当前自己的节点的上一个节点,然后对该节点进行监听,当上一个节点删除的时候,会触发这个监听,通知该节点。
  4、这么做,释放锁的时候,也会通知下一个节点。
  什么是惊群效应:理解为肉少狼多,当一个节点删除的时候,凡是订阅了此节点的watcha的监听都会重新获取锁,都要去争夺,如果数量少还好,当数量很大的时候这种设计就是不合理也是浪费资源。
  zookeeper的状态和事件类型,提前了解一下。
状态 KeeperState.Disconnected (0)  断开
* KeeperState.SyncConnected (3)  同步连接状态
* KeeperState.AuthFailed (4) 认证失败状态
* KeeperState.ConnectedReadOnly (5)  只读连接状态
* KeeperState.SaslAuthenticated (6) SASL认证通过状态
* KeeperState.Expired (-112)  过期状态
*
* // EventType 是事件类型 主要关注 Create Delete DataChanged ChildrenChanged
* EventType.None (-1), 无
* EventType.NodeCreated (1),
* EventType.NodeDeleted (2),
* EventType.NodeDataChanged (3),  结点数据变化
* EventType.NodeChildrenChanged (4); 结点子节点变化  下面是代码,自己敲下,理解一下。
package com.lhcis.spider.system.annotation;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author sdc
*
*/
public class ZooDistributeLock implements Watcher {
private static final Logger LOG = LoggerFactory.getLogger(ZooDistributeLock.class);
private static final String LOCK_PATH = "/zkLock";
// 模拟开启的线程数
private static final int THREAD_NUM = 5;
// 用于等待所有线程都连接成功后再执行任务
private static CountDownLatch startFlag = new CountDownLatch(1);
// 用于确保所有线程执行完毕
private static CountDownLatch threadFlag = new CountDownLatch(THREAD_NUM);
private ZooKeeper zk = null;
private String currentPath;
private String lockPath;
public static void main(String[] args) {
for (int i = 0; i < THREAD_NUM; i++) {
final int j = i;
new Thread() {
@Override
public void run() {
ZooDistributeLock zooDistributeLock = new ZooDistributeLock();
try {
zooDistributeLock.connection();
System.out.println(&quot;连接&quot; + j);
zooDistributeLock.createNode();
System.out.println(&quot;创建&quot; + j);
zooDistributeLock.getLock();
System.out.println(&quot;获取锁&quot; + j);
} catch (IOException | InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}.start();
}
try {
threadFlag.await();
LOG.info(&quot;所有线程执行完毕...&quot;);
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
/**
* Disconnected为网络闪断时触发的事件,当然其他的拔掉网线、kill zookeeper server ,kill zk
* connection也会触发该事件。 SyncConnected为client端重新选择下一个zk
* server连接触发的事件,此时watcher有效,也就是能正常感知
* Expired为客户端重新连server时,服务端发现该session超过了设定的时长,返回给client
* Expired,此时watcher失效,也就是不能正常感知
*/
@Override
public void process(WatchedEvent event) {
Event.KeeperState state = event.getState();
Event.EventType type = event.getType();
if (Event.KeeperState.SyncConnected == state) {
if (Event.EventType.None == type) {
// 标识连接成功
LOG.info(&quot;成功连接上ZK服务器&quot;);
startFlag.countDown();
}
if (Event.EventType.NodeDeleted == type && event.getPath().equals(this.lockPath)) {
LOG.info(&quot;node:&quot; + this.lockPath + &quot;的锁已经被释放&quot;);
try {
// 上一个节点释放了,当前节点去获取锁
getLock();
} catch (KeeperException | InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
}
}
/**
* 连接到 ZK
*
* @throws IOException
*/
private void connection() throws IOException, InterruptedException {
zk = new ZooKeeper(&quot;127.0.0.1:2181&quot;, 5000, this);
// 等待连接成功后再执行下一步操作
startFlag.await();
}
// 创建节点,并初始化当前路径
private void createNode() throws KeeperException, InterruptedException, UnsupportedEncodingException {
this.currentPath = this.zk.create(LOCK_PATH, &quot;&quot;.getBytes(&quot;UTF-8&quot;), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
private void getLock() throws KeeperException, InterruptedException {
if (minNode()) {
doSomething();
// 释放锁
releaseLock();
}
}
/**
* 当前是否为最小节点
*
* @return
*/
private boolean minNode() {
// 当前序号
try {
initLockPath();
// 判断前一个节点存在不存在,如果存在,则表示当前节点不是最小节点
// zk.getData(this.lockPath, this, new Stat());
zk.getData(this.lockPath, true, new Stat());
LOG.info(this.currentPath + &quot; 不是最小值,没有获取锁,等待 &quot; + this.lockPath + &quot; 释放锁&quot;);
return false;
} catch (KeeperException e) {
LOG.info(this.currentPath + &quot; 是最小值,获得锁&quot;);
return true;
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
return true;
}
private void doSomething() {
LOG.info(&quot;处理业务逻辑...&quot;);
}
/**
* 释放锁并关闭连接
*
* @throws KeeperException
* @throws InterruptedException
*/
private void releaseLock() throws KeeperException, InterruptedException {
Thread.sleep(2000);
if (this.zk != null) {
LOG.info(this.currentPath + &quot; 业务处理完毕,释放锁...&quot;);
zk.delete(this.currentPath, -1);
this.zk.close();
LOG.info(Thread.currentThread().getName() + &quot;关闭 zookeeper 连接&quot;);
}
threadFlag.countDown();
}
/**
* 初始化 lockpath
*/
private void initLockPath() {
int currentSeq = Integer.parseInt(this.currentPath.substring(LOCK_PATH.length()));
// 上一个序号
int preSeq = currentSeq - 1;
String preSeqStr = String.valueOf(preSeq);
while (preSeqStr.length() < 10) {
preSeqStr = &quot;0&quot; + preSeqStr;
}
this.lockPath = LOCK_PATH + preSeqStr;
}
}  参考代码:
  https://juejin.im/entry/596438bc6fb9a06bb47495f1






运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-660500-1-1.html 上篇帖子: Zookeeper详解(七):Zookeeper集群启动过程和Leader选举 下篇帖子: elk6.3.1+zookeeper+kafka+filebeat收集dockerswarm容器日志
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表