Redis内部数据结构详解之跳跃表(skiplist)
一、跳跃表简介跳跃表是一种随机化数据结构,基于并联的链表,其效率可以比拟平衡二叉树,查找、删除、插入等操作都可以在对数期望时间内完成,对比平衡树,跳跃表的实现要简单直观很多。
以下是一个跳跃表的例图(来自维基百科):
从图中可以看出跳跃表主要有以下几个部分构成:
1、表头head:负责维护跳跃表的节点指针
2、节点node:实际保存元素值,每个节点有一层或多层
3、层level:保存着指向该层下一个节点的指针
4、表尾tail:全部由null组成
跳跃表的遍历总是从高层开始,然后随着元素值范围的缩小,慢慢降低到低层。
二、Redis中跳跃表的数据结构
Redis作者为了适合自己功能的需要,对原来的跳跃表进行了一下修改:
1、允许重复的score值:多个不同的元素(member)的score值可以相同
2、进行元素对比的时候,不仅要检查score值,还需要检查member:当score值相等时,需要比较member域进行比较。
3、结构保存一个tail指针:跳跃表的表尾指针
4、每个节点都有一个高度为1层的前驱指针,用于从底层表尾向表头方向遍历
跳跃表数据结构如下(redis.h):
typedef struct zskiplistNode {
robj *obj; //节点数据
double score;
struct zskiplistNode*backward; //前驱
struct zskiplistLevel {
struct zskiplistNode*forward;//后继
unsigned int span;//该层跨越的节点数量
} level[];
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode*header, *tail;
unsigned long length;//节点的数目
int level;//目前表的最大层数
} zskiplist;
数据结构中可能span这个参数最不好理解了,下面简单解释一下:
参照上面跳跃表的例图,head节点的span所有level的值都将是1;node 1在level 0 span值为1,因为跨越1个元素都将走到下一个节点2,在level 1 span值为2,因为需要跨越2个元素(node 2,node 3)才能到达下一个节点3。
关于span的解释可以详看:
http://stackoverflow.com/questions/10458572/what-does-the-skiplistnode-variable-span-mean-in-redis-h
三、简单列出跳跃表的API,他们的作用以及算法复杂度:
约定O(N)表示对于元素个数的表达,O(L)表示对于跳表层数的表达
函数名作用复杂度
zslCreateNode新建并返回一个跳表节点O(1)
zslCreate新建并初始化一个跳跃表O(L)
zslFreeNode释放给定的节点O(1)
zslFree释放给定的跳跃表O(N)
zslRandomLevel得到新节点的层数(抛硬币法的改进)O(1)
zslInsert将给定的score与member新建节点并添加到跳表中O(logN)
zslDeleteNode删除给定的跳表节点O(L)
zslDelete删除给定的score与member在跳表中匹配的节点O(logN)
zslIsInRange检查跳表中的元素score值是否在给定的范围内O(1)
zslFirstInRange查找第一个符合给定范围的节点O(logN)
zslLastInRange查找最后一个符合给定范围的节点O(logN)
zslDeleteRangeByScore删除score值在给定范围内的节点O(logN)+O(M)
zslDeleteRangeByRank删除排名在给定范围内的节点O(logN)+O(M)
zslGetRank返回给定score与member在集合中的排名O(logN)
zslGetElementByRank根据给定的rank来查找元素O(logN)
四、上述API源码的简单解析
4.1 zslCreateNode
//建立一个skiplist节点,需要传入所在的level,score,以及保存的数值obj
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->obj = obj;
return zn;
}
zmalloc是Redis在系统函数malloc上自己封装的函数,主要为了方便对内存使用情况的计算。
4.2zslCreate
//创建skiplist,header不存储任何数据
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//<span style="font-family: Arial, Helvetica, sans-serif;">ZSKIPLIST_MAXLEVEL = </span>32
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level.forward = NULL;//后继
zsl->header->level.span = 0;
}
zsl->header->backward = NULL;//前驱
zsl->tail = NULL;//尾指针
return zsl;
}
4.3 zslRandomLevel
int zslRandomLevel(void) {//为新的skiplist节点生成该节点level数目
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))//0.25
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
4.4 zslInsert
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update, *x;
unsigned int rank;
int i, level;
redisAssert(!isnan(score));
x = zsl->header;//header不存储数据
//从高向低
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
//rank用来记录第i层达到插入位置的所跨越的节点总数,也就是该层最接近(小于)给定score的排名
//rank初始化为上一层所跨越的节点总数
rank = i == (zsl->level-1) ? 0 : rank;
//后继节点不为空,并且后继节点的score比给定的score小
while (x->level.forward &&
(x->level.forward->score < score ||
//score相同,但节点的obj比给定的obj小
(x->level.forward->score == score &&
compareStringObjects(x->level.forward->obj,obj) < 0))) {
rank += x->level.span;//记录跨越了多少个节点
x = x->level.forward;//继续向右走
}
update = x;//保存访问的节点,并且将当前x移动到下一层
}
/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not. */
level = zslRandomLevel();//计算新的level
if (level > zsl->level) {//新的level > zsl->level,需要进行升级
for (i = zsl->level; i < level; i++) {
rank = 0;
update = zsl->header;//需要更新的节点就是header
update->level.span = zsl->length;
//在未添加新节点之前,需要更新的节点跨越的节点数目自然就是zsl->length,
}
zsl->level = level;
}
x = zslCreateNode(level,score,obj);//建立新节点
//开始插入节点
for (i = 0; i < level; i++) {
//新节点的后继就是插入位置节点的后继
x->level.forward = update->level.forward;
//插入位置节点的后继就是新节点
update->level.forward = x;
/* update span covered by update as x is inserted here */
/**
rank: 在第i层,update->score的排名
rank - rank: update->score与update->score之间间隔了几个数,即span数目
对于update->level.span值的更新由于在update与update->level->forward之间又添加了x,
update->level.span = 从update到x的span数目,
由于update后面肯定是新添加的x,所以自然新的update->level.span = (rank - rank) + 1;
x->level.span = 从x到update->forword的span数目,
原来的update->level.span = 从update到update->level->forward的span数目
所以x->level.span = 原来的update->level.span - (rank - rank);
另外需要注意当level > zsl->level时,update = zsl->header的span处理
*/
x->level.span = update->level.span - (rank - rank);
update->level.span = (rank - rank) + 1;
}
/* increment span for untouched levels */
//如果新节点的level小于原来skiplist的level,那么在上层没有insert新节点的span需要加1
for (i = level; i < zsl->level; i++) {
update->level.span++;
}
x->backward = (update == zsl->header) ? NULL : update;//前驱指针
if (x->level.forward)
x->level.forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
4.5 zslDeleteNode
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update->level.forward == x) {//update->level的后继等于要删除节点x
//原来的update->level.span == 1,所以新的span很好计算,就应该直接等于x->level.span
update->level.span += x->level.span - 1;
update->level.forward = x->level.forward;
} else {
update->level.span -= 1;
}
}
if (x->level.forward) {//处理前驱节点
x->level.forward->backward = x->backward;
} else {//否则,更新tail
zsl->tail = x->backward;
}
//收缩level
while(zsl->level > 1 && zsl->header->level.forward == NULL)
zsl->level--;
zsl->length--;
}
4.6 zslDelete
/* Delete an element with matching score/object from the skiplist. */
//根据score, obj来删除节点
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update, *x;
int i;
x = zsl->header;
// 遍历所有层,记录删除节点后需要被修改的节点到 update 数组
for (i = zsl->level-1; i >= 0; i--) {
while (x->level.forward &&
(x->level.forward->score < score ||
(x->level.forward->score == score &&
compareStringObjects(x->level.forward->obj,obj) < 0)))
x = x->level.forward;
update = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level.forward;
// 因为多个不同的 member 可能有相同的 score
// 所以要确保 x 的 member 和 score 都匹配时,才进行删除
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
return 1;
} else {
return 0; /* not found */
}
return 0; /* not found */
}
4.8 zslFirstInRange
/* Find the first node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
//找到跳跃表中第一个符合给定范围的元素
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,&range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *OUT* of range. */
while (x->level.forward &&
!zslValueGteMin(x->level.forward->score,&range))
x = x->level.forward;
}
/* This is an inner range, so the next node cannot be NULL. */
x = x->level.forward;
redisAssert(x != NULL);
/* Check if score <= max. */
if (!zslValueLteMax(x->score,&range)) return NULL;
return x;
}
4.9 zslLastInRange
/* Find the last node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
//找到跳跃表中最后一个符合给定范围的元素
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,&range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *IN* range. */
while (x->level.forward &&
zslValueLteMax(x->level.forward->score,&range))
x = x->level.forward;
}
/* This is an inner range, so this node cannot be NULL. */
redisAssert(x != NULL);
/* Check if score >= min. */
if (!zslValueGteMin(x->score,&range)) return NULL;
return x;
}
4.10 zslDeleteRangeByScore
/* Delete all the elements with score between min and max from the skiplist.
* Min and max are inclusive, so a score >= min || score <= max is deleted.
* Note that this function takes the reference to the hash table view of the
* sorted set, in order to remove the elements from the hash table too. */
//删除给定范围内的 score 的元素。
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
zskiplistNode *update, *x;
unsigned long removed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level.forward && (range.minex ?//是否是闭区间
x->level.forward->score <= range.min :
x->level.forward->score < range.min))
x = x->level.forward;
update = x;
}
/* Current node is the last with score < or <= min. */
x = x->level.forward;
/* Delete nodes while in range. */
while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
zskiplistNode *next = x->level.forward;
zslDeleteNode(zsl,x,update);//删除节点x
dictDelete(dict,x->obj);//在字典中也删除
zslFreeNode(x);
removed++;
x = next;
}
return removed;//删除的节点个数
}
4.11 zslDeleteRangeByRank
/* Delete all the elements with rank between start and end from the skiplist.
* Start and end are inclusive. Note that start and end need to be 1-based */
//删除给定排序范围内的所有节点
unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
zskiplistNode *update, *x;
unsigned long traversed = 0, removed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level.forward && (traversed + x->level.span) < start) {
traversed += x->level.span;//排名
x = x->level.forward;
}
update = x;
}
traversed++;
x = x->level.forward;
while (x && traversed <= end) {
zskiplistNode *next = x->level.forward;
zslDeleteNode(zsl,x,update);//删除节点x
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
traversed++;
x = next;
}
return removed;
}
4.12 zslGetRank
//得到score在skiplist中的排名,如果元素不在skiplist中,返回0
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level.forward &&
(x->level.forward->score < score ||
(x->level.forward->score == score &&
compareStringObjects(x->level.forward->obj,o) <= 0))) {
rank += x->level.span;//排名,加上该层跨越的节点数目
x = x->level.forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->obj && equalStringObjects(x->obj,o)) {// 找到目标元素
return rank;
}
}
return 0;
}
4.13 zslGetElementByRank
//根据给定的 rank 查找元素
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level.forward && (traversed + x->level.span) <= rank)
{
traversed += x->level.span;//排名
x = x->level.forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}
上述13个API函数中可能内部调用的一些函数没有列出,相关代码请查看Redis 2.8.2源码。
五、小结
跳跃表(Skiplist)是一种随机化数据结构,它在查找、插入、删除等操作的期望时间复杂度都能达到对数级,并且编码相对简单许多,跳跃表目前是Redis中用于存储有序集合的底层数据结构,另外可以存储有序集的数据结构是字典,Redis中还有一种底层数据结构intset可以用来存储有序整数集。
Redis作者通过对原有的跳跃表进行修改,包括span的设计、score值可以重复、添加tail与backward指针等,从而实现了排序功能,从尾至头反向遍历的功能等。
最后感谢黄健宏(huangz1990)的Redis设计与实现及其他对Redis2.6源码的相关注释对我在研究Redis2.8源码方面的帮助。
留不住的过去、抓不住的未来 卍 究竟是拥有更难还是舍弃更让人痛心,经历过才知道 孤单是美好的爱情开始的原因,也是不幸爱情的结局 相当不错,感谢无私分享精神! 一个男人如果有太多的采集心里,那他就不配有美好的爱情 世界那么大,为什么还要执着于某一个人念念不忘呢
页:
[1]