heberoxx 发表于 2017-12-20 21:21:23

分布式缓存技术redis学习系列(五)——redis实战(redis与spring整合,分布式锁实现)

Redis与spring的整合

相关依赖jar包
  spring把专门的数据操作独立封装在spring-data系列中,spring-data-redis是对Redis的封装
  

<dependencies>  <!-- 添加spring-data的支持 -->
  <dependency>
  <groupId>org.springframework.data</groupId>
  <artifactId>spring-data-redis</artifactId>
  <version>1.4.2.RELEASE</version>
  </dependency>
  <dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.6.2</version>
  </dependency>
  <dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-pool2</artifactId>
  <version>2.4.2</version>
  </dependency>
  <dependency>
  <groupId>javax.persistence</groupId>
  <artifactId>persistence-api</artifactId>
  <version>1.0.2</version>
  </dependency>
  <!-- 添加junit支持 -->
  <dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.12</version>
  </dependency>
  <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-test</artifactId>
  <version>4.3.2.RELEASE</version>
  </dependency>
  

  

  <dependency>
  <groupId>commons-logging</groupId>
  <artifactId>commons-logging</artifactId>
  <version>1.2</version>
  </dependency>
  </dependencies>
  


Spring 配置文件applicationContext.xml
  

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
  xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"
  xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context"
  xmlns:jaxws="http://cxf.apache.org/jaxws"
  xsi:schemaLocation="
  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
  http://cxf.apache.org/jaxws http://cxf.apache.org/schema/jaxws.xsd
  http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
  http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
  http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
  
<!-- 命令空间加入上面这行-->
  

  <context:property-placeholder location="classpath:redis.properties"/>
  <bean>
  <property name="maxIdle" value="${redis.maxIdle}" />
  <property name="maxTotal" value="${redis.maxTotal}" />
  <property name="MaxWaitMillis" value="${redis.MaxWaitMillis}" />
  <property name="testOnBorrow" value="${redis.testOnBorrow}" />
  </bean>
  <bean p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.pass}" p:pool-config-ref="poolConfig" />
  

  <bean>
  <property name="connectionFactory" ref="connectionFactory" />
  </bean>
  
</beans>
  

  注意新版的maxTotal,MaxWaitMillis这两个字段与旧版的不同。

redis连接池配置文件redis.properties
  

redis.host=192.168.2.129  
redis.port=6379
  
redis.pass=redis129
  
redis.maxIdle=300
  
redis.maxTotal=600
  
redis.MaxWaitMillis=1000
  
redis.testOnBorrow=true
  

  好了,配置完成,下面写上代码

测试代码
  

import javax.persistence.Entity;  

import javax.persistence.Table;  

  

/**  *
@author  * @date 创建时间:2017年2月9日 上午8:51:02
  * @parameter
*/  
@Entity
  
@Table(name
= "t_user")  

public>//主键
  private String>  //用户名
  private String userName;
  public String getId() {

  return>  }

  public void setId(String>
  this.id =>  }
  public String getUserName() {
  return userName;
  }
  public void setUserName(String userName) {
  this.userName = userName;
  }
  
}
  


BaseRedisDao
  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.data.redis.core.RedisTemplate;  

  

/**  *
@author  * @date 创建时间:2017年2月9日 上午9:02:16
  * @parameter
*/  
public>  @Autowired(required=true)
  protected RedisTemplate<K,V> redisTemplate;
  

  
}
  


IUserDao
  

public interface IUserDao {public boolean save(User user);public boolean update(User user);public boolean delete(String userIds);public User find(String userId);  
}
  


UserDao
  

package com.shanheyongmu.dao;  

  

import org.springframework.dao.DataAccessException;  

import org.springframework.data.redis.connection.RedisConnection;  

import org.springframework.data.redis.core.RedisCallback;  

import org.springframework.data.redis.core.RedisTemplate;  

import org.springframework.data.redis.serializer.RedisSerializer;  

  

