设为首页 收藏本站
查看: 660|回复: 0

[经验分享] C# Redis客户端对一致性hash算法的实现

[复制链接]

尚未签到

发表于 2015-11-12 12:53:40 | 显示全部楼层 |阅读模式
  这里介绍的是C#的redis客户端 ServiceStack 对一致性hash算法的实现.主要涉及到类如下:
  

ShardedConnectionPool
ShardedRedisClientManager
而ShardedConnectionPool又是继承自PooledRedisClientManager
ConsistentHash

  关于一致性hash算法的用处和介绍,请看我以前的blog
  http://blog.iyunv.com/fenglvming/article/details/8892228
  


  一致性hash的思路很简单:也就是,它将所有的可读写的redis服务器的ip,hash到了一个”环形列表”.在C#中是用SortedMap(查找的时候采用二分查找来获取值)来保存这些ip的hash值.Java的jedis客户端使用到了TreeMap(Java对红黑树的实现). 两者的查找的时间复杂度差不多,都可以看成是lgn.当要保存一个对象或者获取一个对象的时候,通过这个对象的key得到的hash值顺时针找到的第一个redis即为要存放或获取数据的目的或来源.我们先看看ShardedRedisClientManager的构造函数:

public ShardedRedisClientManager(params ShardedConnectionPool[] connectionPools)
{
if (connectionPools == null)
throw new ArgumentNullException("connection pools can not be null.");
List<KeyValuePair> pools = new List<KeyValuePair>();
foreach (var connectionPool in connectionPools)
{pools.Add(new KeyValuePair(connectionPool, connectionPool.weight));
}
_consistentHash = new ConsistentHash(pools);
}
看看它怎么对ShardedConnectionPool进行一致性hash的.public ConsistentHash(IEnumerable<KeyValuePair> nodes, Func hashFunction)
{
if (hashFunction != null)
_hashFunction = hashFunction;//内置的MD5算法
foreach (var node in nodes)
AddTarget(node.Key, node.Value);
}
他将每一个redis节点都放置到通过AddTarget方法,加入到了SortedMap中.其中weight代表的是该redisPool的权重,replicas就好像是一致性hash中的虚拟节点的概念一样,也就是说1个redis实例,可能对应着weight*replicas个hash&#20540;.这样有助于数据的分散和均匀.public void AddTarget(T node, int weight)
{
int repeat = weight > 0 ? weight * Replicas : Replicas;
for (int i = 0; i < repeat; i++)
{
string identifier = node.GetHashCode().ToString() + &quot;-&quot; + i;
ulong hashCode = _hashFunction(identifier);
_circle.Add(hashCode, node);
}
}
好了,这样我们就把我们的redis(如果权重都为1的话)和hash&#20540;做了一个一一对应.,每个redispool有replicas个hash&#20540;.下面我们来看看简单的获取流程.我们要通过redis获取&#20540;的话,首先我们要获得一个IRedisClient的对象来操作redis.如果我们用的是ShardedRedisClientManager的话,我们先获取到redispool,然后通过pool获得client.获取一个POOL的方法如下,还是在ConsistentHash类中:public T GetTarget(string key)
{
ulong hash = _hashFunction(key);
ulong firstNode = ModifiedBinarySearch(_circle.Keys.ToArray(), hash);
return _circle[firstNode];
}
ModifiedBinarySearch
就是一个二分查找.但是和普通的二分查找意义不一样的是,普通的二分查找是找到一个与给定的&#20540;相等的&#20540;,返回其下标.这个二分查找是到一个下标,是传入的&#20540;顺时针遇到的第一个&#20540;.具体可以看代码:public static ulong ModifiedBinarySearch(ulong[] sortedArray, ulong val)
{
int min = 0;
int max = sortedArray.Length - 1;
if (val < sortedArray[min] || val > sortedArray[max])
return sortedArray[0];
while (max - min > 1)
{    int mid = (max + min) / 2;
if (sortedArray[mid] >= val)
{max = mid; }else{min = mid; }
}
return sortedArray[max];
}

  以上,redis的C#客户端的一致性hash算法的实现就大致如此了.
  但是如果某个pool一旦不可达,它可不知道是否可达,所以它依然还是根据key找到了一个pool并返回一个client.
  我做的修改是启动一个线程,定时去查看这些pool是否可达,如果不可达,就将其从环中删去.
  并记录.同时,还会去判断那些不可达的pool是否可达了,如果变成可达了,那么就重新加入到环中.以下是ShardedRedisClientManager中新增的方法:
  


  

