设为首页 收藏本站
查看: 596|回复: 0

[经验分享] 基于Zookeeper的分布式共享锁

[复制链接]

尚未签到

发表于 2017-4-19 11:21:47 | 显示全部楼层 |阅读模式
  首先,说说我们的场景,订单服务是做成集群的,当两个以上结点同时收到一个相同订单的创建指令,这时并发就产生了,系统就会重复创建订单。等等......场景。这时,分布式共享锁就闪亮登场了。
  共享锁在同一个进程中是很容易实现的,但在跨进程或者在不同Server之间就不好实现了。Zookeeper就很容易实现。具体的实现原理官网和其它网站也有翻译,这里就不在赘述了。
  官网资料:http://zookeeper.apache.org/doc/r3.4.5/recipes.html
  中文资料:https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper
  详见Locks章节。
  原理都知道了,网上一搜索Apache上面已经有提供了,既然已经有轮子了,哪我们也没必要重复造轮子了吧!直接使用Curator。但是,我们在测试中发现,用于共享锁的结点无法自动回收,除了最末一级的临时结点会在锁释放和session超时的时候能自动回收外,其它结点均无法自动回收。我们的订单一天有好几万,遇到618和双十一的时候每天的订单量超50W,如果结点长期不回收的话,肯定会影响Zookeeper的性能。这时,我们就想到了一句话“自己动手,丰衣足食”。下面直接上代码:
  首先,创建一个Maven工程,在pom文件里导入下面的包:

<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
  LockZookeeperClient接口:

package com.XXX.framework.lock;
import org.apache.curator.framework.CuratorFramework;
/**
*
* description
*
* @author Roadrunners
* @version 1.0, 2015年7月9日
*/
public interface LockZookeeperClient {
/**
*
* @return
*/
CuratorFramework getCuratorFramework();
/**
*
* @return
*/
String getBasePath();
/**
* garbage collector
*
* @param gcPath
*/
void gc(String gcPath);
}

  LockZookeeperClient接口的实现LockZookeeperClientFactory:

package com.XXX.framework.lock;
import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
*
* description
*  
* @author Roadrunners
* @version 1.0, 2015年7月9日
*/
public class LockZookeeperClientFactory implements LockZookeeperClient {
private static final Log LOG = LogFactory.getLog(LockZookeeperClientFactory.class);
private boolean hasGc = true;
private Timer gcTimer;
private TimerTask gcTimerTask;
private ConcurrentSkipListSet<String> gcPaths = new ConcurrentSkipListSet<String>();
private int gcIntervalSecond = 60;
private CuratorFramework curatorFramework;
private String zookeeperIpPort = "localhost:2181";
private int sessionTimeoutMs = 10000;
private int connectionTimeoutMs = 10000;
private String basePath = "/locks";
public void setHasGc(boolean hasGc) {
this.hasGc = hasGc;
}
public void setGcIntervalSecond(int gcIntervalSecond) {
this.gcIntervalSecond = gcIntervalSecond;
}
public void setZookeeperIpPort(String zookeeperIpPort) {
this.zookeeperIpPort = zookeeperIpPort;
}
public void setSessionTimeoutMs(int sessionTimeoutMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
}
public void setConnectionTimeoutMs(int connectionTimeoutMs) {
this.connectionTimeoutMs = connectionTimeoutMs;
}
public void setBasePath(String basePath) {
basePath = basePath.trim();
if (basePath.endsWith("/")) {
basePath = basePath.substring(0, basePath.length() - 1);
}
this.basePath = basePath;
}
public void init() {
if(StringUtils.isBlank(zookeeperIpPort)){
throw new NullPointerException("zookeeperIpPort");
}
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
curatorFramework = CuratorFrameworkFactory.newClient(zookeeperIpPort.trim(), sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
curatorFramework.start();
LOG.info("CuratorFramework initialise succeed.");
if (hasGc) {
gc();
}
}
public void destroy() {
gcPaths.clear();
gcPaths = null;
gcStop();
curatorFramework.close();
curatorFramework = null;
}
@Override
public void gc(String gcPath) {
if (hasGc && StringUtils.isNotBlank(gcPath)) {
gcPaths.add(gcPath.trim());
}
}
@Override
public CuratorFramework getCuratorFramework() {
return this.curatorFramework;
}
@Override
public String getBasePath() {
return this.basePath;
}
private synchronized void gc() {
gcStop();
try {
scanningGCNodes();
} catch (Throwable e) {
LOG.warn(e);
}
gcTimerTask = new TimerTask() {
@Override
public void run() {
doingGc();
}
};
Date begin = new Date();
begin.setTime(begin.getTime() + (10 * 1000L));
gcTimer = new Timer("lock-gc", true);
gcTimer.schedule(gcTimerTask, begin, gcIntervalSecond * 1000L);
}
private synchronized void gcStop() {
if (null != gcTimer) {
gcTimer.cancel();
gcTimer = null;
}
if (null != gcTimerTask) {
gcTimerTask.cancel();
gcTimerTask = null;
}
}
private synchronized void scanningGCNodes() throws Exception {
if (null == curatorFramework.checkExists().forPath(basePath)) {
return;
}
List<String> paths = curatorFramework.getChildren().forPath(basePath);
if (CollectionUtils.isEmpty(paths)) {
gcPaths.add(basePath);
return;
}
for (String path : paths) {
try{
String tmpPath = basePath + "/" + path;
if (null == curatorFramework.checkExists().forPath(tmpPath)) {
continue;
}
gcPaths.add(tmpPath);
} catch(Throwable e){
LOG.warn("scanning gc nodes error.", e);
}
}
}
private synchronized void doingGc() {
LOG.debug("GC beginning.");
if (CollectionUtils.isNotEmpty(gcPaths)) {
for (String path : gcPaths) {
try {
if (null != curatorFramework.checkExists().forPath(path)) {
if (CollectionUtils.isEmpty(curatorFramework.getChildren().forPath(path))) {
curatorFramework.delete().forPath(path);
gcPaths.remove(path);
LOG.debug("GC " + path);
}
} else {
gcPaths.remove(path);
}
} catch (Throwable e) {
gcPaths.remove(path);
LOG.warn(e);
}
}
}
LOG.debug("GC ended.");
}
}

