wcdz 发表于 2016-12-20 10:06:32

Redis客户端-Jedis源代码探索


Redis客户端-Jedis源代码探索



博客分类:
[*]源代码

jedisredisjava



  
Redis的应用已经如火如荼了,你要是搞服务端的,说你还没用过,一些人又要从心里鄙视你n遍了。刚好在项目中实践了,有一点点心得,在这里跟大家交流一下。由于时间的关系,Redis的源码还未读完,今天先把客户端jedis的源码研究一下吧。看完代码后其实你可以自己实现一个了。代码一定要剖析到每一行,吸取精华才算凑效。jedis,在各种客户端中算比较优秀的。代码风格也很好,读来如沐春风,心旷神怡啊:)
  

1.Redis的通信协议


Redis采用自定义的二进制通信协议。有一个基本规范
发送命令规范:
  <参数个数>\r\n

$<参数1字节数>\r\n

<参数1>\r\n

...

$<参数n字节数>\r\n

<参数n>\r\n

响应规范
  

响应类型有返回数据的第一个字节决定的

+代表一个状态信息,+ok

-代表错误

:返回的是一个整数

$返回一个块数据,跟发送命令的规范一样。$<长度>\r\n<数据>\r\n

*返回多个数据块。同上,后面跟着参数的个数和每个参数的数据。

2.jedis的整体架构


解剖jedis架构之前,先自己尝试设计一个客户端。如果用最简单的方式,你会如何设计?

用最简单的Socket就可以实现了,socket跟服务端通信后,就可以按上面的格式发命令了。然后阻塞获得服务端返回的数
  据。听起来很简单吧。
  

实际上jedis也是这样实现的客户端连接数很大,怎么办?每次新建一个socket,开销会成线性增长。我们想到了连接池方案。先新建一批连接,使用时从连接池取一个,用完再回收。连接池实现起来有多复杂?用 apache 的连接池接口,可以简化开发成本。


一台机容量有限,想将数据存储到多个节点。有很多种方案啦。在业务层面,我们可以将不同的业务数据存储在不同的节点(水平分割)。根据用户容量分割到不同的节点(纵向分割)。有没有通用一点的,我们想到的"一致性hash”方案。根据key,定位到目标节点,存储相关的数据。


吹了那么多,其实上面就是jedis的核心了。其他的只是处理各种的命令返回值而已了


3.源代码剖析


连接redis,非常简单,跟传统的socket通信一样。new一个socket,设置相关的参数,connect获取相关的输入输出流进行通信。它的特别之处在于,封装了这些流,方便操作

Java代码






[*]publicvoidconnect(){
[*]
if(!isConnected()){
[*]
try{
[*]
socket=newSocket();
[*]
//->@wjw_add
[*]
socket.setReuseAddress(true);
[*]
socket.setKeepAlive(true);//WillmonitortheTCPconnectionisvalid

[*]
socket.setTcpNoDelay(true);//SocketbufferWhetherclosed,toensuretimelydeliveryofdata

[*]
socket.setSoLinger(true,0);//Controlcallsclose()method,theunderlyingsocketisclosedimmediately

[*]
//<-@wjw_add
[*]
[*]
socket.connect(newInetSocketAddress(host,port),timeout);
[*]socket.setSoTimeout(timeout);
[*]
outputStream=newRedisOutputStream(socket.getOutputStream());
[*]
inputStream=newRedisInputStream(socket.getInputStream());
[*]
}catch(IOExceptionex){
[*]
thrownewJedisConnectionException(ex);
[*]}
[*]}
[*]}



public void connect() {
if (!isConnected()) {
try {
socket = new Socket();
//->@wjw_add
socket.setReuseAddress(true);
socket.setKeepAlive(true);//Will monitor the TCP connection is valid
socket.setTcpNoDelay(true);//Socket buffer Whetherclosed, to ensure timely delivery of data
socket.setSoLinger(true,0);//Control calls close () method, the underlying socket is closed immediately
//<-@wjw_add
socket.connect(new InetSocketAddress(host, port), timeout);
socket.setSoTimeout(timeout);
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ex) {
throw new JedisConnectionException(ex);
}
}
}

  