private List<KeyValuePair<ShardedConnectionPool, int>> removed
= new List<KeyValuePair<ShardedConnectionPool, int>>();
public void ValidShardingPool(List<KeyValuePair<ShardedConnectionPool, int>> pools)
{
var newpools = pools;
Thread t = new Thread(Func => {
while(true){
checkValid(newpools);
Thread.Sleep(60000);
checkInvalid();
Thread.Sleep(60000);
}
});
t.Start();
}
private void checkValid(List<KeyValuePair<ShardedConnectionPool, int>> pools)
{
foreach (var pool in pools)
{
var writeclient = pool.Key.GetClient();
var readclient = pool.Key.GetReadOnlyClient();
var nwclient = writeclient as RedisNativeClient;
var nrclient = readclient as RedisNativeClient;
bool needRemove = false;
try
{
if (!nwclient.Ping())
{
if (!nrclient.Ping())
{
needRemove = true;
}
}
}
catch (Exception e)
{
needRemove = true;
}
if (needRemove)
{
removed.Add(pool);
lock (_consistentHash)
{
_consistentHash.RemoveTarget(pool.Key, pool.Value);
}
}
}
}

public void checkInvalid() {
List<KeyValuePair<ShardedConnectionPool, int>> addBack =
new List<KeyValuePair<ShardedConnectionPool, int>>();
foreach (var pool in removed)
{
var writeclient = pool.Key.GetClient();
var readclient = pool.Key.GetReadOnlyClient();
var nwclient = writeclient as RedisNativeClient;
var nrclient = readclient as RedisNativeClient;
if (nwclient.Ping())
{
if (nrclient.Ping())
{
lock (_consistentHash)
{
addBack.Add(pool);
_consistentHash.AddTarget(pool.Key, pool.Value);
}
}
}
}
foreach (var pool in addBack)
{
removed.Remove(pool);
}
}


  
  


  同时在ConsistendHash中增加一个方法:
  

        public int GetRepeat(int weight)
{
return weight > 0 ? weight * Replicas : Replicas;
}
public string GetIdentifier(T node,int index) {
return node.GetHashCode().ToString() + &quot;-&quot; + index;
}
public ulong GetHashCode(string identifier) {
return _hashFunction(identifier);
}

public void RemoveTarget(T node ,int weight) {
// increase the replicas of the node by weight
int repeat = GetRepeat(weight);
for (int i = 0; i < repeat; i++)
{
string identifier = GetIdentifier(node, i);
ulong hashCode = GetHashCode(identifier);
if(_circle.ContainsKey(hashCode)){
_circle.Remove(hashCode);
}
}
}
  不过我感觉这种方式不是正确处理redis服务器宕机的正确路子.
  也希望得到大家的看法.如果让redis服务器宕机的影响到最小.
  
  


  下面说说我对PooledRedisClientManager的一些修改.这个里面有两个属性,一个写客户端数组和一个读客户端数组.如果在默认情况下,这两个数组大小为10.

private RedisClient[] writeClients = new RedisClient[0];
private RedisClient[] readClients = new RedisClient[0];
我们有时候会遇到如下情况,如果某个redis新手的话,他会在一个for循环中不断的获取client.这样很容易就耗尽了这两个数组,并且这两个数组中的client都还在使用状态.那么会导致在循环到一定程序,获取到的client为null.我做了如下修改,引入了两个新的锁 变量来替换原有锁住上述两个数组对象的地方.        private object writeClientsLock = new object();
private object readClientsLock = new object();
protected int RedisMaxConnection = 320;
然后改进了GetInActiveWriteClient和GetInActiveReadClient.在后面加入了:   //do something to extends the capacity of the writes client;
//extends two times of the original size
int originalLength = writeClients.Length;
if (originalLength * 2 < RedisMaxConnection)
{
int length = writeClients.Length << 1;
var newwriteClients = new RedisClient[length];
Array.ConstrainedCopy(writeClients, 0, newwriteClients,
0, writeClients.Length);
writeClients = newwriteClients;
var ret = GetInActiveWriteClient();
return ret;
}
一旦发现writeClients数组长度没有达到320,那么就扩展其长度.
版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-138336-1-1.html 上篇帖子: Redis的Keyspace notifications功能初探 下篇帖子: 把 MongoDB 当成是纯内存数据库来使用(Redis 风格)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表