漂亮蓝影 发表于 2018-11-5 06:10:45

Java实现基于Redis的分布式锁

  我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition()方法我这里暂时没实现。这个Lock提供了5个lock方法的变体,可以自行选择使用哪一个来获取锁,我的想法是
  最好用带超时返回的那几个方法,因为不这样的话,假如redis挂了,线程永远都在那死循环了(关于这里,应该还可以进一步优化,如果redis挂了,Jedis的操作肯定会抛异常之类的,可以定义个机制让redis挂了的时候通知使用这个lock的用户,或者说是线程)
  Java代码 下载

[*]  package cc.lixiaohui.lock;
[*]
[*]  import java.util.concurrent.TimeUnit;
[*]
[*]  public interface Lock {
[*]
[*]  /**
[*]  * 阻塞性的获取锁, 不响应中断
[*]  */
[*]  void lock();
[*]
[*]  /**
[*]  * 阻塞性的获取锁, 响应中断
[*]  *
[*]  * @throws InterruptedException
[*]  */
[*]  void lockInterruptibly() throws InterruptedException;
[*]
[*]  /**
[*]  * 尝试获取锁, 获取不到立即返回, 不阻塞
[*]  */
[*]  boolean tryLock();
[*]
[*]  /**
[*]  * 超时自动返回的阻塞性的获取锁, 不响应中断
[*]  *
[*]  * @param time
[*]  * @param unit
[*]  * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁
[*]  *
[*]  */
[*]  boolean tryLock(long time, TimeUnit unit);
[*]
[*]  /**
[*]  * 超时自动返回的阻塞性的获取锁, 响应中断
[*]  *
[*]  * @param time
[*]  * @param unit
[*]  * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁
[*]  * @throws InterruptedException 在尝试获取锁的当前线程被中断
[*]  */
[*]  boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;
[*]
[*]  /**
[*]  * 释放锁
[*]  */
[*]  void unlock();
[*]
[*]  }
  看其抽象实现:
  Java代码 下载

[*]  package cc.lixiaohui.lock;
[*]
[*]  import java.util.concurrent.TimeUnit;
[*]
[*]  /**
[*]  * 锁的骨架实现, 真正的获取锁的步骤由子类去实现.
[*]  *
[*]  * @author lixiaohui
[*]  *
[*]  */
[*]  public abstract class AbstractLock implements Lock {
[*]
[*]  /**
[*]  *
[*]  * 这里需不需要保证可见性值得讨论, 因为是分布式的锁,
[*]  * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性
[*]  * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了.
[*]  *
[*]  */
[*]  protected volatile boolean locked;
[*]
[*]  /**
[*]  * 当前jvm内持有该锁的线程(if have one)
[*]  */
[*]  private Thread exclusiveOwnerThread;
[*]
[*]  public void lock() {
[*]  try {
[*]  lock(false, 0, null, false);
[*]  } catch (InterruptedException e) {
[*]  // TODO ignore
[*]  }
[*]  }
[*]
[*]  public void lockInterruptibly() throws InterruptedException {
[*]  lock(false, 0, null, true);
[*]  }
[*]
[*]  public boolean tryLock(long time, TimeUnit unit) {
[*]  try {
[*]  return lock(true, time, unit, false);
[*]  } catch (InterruptedException e) {
[*]  // TODO ignore
[*]  }
[*]  return false;
[*]  }
[*]
[*]  public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {
[*]  return lock(true, time, unit, true);
[*]  }
[*]
[*]  public void unlock() {
[*]  // TODO 检查当前线程是否持有锁
[*]  if (Thread.currentThread() != getExclusiveOwnerThread()) {
[*]  throw new IllegalMonitorStateException("current thread does not hold the lock");
[*]  }
[*]
[*]  unlock0();
[*]  setExclusiveOwnerThread(null);
[*]  }
[*]
[*]  protected void setExclusiveOwnerThread(Thread thread) {
[*]  exclusiveOwnerThread = thread;
[*]  }
[*]
[*]  protected final Thread getExclusiveOwnerThread() {
[*]  return exclusiveOwnerThread;
[*]  }
[*]
[*]  protected abstract void unlock0();
[*]
[*]  /**
[*]  * 阻塞式获取锁的实现
[*]  *
[*]  * @param useTimeout
[*]  * @param time
[*]  * @param unit
[*]  * @param interrupt 是否响应中断
[*]  * @return
[*]  * @throws InterruptedException
[*]  */
[*]  protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;
[*]
[*]  }
  基于Redis的最终实现,关键的获取锁,释放锁的代码在这个类的lock方法和unlock0方法里,大家可以只看这两个方法然后完全自己写一个:
  Java代码 下载