发送命令,就是按照上面的所说的通信协议发送相关的命令
  要注意的是要将字符串转成二进制流

Java代码






[*]privatestaticvoidsendCommand(finalRedisOutputStreamos,
[*]
finalbyte[]command,finalbyte[]...args){
[*]
try{
[*]os.write(ASTERISK_BYTE);
[*]
os.writeIntCrLf(args.length+1);
[*]os.write(DOLLAR_BYTE);
[*]os.writeIntCrLf(command.length);
[*]os.write(command);
[*]os.writeCrLf();
[*]
[*]
for(finalbyte[]arg:args){
[*]os.write(DOLLAR_BYTE);
[*]os.writeIntCrLf(arg.length);
[*]os.write(arg);
[*]os.writeCrLf();
[*]}
[*]
}catch(IOExceptione){
[*]
thrownewJedisConnectionException(e);
[*]}
[*]}



private static void sendCommand(final RedisOutputStream os,
final byte[] command, final byte[]... args) {
try {
os.write(ASTERISK_BYTE);
os.writeIntCrLf(args.length + 1);
os.write(DOLLAR_BYTE);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();
for (final byte[] arg : args) {
os.write(DOLLAR_BYTE);
os.writeIntCrLf(arg.length);
os.write(arg);
os.writeCrLf();
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}

  


处理服务端返回。Redis本身是单线程阻塞的。所以你可以从socket的InputStream中直接读取数据。

获得二进制流,根据不同的命令,将它转成相应的数据类型即可

Java代码






[*]privatestaticObjectprocess(finalRedisInputStreamis){
[*]
try{
[*]
byteb=is.readByte();
[*]
if(b==MINUS_BYTE){
[*]
processError(is);//报错
[*]
}elseif(b==ASTERISK_BYTE){
[*]
returnprocessMultiBulkReply(is);//*多个数据块

[*]
}elseif(b==COLON_BYTE){
[*]
returnprocessInteger(is);//:整数

[*]
}elseif(b==DOLLAR_BYTE){
[*]
returnprocessBulkReply(is);//$一个数据块

[*]
}elseif(b==PLUS_BYTE){
[*]
returnprocessStatusCodeReply(is);//+状态

[*]
}else{
[*]
thrownewJedisConnectionException("Unknownreply:"+(char)b);
[*]}
[*]
}catch(IOExceptione){
[*]
thrownewJedisConnectionException(e);
[*]}
[*]
returnnull;
[*]}



private static Object process(final RedisInputStream is) {
try {
byte b = is.readByte();
if (b == MINUS_BYTE) {
processError(is);//报错
} else if (b == ASTERISK_BYTE) {
return processMultiBulkReply(is);//*多个数据块
} else if (b == COLON_BYTE) {
return processInteger(is);//:整数
} else if (b == DOLLAR_BYTE) {
return processBulkReply(is);//$一个数据块
} else if (b == PLUS_BYTE) {
return processStatusCodeReply(is);//+状态
} else {
throw new JedisConnectionException("Unknown reply: " + (char) b);
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
return null;
}

  


连接池的实现。现在要实现连接池,一般都是用apache的pool接口来实现了。只需要两个步骤。

1.实现Config 就是连接池的配置参数,包括最大连接数,初始连接数,空闲时间.

2.实现 BasePoolableObjectFactory,提供新建连接,废弃连接,测试连接方法

Java代码






[*]publicObjectmakeObject()throwsException{//产生redis实例

[*]
finalJedisjedis=newJedis(this.host,this.port,this.timeout);

[*]
[*]jedis.connect();
[*]
if(null!=this.password){
[*]
jedis.auth(this.password);
[*]}
[*]
if(database!=0){
[*]jedis.select(database);
[*]}
[*]
[*]
returnjedis;
[*]}
[*]
[*]
publicvoiddestroyObject(finalObjectobj)throwsException{//销毁jedis实例

[*]
if(objinstanceofJedis){
[*]
finalJedisjedis=(Jedis)obj;
[*]
if(jedis.isConnected()){
[*]
try{
[*]
try{
[*]jedis.quit();
[*]
}catch(Exceptione){
[*]}
[*]jedis.disconnect();
[*]
}catch(Exceptione){
[*]
[*]}
[*]}
[*]}
[*]}
[*]
[*]
publicbooleanvalidateObject(finalObjectobj){//验证jedis是否有效,简单的ping命令

[*]
if(objinstanceofJedis){
[*]
finalJedisjedis=(Jedis)obj;
[*]
try{
[*]
returnjedis.isConnected()&&jedis.ping().equals("PONG");
[*]
}catch(finalExceptione){
[*]
returnfalse;
[*]}
[*]
}else{
[*]
returnfalse;
[*]}
[*]}



public Object makeObject() throws Exception {//产生redis实例
final Jedis jedis = new Jedis(this.host, this.port, this.timeout);
jedis.connect();
if (null != this.password) {
jedis.auth(this.password);
}
if( database != 0 ) {
jedis.select(database);
}
return jedis;
}
public void destroyObject(final Object obj) throws Exception {//销毁jedis实例
if (obj instanceof Jedis) {
final Jedis jedis = (Jedis) obj;
if (jedis.isConnected()) {
try {
try {
jedis.quit();
} catch (Exception e) {
}
jedis.disconnect();
} catch (Exception e) {
}
}
}
}
public boolean validateObject(final Object obj) {//验证jedis是否有效,简单的ping命令
if (obj instanceof Jedis) {
final Jedis jedis = (Jedis) obj;
try {
return jedis.isConnected() && jedis.ping().equals("PONG");
} catch (final Exception e) {
return false;
}
} else {
return false;
}
}

  


一致性hash的实现。关键是如何使得key均匀散列。jedis使用了murmurhash 2.0

Java代码






[*]publicRgetShard(Stringkey){
[*]
returnresources.get(getShardInfo(key));
[*]}
[*]
[*]
publicSgetShardInfo(byte[]key){
[*]
SortedMap<Long,S>tail=nodes.tailMap(algo.hash(key));//找到key所在的节点

[*]
if(tail.size()==0){
[*]
returnnodes.get(nodes.firstKey());
[*]}
[*]
returntail.get(tail.firstKey());
[*]}



public R getShard(String key) {
return resources.get(getShardInfo(key));
}
public S getShardInfo(byte[] key) {
SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));//找到key所在的节点
if (tail.size() == 0) {
return nodes.get(nodes.firstKey());
}
return tail.get(tail.firstKey());
}

  
4.经验教训
  

这里谈谈实践中遇到的几个问题,都是血和泪的教训啊。
  

1.jedis的连接池获得的连接,进行通信时候出错了,一定要记得销毁该连接。因为它的inputstream里面可能还残留数据。下次从连接池获得的时候都是dirty data了。一般采用以下的方案:

Java代码






[*]try{
[*]
//dosomething
[*]
pool.returnConnection;//返回正常连接
[*]
catch(Exceptione){
[*]
pool.returnBrokenConnection;//销毁连接





try{
//do something
pool.returnConnection;//返回正常连接
}catch(Exception e){
pool.returnBrokenConnection;//销毁连接
}

  

2.使用一致性hash的时候,使用批量查询命令mget的时候,ShardedJedis本身不支持的,只能用一个个key去取数据,性能低下。有一个比较土的办法。先将key对应的节点分类合并,然后单独用mget去获取数据,再将返回值合并给用户。可以显著减少网络连接。
  
5.总结
  

jedis简单,高效,本身代码也可以当做一个网络客户端的典型实现范例。
页: [1]
查看完整版本: Redis客户端-Jedis源代码探索