  SharedLock共享锁:

package com.XXX.framework.lock.shared;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import com.XXX.framework.lock.LockZookeeperClient;
/**
*
* description
*
* @author Roadrunners
* @version 1.0, 2015年7月9日
*/
public class SharedLock {
private InterProcessLock interProcessLock;
public SharedLock(LockZookeeperClient lockZookeeperClient, String resourceId) {
super();
if (StringUtils.isBlank(resourceId)) {
throw new NullPointerException("resourceId");
}
String path = lockZookeeperClient.getBasePath();
path += ("/" + resourceId.trim());
interProcessLock = new InterProcessMutex(lockZookeeperClient.getCuratorFramework(), path);
lockZookeeperClient.gc(path);
}
/**
* Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call
* to {@link #release()}
*
* @throws Exception ZK errors, connection interruptions
*/
public void acquire() throws Exception {
interProcessLock.acquire();
}
/**
* Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call
* to {@link #release()}
*
* @param time time to wait
* @param unit time unit
* @return true if the mutex was acquired, false if not
* @throws Exception ZK errors, connection interruptions
*/
public boolean acquire(long time, TimeUnit unit) throws Exception {
return interProcessLock.acquire(time, unit);
}
/**
* Perform one release of the mutex.
*
* @throws Exception ZK errors, interruptions, current thread does not own the lock
*/
public void release() throws Exception {
interProcessLock.release();
}
/**
* Returns true if the mutex is acquired by a thread in this JVM
*
* @return true/false
*/
public boolean isAcquiredInThisProcess() {
return interProcessLock.isAcquiredInThisProcess();
}
}

  到此代码已经完成。下面写一个简单的Demo:

//LockZookeeperClientFactory通常是通过Spring配置注入的,此处是为了Demo的简单明了才这样写的,不建议这样写
LockZookeeperClientFactory lzc = new LockZookeeperClientFactory();
lzc.setZookeeperIpPort("10.100.15.1:8900");
lzc.setBasePath("/locks/sharedLock/");
lzc.init();
SharedLock sharedLock = new SharedLock(lzc, "sharedLock1");
try {
if (sharedLock.acquire(100, TimeUnit.MILLISECONDS)) {
System.out.println("sharedLock1 get");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
sharedLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
lzc.destroy();
 
  就这样,系统就会每隔一分钟去回收一次没有使用的结点。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366336-1-1.html 上篇帖子: ZooKeeper学习之关于Servers和Sessions 下篇帖子: zookeeper系列之异步通知模式-Watcher
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表