[*]  package cc.lixiaohui.lock;
[*]
[*]  import java.util.concurrent.TimeUnit;
[*]
[*]  import redis.clients.jedis.Jedis;
[*]
[*]  /**
[*]  *
[*]  * 基于Redis的SETNX操作实现的分布式锁
[*]  *
[*]  * 获取锁时最好用lock(long time, TimeUnit unit), 以免网路问题而导致线程一直阻塞
[*]  *
[*]  * SETNC操作参考资料
[*]  *
[*]  *
[*]  * @author lixiaohui
[*]  *
[*]  */
[*]  public class RedisBasedDistributedLock extends AbstractLock {
[*]
[*]  private Jedis jedis;
[*]
[*]  // 锁的名字
[*]  protected String lockKey;
[*]
[*]  // 锁的有效时长(毫秒)
[*]  protected long lockExpires;
[*]
[*]  public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires) {
[*]  this.jedis = jedis;
[*]  this.lockKey = lockKey;
[*]  this.lockExpires = lockExpires;
[*]  }
[*]
[*]  // 阻塞式获取锁的实现
[*]  protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{
[*]  if (interrupt) {
[*]  checkInterruption();
[*]  }
[*]
[*]  long start = System.currentTimeMillis();
[*]  long timeout = unit.toMillis(time); // if !useTimeout, then it's useless
[*]
[*]  while (useTimeout ? isTimeout(start, timeout) : true) {
[*]  if (interrupt) {
[*]  checkInterruption();
[*]  }
[*]
[*]  long lockExpireTime = System.currentTimeMillis() + lockExpires + 1;//锁超时时间
[*]  String stringOfLockExpireTime = String.valueOf(lockExpireTime);
[*]
[*]  if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁
[*]  // TODO 成功获取到锁, 设置相关标识
[*]  locked = true;
[*]  setExclusiveOwnerThread(Thread.currentThread());
[*]  return true;
[*]  }
[*]
[*]  String value = jedis.get(lockKey);
[*]  if (value != null && isTimeExpired(value)) { // lock is expired
[*]  // 假设多个线程(非单jvm)同时走到这里
[*]  String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
[*]  // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)
[*]  // 加入拿到的oldValue依然是expired的,那么就说明拿到锁了
[*]  if (oldValue != null && isTimeExpired(oldValue)) {
[*]  // TODO 成功获取到锁, 设置相关标识
[*]  locked = true;
[*]  setExclusiveOwnerThread(Thread.currentThread());
[*]  return true;
[*]  }
[*]  } else {
[*]  // TODO lock is not expired, enter next loop retrying
[*]  }
[*]  }
[*]  return false;
[*]  }
[*]
[*]  public boolean tryLock() {
[*]  long lockExpireTime = System.currentTimeMillis() + lockExpires + 1;//锁超时时间
[*]  String stringOfLockExpireTime = String.valueOf(lockExpireTime);
[*]
[*]  if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁
[*]  // TODO 成功获取到锁, 设置相关标识
[*]  locked = true;
[*]  setExclusiveOwnerThread(Thread.currentThread());
[*]  return true;
[*]  }
[*]
[*]  String value = jedis.get(lockKey);
[*]  if (value != null && isTimeExpired(value)) { // lock is expired
[*]  // 假设多个线程(非单jvm)同时走到这里
[*]  String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
[*]  // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)
[*]  // 假如拿到的oldValue依然是expired的,那么就说明拿到锁了
[*]  if (oldValue != null && isTimeExpired(oldValue)) {
[*]  // TODO 成功获取到锁, 设置相关标识
[*]  locked = true;
[*]  setExclusiveOwnerThread(Thread.currentThread());
[*]  return true;
[*]  }
[*]  } else {
[*]  // TODO lock is not expired, enter next loop retrying
[*]  }
[*]
[*]  return false;
[*]  }
[*]
[*]  /**
[*]  * Queries if this lock is held by any thread.
[*]  *
[*]  * @return {@code true} if any thread holds this lock and
[*]  *         {@code false} otherwise
[*]  */
[*]  public boolean isLocked() {
[*]  if (locked) {
[*]  return true;
[*]  } else {
[*]  String value = jedis.get(lockKey);
[*]  // TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了,
[*]  // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断
[*]  // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就
[*]  // 不是同步控制, 它只是一种锁状态的报告.
[*]  return !isTimeExpired(value);
[*]  }
[*]  }
[*]
[*]  @Override
[*]  protected void unlock0() {
[*]  // TODO 判断锁是否过期
[*]  String value = jedis.get(lockKey);
[*]  if (!isTimeExpired(value)) {
[*]  doUnlock();
[*]  }
[*]  }
[*]
[*]  private void checkInterruption() throws InterruptedException {
[*]  if(Thread.currentThread().isInterrupted()) {
[*]  throw new InterruptedException();
[*]  }
[*]  }
[*]
[*]  private boolean isTimeExpired(String value) {
[*]  return Long.parseLong(value) < System.currentTimeMillis();
[*]  }
[*]
[*]  private boolean isTimeout(long start, long timeout) {
[*]  return start + timeout > System.currentTimeMillis();
[*]  }
[*]
[*]  private void doUnlock() {
[*]  jedis.del(lockKey);
[*]  }
[*]
[*]  }
  如果将来还换一种实现方式(比如zookeeper之类的),到时直接继承AbstractLock并实现lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt), unlock0()方法即可(所谓抽象嘛)


页: [1]
查看完整版本: Java实现基于Redis的分布式锁