import com.shanheyongmu.entity.User;  

  

/**  *
@author  * @date 创建时间:2017年2月9日 上午9:08:28
  * @parameter
*/  
public>  

  public boolean save(final User user) {
  boolean res = redisTemplate.execute(new RedisCallback<Boolean>() {
  

  public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
  RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
  byte[] key = serializer.serialize(user.getId());
  byte[] value = serializer.serialize(user.getUserName());
  //set not exits
  return connection.setNX(key, value);
  }
  });
  return res;
  }
  

  public boolean update(final User user) {
  boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {
  

  public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
  RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
  byte[] key = serializer.serialize(user.getId());
  byte[] name = serializer.serialize(user.getUserName());
  //set
  
                connection.set(key, name);
  return true;
  }
  });
  return result;
  }
  

  public boolean delete(final String userId) {
  boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {
  

  public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
  RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
  byte[] key = serializer.serialize(userId);
  //delete
  
                connection.del(key);
  return true;
  }
  });
  return result;
  }
  

  public User find(final String userId) {
  User result = redisTemplate.execute(new RedisCallback<User>() {
  

  public User doInRedis(RedisConnection connection) throws DataAccessException {
  RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
  byte[] key = serializer.serialize(userId);
  //get
  byte[] value = connection.get(key);
  if(value == null) {
  return null;
  }
  String name = serializer.deserialize(value);
  User resUser = new User();
  resUser.setId(userId);
  resUser.setUserName(name);
  return resUser;
  }
  });
  return result;
  }
  

  
}
  


Test
  写这个类的时候需要引入junit包和spring-test.jar
  

package com.shanheyongmu.test;  

import org.junit.Assert;  

import org.junit.Test;  

import org.junit.runner.RunWith;  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.test.context.ContextConfiguration;  

import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;  

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;  

  

  

import com.shanheyongmu.dao.IUserDao;  

import com.shanheyongmu.entity.User;  

  

  

  

/**  *
@author  * @date 创建时间:2017年2月9日 上午10:42:55
  * @parameter
*/  
@RunWith(SpringJUnit4ClassRunner.
class)  
@ContextConfiguration(locations
= {"classpath*:applicationContext.xml"})  

public>
  @Autowired
private IUserDao userDao;  @Test
public void testSaveUser() {  User user
= new User();  user.setId(
"402891815170e8de015170f6520b0000");  user.setUserName(
"zhangsan");  }
  @Test
public void testGetUser() {  User user
= new User();  user
= userDao.find("402891815170e8de015170f6520b0000");  System.out.println(user.getId()
+ "-" +user.getUserName());  }
  @Test
public void testUpdateUser() {  User user
= new User();  user.setId(
"402891815170e8de015170f6520b0000");  user.setUserName(
"lisi");boolean res = userDao.update(user);  Assert.assertTrue(res);
  }
  @Test
public void testDeleteUser() {boolean res = userDao.delete("402891815170e8de015170f6520b0000");  Assert.assertTrue(res);
  }
  
}
  

  String类型的增删该查已完成,Hash,List,Set数据类型的操作就不举例了,和使用命令的方式差不多。如下
  

connection.hSetNX(key, field, value);  
connection.hDel(key, fields);
  
connection.hGet(key, field);
  
connection.lPop(key);
  
connection.lPush(key, value);
  
connection.rPop(key);
  
connection.rPush(key, values);
  
connection.sAdd(key, values);
  
connection.sMembers(key);
  
connection.sDiff(keys);
  
connection.sPop(key);
  


整合可能遇到的问题

1.NoSuchMethodError
  

java.lang.NoSuchMethodError: org.springframework.core.serializer.support.DeserializingConverter.<init>(Ljava/lang/ClassLoader;)V  
Caused by: java.lang.NoSuchMethodError: redis.clients.jedis.JedisShardInfo.setTimeout(I)V
  

  类似找不到类,找不到方法的问题,当确定依赖的jar已经引入之后,此类问题多事spring-data-redis以及jedis版本问题,多换个版本试试,本文上面提到的版本可以使用。

