Redis-Cluster实战--6.Jedis实战
转载请注明出处哈:http://carlosfu.iyunv.com/blog/2240426
通过jedis来连接操作redis总体来说比较简单,按照redis单机、redis-sentinel、redis-cluster略有不同。
一、Jedis相关依赖
1. jedis依赖(选择最新的稳定版本,支持redis-cluster)
<jedis.version>2.7.2</jedis.version>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
2. logback和junit依赖
<logback.version>1.0.13</logback.version>
<junit.version>4.11</junit.version>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
二、Jedis单机测试:
1. Jedis-简单Kv
Jedis jedis = new Jedis("127.0.0.1");
jedis.set("foo", "bar");
String value = jedis.get("foo");
建议所有的jedis都放在try catch finally(jedis.close操作)中
package com.sohu.tv.test.jedis;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
/**
* 第一个jedis测试
*
* @author leifu
* @Date 2015年8月24日
* @Time 下午1:35:26
*/
public class JedisFirstTest {
private Logger logger = LoggerFactory.getLogger(JedisFirstTest.class);
/**
* redis单机host
*/
private final static String JEDIS_HOST = "127.0.0.1";
/**
* redis单机port
*/
private final static int JEDIS_PORT = 6379;
/**
* 超时时间(毫秒)
*/
private final static int JEDIS_TIME_OUT = 300;
@Test
public void testJedis() {
Jedis jedis = null;
try {
jedis = new Jedis(JEDIS_HOST, JEDIS_PORT, JEDIS_TIME_OUT);
String key = "sohuKey";
jedis.set(key, "sohuValue");
String value = jedis.get(key);
logger.info("get key {} from redis, value is {}", key, value);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
2. Jedis-序列化Kv:
我们使用protostuff(Protostuff是基于大名鼎鼎的Google protobuff技术的Java版本)作为序列化工具:
(1)pom依赖:
<protostuff.version>1.0.8</protostuff.version>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>${protostuff.version}</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>${protostuff.version}</version>
</dependency>
(2)Club实体类:
package com.sohu.tv.bean;
import java.io.Serializable;
import java.util.Date;
/**
* 俱乐部
*
* @author leifu
* @Date 2015年7月28日
* @Time 下午1:43:53
*/
public class Club implements Serializable {
/**
* 俱乐部id
*/
private int id;
/**
* 俱乐部名
*/
private String clubName;
/**
* 俱乐部描述
*/
private String clubInfo;
/**
* 创建日期
*/
private Date createDate;
/**
* 排名
*/
private int rank;
public Club(int id, String clubName, String clubInfo, Date createDate, int rank) {
super();
this.id = id;
this.clubName = clubName;
this.clubInfo = clubInfo;
this.createDate = createDate;
this.rank = rank;
}
public Club() {
super();
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getClubName() {
return clubName;
}
public void setClubName(String clubName) {
this.clubName = clubName;
}
public String getClubInfo() {
return clubInfo;
}
public void setClubInfo(String clubInfo) {
this.clubInfo = clubInfo;
}
public Date getCreateDate() {
return createDate;
}
public void setCreateDate(Date createDate) {
this.createDate = createDate;
}
public int getRank() {
return rank;
}
public void setRank(int rank) {
this.rank = rank;
}
@Override
public String toString() {
return "Club [id=" + id + ", clubName=" + clubName + ", clubInfo=" + clubInfo + ", createDate=" + createDate
+ ", rank=" + rank + "]";
}
}
(3)序列化工具:
package com.sohu.tv.serializer;
import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import java.util.concurrent.ConcurrentHashMap;
/**
* protostuff序列化工具
*
* @author leifu
* @Date 2015-8-22
* @Time 上午10:05:20
*/
public class ProtostuffSerializer {
private static ConcurrentHashMap<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
public <T> byte[] serialize(final T source) {
VO<T> vo = new VO<T>(source);
final LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
final Schema<VO> schema = getSchema(VO.class);
return serializeInternal(vo, schema, buffer);
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
public <T> T deserialize(final byte[] bytes) {
try {
Schema<VO> schema = getSchema(VO.class);
VO vo = deserializeInternal(bytes, schema.newMessage(), schema);
if (vo != null && vo.getValue() != null) {
return (T) vo.getValue();
}
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
return null;
}
private <T> byte[] serializeInternal(final T source, final Schema<T> schema, final LinkedBuffer buffer) {
return ProtostuffIOUtil.toByteArray(source, schema, buffer);
}
private <T> T deserializeInternal(final byte[] bytes, final T result, final Schema<T> schema) {
ProtostuffIOUtil.mergeFrom(bytes, result, schema);
return result;
}
private static <T> Schema<T> getSchema(Class<T> clazz) {
@SuppressWarnings("unchecked")
Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
if (schema == null) {
schema = RuntimeSchema.createFrom(clazz);
cachedSchema.put(clazz, schema);
}
return schema;
}
}
package com.sohu.tv.serializer;
import java.io.Serializable;
/**
* @author leifu
* @Date 2015-8-22
* @Time 上午10:05:44
* @param <T>
*/
public class VO<T> implements Serializable {
private T value;
public VO(T value) {
this.value = value;
}
public VO() {
}
public T getValue() {
return value;
}
@Override
public String toString() {
return "VO{" +
"value=" + value +
'}';
}
}
(4)测试代码:
@Test
public void testJedisSerializable() {
ProtostuffSerializer protostuffSerializer = new ProtostuffSerializer();
Jedis jedis = null;
try {
jedis = new Jedis(JEDIS_HOST, JEDIS_PORT, JEDIS_TIME_OUT);
String key = "sohuKeySerializable";
// 序列化
Club club = new Club(1, "AC", "米兰", new Date(), 1);
byte[] clubBtyes = protostuffSerializer.serialize(club);
jedis.set(key.getBytes(), clubBtyes);
// 反序列化
byte[] resultBtyes = jedis.get(key.getBytes());
Club resultClub = protostuffSerializer.deserialize(resultBtyes);
logger.info("get key {} from redis, value is {}", key, resultClub);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (jedis != null) {
jedis.close();
}
}
}
(5)测试结果:
09:53:18.529 INFO c.sohu.tv.test.jedis.JedisFirstTest - get key sohuKeySerializable from redis, value is Club
3. 连接池(推荐使用方式):一般线上系统的连接资源都是通过资源池的形式进行管理的。
package com.sohu.tv.test.jedis;
import java.util.Date;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sohu.tv.serializer.ProtostuffSerializer;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* 第一个jedisPool测试
*
* @author leifu
* @Date 2015年8月24日
* @Time 下午1:35:26
*/
public class JedisPoolTest {
private Logger logger = LoggerFactory.getLogger(JedisPoolTest.class);
private static JedisPool jedisPool;
/**
* redis单机host
*/
private final static String REDIS_HOST = "127.0.0.1";
/**
* redis单机port
*/
private final static int REDIS_PORT = 6379;
/**
* 超时时间(毫秒)
*/
private final static int JEDIS_POOL_TIME_OUT = 1000;
@BeforeClass
public static void testBeforeClass() {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(GenericObjectPoolConfig.DEFAULT_MAX_TOTAL * 5);
poolConfig.setMaxIdle(GenericObjectPoolConfig.DEFAULT_MAX_IDLE * 3);
poolConfig.setMinIdle(GenericObjectPoolConfig.DEFAULT_MIN_IDLE * 2);
poolConfig.setJmxEnabled(true);
poolConfig.setMaxWaitMillis(3000);
jedisPool = new JedisPool(poolConfig, REDIS_HOST, REDIS_PORT, JEDIS_POOL_TIME_OUT);
}
@AfterClass
public static void testAfterClass() {
if (jedisPool != null) {
jedisPool.destroy();
}
}
@Test
public void testJedisPool() {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String key = "sohuKeyPool";
jedis.set(key, "sohuValue");
String value = jedis.get(key);
logger.info("get key {} from redis, value is {}", key, value);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (jedis != null) {
// 如果使用JedisPool,close操作不是关闭连接,代表归还资源池
jedis.close();
}
}
}
}
(3.1) 注意:jedis.close的实现:
(1) dataSource!=null代表使用的是连接池,所以jedis.close()代表归还连接给连接池
(2) dataSource=null代表直连,jedis.close代表关闭连接
(3) jedis.close放到finally里面做
Jedis源码中:
@Override
public void close() {
if (dataSource != null) {
if (client.isBroken()) {
this.dataSource.returnBrokenResource(this);
} else {
this.dataSource.returnResource(this);
}
} else {
client.close();
}
}
(3.2) GenericObjectPoolConfig参数说明如下:
[*]maxActive: 链接池中最大连接数,默认为8. (并非越大越好,具体原因可以参考GenericObjectPool的实现)
[*]maxIdle: 链接池中最大空闲的连接数,默认为8.
[*]minIdle: 连接池中最少空闲的连接数,默认为0.
[*]maxWait: 当连接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位,毫秒数;默认为-1.表示永不超时.
[*]jmxEnabled: 当设置为true, 且服务开启的jmx服务时,使用jconsole, jvisualvm等工具将看到如下关于连接池的很全面的统计,这些统计结果有助于优化自己的配置。
其余配置如下:
[*]minEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值(-1)表示不移除。默认-1
#这个也挺重要,一般不用移除,每次makeObject()开销比较大
2. numTestsPerEvictionRun: 对于“空闲链接”检测线程而言,每次检测的链接资源的个数。默认为3
3. testOnBorrow: 向调用者输出“链接”资源时,是否检测是有有效,如果无效则从连接池中移除,并尝试获取继续获取。默认为false
#借资源时候是否要验证,比如jedis对象验证是ip:port是否发生改变,且执行一个ping命令
4. testOnReturn: 向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为false。
#还资源时候是否要验证,同上。
5. testWhileIdle: 向调用者输出“链接”对象时,是否检测它的空闲超时;如果“链接”空闲超时,将会被移除。默认为false
#空闲是否是否要定时检查一遍所有的对象
6. timeBetweenEvictionRunsMillis: “空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为-1.
#9-11 的配置不是绝对的,关键要看对象是否足够健壮(比如是否有重连等机制)
7. whenExhaustedAction: 当“连接池”中active数量达到阀值时,即“链接”资源耗尽时,连接池需要采取的手段, 默认为1:
-> 0 : 抛出异常,
-> 1 : 阻塞,直到有可用链接资源
-> 2 : 强制创建新的链接资源
上述(1-7)通常不需要设置,使用默认值即可。
四、Redis-Sentinel
JedisSentinelPool sentinelPool = new JedisSentinelPool(masterName, sentinelSet, poolConfig, timeout);
//获取jedis的方法和JedisPool一样的,不在赘述
masterName: sentinel配置中,监控主服务器的名称
sentinelSet: sentinel实例列表
poolConfig: common-pool包中的GenericObjectPoolConfig
timeout: 超时
有一点需要注意的是:sentinelSet: sentinel实例列表,而不是具体的redis实例列表,这是因为为了实现高可用,jedis屏蔽了redis实例信息,所有实例信息(主从信息)都是通过sentinel获取。
五、Redis-Cluster
Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
jedisClusterNodes.add(new HostAndPort("127.0.0.1", 7379));
....
PipelineCluster pipelineCluster = new PipelineCluster(jedisPoolConfig, nodeList, timeout);
//获取jedis的方法和JedisPool一样的,不在赘述
nodeList: redis实例列表
jedisPoolConfig: common-pool包中的GenericObjectPoolConfig
timeout: 超时时间
有一点需要注意的是:nodeList尽可能写入所有的redis实例信息(虽然jedis可以从任一redis实例获取到集群的信息。)
有兴趣的可以一下jedis源码中JedisClusterConnectionHandler这个类的initializeSlotsCache方法:
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) {
for (HostAndPort hostAndPort : startNodes) {
Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
try {
cache.discoverClusterNodesAndSlots(jedis);
break;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (jedis != null) {
jedis.close();
}
}
}
for (HostAndPort node : startNodes) {
cache.setNodeIfNotExist(node);
}
}
附一个redis-cluster工厂类:
package com.sohu.tv.common;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.PipelineCluster;
/**
* redis-cluster java客户端工具类(单例)
*
* @author leifu
* @Date 2015-8-30
* @Time 上午10:08:12
*/
public class RedisClusterComponent {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private static RedisClusterComponent redisClusterComponent = new RedisClusterComponent();
private static final String HOST = "127.0.0.1";
/**
* redisCluster客户端
*/
private PipelineCluster redisCluster;
/**
* 客户端超时时间
*/
private final static int TIME_OUT = 5;
private RedisClusterComponent() {
// common-pool配置
GenericObjectPoolConfig poolConfig = getCommonPoolConfig();
try {
// redis节点信息
Set<HostAndPort> nodeList = new HashSet<HostAndPort>();
nodeList.add(new HostAndPort(HOST, 8000));
nodeList.add(new HostAndPort(HOST, 8001));
nodeList.add(new HostAndPort(HOST, 8002));
nodeList.add(new HostAndPort(HOST, 8003));
nodeList.add(new HostAndPort(HOST, 8004));
nodeList.add(new HostAndPort(HOST, 8005));
redisCluster = new PipelineCluster(poolConfig, nodeList, TIME_OUT);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
/**
* 生成默认的common-pool配置
*
* @return
*/
public static GenericObjectPoolConfig getCommonPoolConfig() {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(GenericObjectPoolConfig.DEFAULT_MAX_TOTAL * 10);
poolConfig.setMaxIdle(GenericObjectPoolConfig.DEFAULT_MAX_IDLE * 5);
poolConfig.setMinIdle(GenericObjectPoolConfig.DEFAULT_MAX_IDLE * 2);
// JedisPool.borrowObject最大等待时间
poolConfig.setMaxWaitMillis(1000L);
// 开启jmx
poolConfig.setJmxEnabled(true);
return poolConfig;
}
public static RedisClusterComponent getInstance() {
return redisClusterComponent;
}
public void destroy() {
if (redisCluster != null) {
redisCluster.close();
}
}
public PipelineCluster getRedisCluster() {
return redisCluster;
}
public static void main(String[] args) {
System.out.println(RedisClusterComponent.getInstance().getRedisCluster().set("testa", "b"));
}
}
六、总结和经验:
总体来说,通过jedis来连接操作redis是比较简单,只是按照redis单机、redis-sentinel、redis-cluster略有不同,但是有几点还是共通和需要注意的:
(1) 无论哪种类型的redis最终获取数据都是通过jedis从一个redis实例获取的。
(2) try catch finally操作是有必要的。(做异常梳理,关闭资源的)
(3) jedis.close()的实现方式针对不同类型的redis有很大不同(上面已经提过了)
(4) jedis依赖了common-pool,有关common-pool的参数需要根据不同的使用场景,各不相同,需要具体问题具体分析。
(5) jedis没有提供序列化功能(xmemcached, ehcache都有), 开发者可以根据自己的需求选取合适的序列化方式(附赠一篇序列化不错的文章:http://www.infoq.com/cn/articles/serialization-and-deserialization)
页:
[1]