1.No qualifying bean
  

No qualifying bean of type found for dependency  

  找不到bean,考虑applicationContext.xml中配置redisTemplate bean时实现类是否写错。例如,BaseRedisDao注入的是RedisTemplate类型的对象,applicationContext.xml中配置的实现类却是RedisTemplate的子类StringRedisTemplate,那肯定报错。整合好后,下面我们着重学习基于redis的分布式锁的实现。

基于redis实现的分布式锁
  我们知道,在多线程环境中,锁是实现共享资源互斥访问的重要机制,以保证任何时刻只有一个线程在访问共享资源。锁的基本原理是:用一个状态值表示锁,对锁的占用和释放通过状态值来标识,因此基于redis实现的分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁,DEL相当于释放锁,当然,在下面的具体实现中会更复杂些。之所以称为分布式锁,是因为客户端可以在redis集群环境中向集群中任一个可用Master节点请求上锁(即SETNX命令存储key到redis缓存中是随机的)。
  现在相信你已经对在基于redis实现的分布式锁的基本概念有了解,需要注意的是,这个和前面文章提到的使用WATCH 命令对key值进行锁操作没有直接的关系。java中synchronized和Lock对象都能对共享资源进行加锁,下面我们将学习用java实现的redis分布式锁。

java中的锁技术
  在分析java实现的redis分布式锁之前,我们先来回顾下java中的锁技术,为了直观的展示,我们采用“多个线程共享输出设备”来举例。

不加锁共享输出设备
  

  

  package com.shanheyongmu.lock;
  

  

  import java.util.concurrent.locks.Lock;
  import java.util.concurrent.locks.ReentrantLock;
  

  

  import com.shanheyongmu.redislock.RedisLock;
  

  public
>
//不加锁
  static>  public void output(String name) {
  for(int i=0; i<name.length(); i++) {
  System.out.print(name.charAt(i));
  }
  System.out.println();
  }
  }
  public static void main(String[] args) {
  final Outputer output = new Outputer();
  //线程1打印zhangsan
  new Thread(new Runnable(){
  @Override
  public void run() {
  while(true) {
  try{
  Thread.sleep(1000);
  }catch(InterruptedException e) {
  e.printStackTrace();
  }
  output.output("zhangsan");
  }
  }
  }).start();
  //线程2打印lingsi
  new Thread(new Runnable(){
  @Override
  public void run() {
  while(true) {
  try{
  Thread.sleep(1000);
  }catch(InterruptedException e) {
  e.printStackTrace();
  }
  output.output("lingsi");
  }
  }
  }).start();
  //线程3打印wangwu
  new Thread(new Runnable(){
  @Override
  public void run() {
  while(true) {
  try{
  Thread.sleep(1000);
  }catch(InterruptedException e) {
  e.printStackTrace();
  }
  output.output("huangwu");
  }
  }
  }).start();
  }
  
}
  

  上面例子中,三个线程同时共享输出设备output,线程1需要打印zhangsan,线程2需要打印lingsi,线程3需要打印wangwu。在不加锁的情况,这三个线程会不会因为得不到输出设备output打架呢,我们来看看运行结果:
  

huangwu  
zhangslingsi
  
an
  
huangwu
  
zlingsi
  
hangsan
  
huangwu
  
lzhangsan
  
ingsi
  
huangwu
  
lingsi
  

  从运行结果可以看出,三个线程打架了,线程1没打印完zhangsan,线程2就来抢输出设备......可见,这不是我们想要的,我们想要的是线程之间能有序的工作,各个线程之间互斥的使用输出设备output。

使用java5中的Lock对输出设备加锁
  现在我们对Outputer进行改进,给它加上锁,加锁之后每次只有一个线程能访问它。
  

//使用java5中的锁  
static>  Lock lock = new ReentrantLock();
  public void output(String name) {
  //传统java加锁
  //synchronized (Outputer.class){
  
      lock.lock();
  try {
  for(int i=0; i<name.length(); i++) {
  System.out.print(name.charAt(i));
  }
  System.out.println();
  }finally{
  //任何情况下都有释放锁
  
            lock.unlock();
  }
  //}
  
    }
  
}
  

  看看加锁后的输出结果:
  

zhangsan  
lingsi
  
huangwu
  
zhangsan
  
lingsi
  
huangwu
  
zhangsan
  
lingsi
  
huangwu
  
zhangsan
  
lingsi
  
huangwu
  
zhangsan
  
lingsi
  
huangwu
  
......
  

  从运行结果中可以看出,三个线程之间不打架了,线程之间的打印变得有序。有个这个基础,下面我们来学习基于Redis实现的分布式锁就更容易了。

Redis分布式锁

实现分析
  从上面java锁的使用中可以看出,锁对象主要有lock与unlock方法,在lock与unlock方法之间的代码(临界区)能保证线程互斥访问。基于redis实现的Java分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁(lock),DEL相当于释放锁(unlock)。我们只要实现Lock接口重写lock()和unlock()即可。但是这还不够,安全可靠的分布式锁应该满足满足下面三个条件:
  l 互斥,不管任何时候,只有一个客户端能持有同一个锁。
  l 不会死锁,最终一定会得到锁,即使持有锁的客户端对应的master节点宕掉。
  l 容错,只要大多数Redis节点正常工作,客户端应该都能获取和释放锁。
  那么什么情况下回不满足上面三个条件呢。多个线程(客户端)同时竞争锁可能会导致多个客户端同时拥有锁。比如,
  (1)线程1在master节点拿到了锁(存入key)
  (2)master节点在把线程1创建的key写入slave之前宕机了,此时集群中的节点已经没有锁(key)了,包括master节点的slaver节点
  (3)slaver节点升级为master节点
  (4)线程2向新的master节点发起锁(存入key)请求,很明显,能请求成功。
  可见,线程1和线程2同时获得了锁。如果在更高并发的情况,可能会有更多线程(客户端)获取锁,这种情况就会导致上文所说的线程“打架”问题,线程之间的执行杂乱无章。
  那什么情况下又会发生死锁的情况呢。如果拥有锁的线程(客户端)长时间的执行或者因为某种原因造成阻塞,就会导致锁无法释放(unlock没有调用),其它线程就不能获取锁而而产生无限期死锁的情况。其它线程在执行lock失败后即使粗暴的执行unlock删除key之后也不能正常释放锁,因为锁就只能由获得锁的线程释放,锁不能正常释放其它线程仍然获取不到锁。解决死锁的最好方式是设置锁的有效时间(redis的expire命令),不管是什么原因导致的死锁,有效时间过后,锁将会被自动释放。
  为了保障容错功能,即只要有Redis节点正常工作,客户端应该都能获取和释放锁,我们必须用相同的key不断循环向Master节点请求锁,当请求时间超过设定的超时时间则放弃请求锁,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,应该尽快尝试下一个master节点。释放锁比较简单,因为只需要在所有节点都释放锁就行,不管之前有没有在该节点获取锁成功。

Redlock算法
  根据上面的分析,官方提出了一种用Redis实现分布式锁的算法,这个算法称为RedLock。RedLock算法的主要流程如下:


RedLock算法主要流程


Java实现
  结合上面的流程图,加上下面的代码解释,相信你一定能理解redis分布式锁的实现原理
  

public>
protected StringRedisTemplate redisStringTemplate;// 存储到redis中的锁标志  private static final String LOCKED = "LOCKED";
  // 请求锁的超时时间(ms)
  private static final long TIME_OUT = 30000;
  // 锁的有效时间(s)
  public static final int EXPIRE = 60;
  // 锁标志对应的key;
  private String key;
  // state flag
  private volatile boolean isLocked = false;
  public RedisLock(String key) {
  this.key = key;
  @SuppressWarnings("resource")
  ApplicationContextctx =new>  redisStringTemplate = (StringRedisTemplate)ctx.getBean("redisStringTemplate");
  }
  @Override
  public void lock() {
  //系统当前时间,毫秒
  long nowTime = System.nanoTime();
  //请求锁超时时间,毫秒
  long timeout = TIME_OUT*1000000;
  final Random r = new Random();
  try {
  //不断循环向Master节点请求锁,当请求时间(System.nanoTime() - nano)超过设定的超时时间则放弃请求锁
  //这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间
  //如果一个master节点不可用了,应该尽快尝试下一个master节点
  while ((System.nanoTime() - nowTime) < timeout) {
  //将锁作为key存储到redis缓存中,存储成功则获得锁
  if (redisStringTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(),
  LOCKED.getBytes())) {
  //设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间
  //可以防止因异常情况无法释放锁而造成死锁情况的发生
  
                  redisStringTemplate.expire(key, EXPIRE, TimeUnit.SECONDS);
  isLocked = true;
  //上锁成功结束请求
  break;
  }
  //获取锁失败时,应该在随机延时后进行重试,避免不同客户端同时重试导致谁都无法拿到锁的情况出现
  //睡眠3毫秒后继续请求锁
  Thread.sleep(3, r.nextInt(500));
  }
  } catch (Exception e) {
  e.printStackTrace();
  }
  }
  @Override
  public void unlock() {
  //释放锁
  //不管请求锁是否成功,只要已经上锁,客户端都会进行释放锁的操作
  if (isLocked) {
  redisStringTemplate.delete(key);
  }
  }
  @Override
  public void lockInterruptibly() throws InterruptedException {
  // TODO Auto-generated method stub
  
         
  }
  @Override
  public boolean tryLock() {
  // TODO Auto-generated method stub
  return false;
  }
  @Override
  public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  // TODO Auto-generated method stub
  return false;
  }
  @Override
  public Condition newCondition() {
  // TODO Auto-generated method stub
  return null;
  }
  
}
  

  好了,RedisLock已经实现,我们对Outputer使用RedisLock进行修改
  

/使用RedisLock  

static>
//创建一个名为redisLock的RedisLock类型的锁  RedisLock redisLock = new RedisLock("redisLock");
  public void output(String name) {
  //上锁
  
      redisLock.lock();
  try {
  for(int i=0; i<name.length(); i++) {
  System.out.print(name.charAt(i));
  }
  System.out.println();
  }finally{
  //任何情况下都要释放锁
  
            redisLock.unlock();
  }
  }
  
}
  

  看看使用RedisLock加锁后的的运行结果
  

lingsi  
zhangsan
  
huangwu
  
lingsi
  
zhangsan
  
huangwu
  
lingsi
  
zhangsan
  
huangwu
  
lingsi
  
zhangsan
  
huangwu
  
lingsi
  
zhangsan
  
huangwu
  
......
  

  可见,使用RedisLock加锁后线程之间不再“打架”,三个线程互斥的访问output。

问题

  现在我无法论证RedLock算法在分布式、高并发环境下的可靠性,但从本例三个线程的运行结果看,RedLock算法确实保证了三个线程互斥的访问output(redis.maxIdle=300 redis.maxTotal=600,运行到Timeout waiting for>
参考文档
  http://redis.io/topics/distlock
  http://ifeve.com/redis-lock/
  全部系列到此告一段落了 ,可能很多人觉得还不够 在此我在搜索问题的过程中发现了比较好的 博客spring data+redis
  http://www.cnblogs.com/tankaixiong/p/3660075.html
页: [1]
查看完整版本: 分布式缓存技术redis学习系列(五)——redis实战(redis与spring整合,分布式锁实现)