youli3 发表于 2013-12-17 10:47:32

Redis内部数据结构详解之跳跃表(skiplist)

一、跳跃表简介
跳跃表是一种随机化数据结构,基于并联的链表,其效率可以比拟平衡二叉树,查找、删除、插入等操作都可以在对数期望时间内完成,对比平衡树,跳跃表的实现要简单直观很多。
以下是一个跳跃表的例图(来自维基百科):

从图中可以看出跳跃表主要有以下几个部分构成:
1、表头head:负责维护跳跃表的节点指针
2、节点node:实际保存元素值,每个节点有一层或多层
3、层level:保存着指向该层下一个节点的指针
4、表尾tail:全部由null组成
跳跃表的遍历总是从高层开始,然后随着元素值范围的缩小,慢慢降低到低层。

二、Redis中跳跃表的数据结构
         Redis作者为了适合自己功能的需要,对原来的跳跃表进行了一下修改:
1、允许重复的score值:多个不同的元素(member)的score值可以相同
2、进行元素对比的时候,不仅要检查score值,还需要检查member:当score值相等时,需要比较member域进行比较。
3、结构保存一个tail指针:跳跃表的表尾指针
4、每个节点都有一个高度为1层的前驱指针,用于从底层表尾向表头方向遍历
跳跃表数据结构如下(redis.h):
typedef struct zskiplistNode {
    robj *obj; //节点数据
    double score;
    struct zskiplistNode*backward; //前驱
    struct zskiplistLevel {
      struct zskiplistNode*forward;//后继
      unsigned int span;//该层跨越的节点数量
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode*header, *tail;
    unsigned long length;//节点的数目
    int level;//目前表的最大层数
} zskiplist;

         数据结构中可能span这个参数最不好理解了,下面简单解释一下:
         参照上面跳跃表的例图,head节点的span所有level的值都将是1;node 1在level 0 span值为1,因为跨越1个元素都将走到下一个节点2,在level 1 span值为2,因为需要跨越2个元素(node 2,node 3)才能到达下一个节点3。
         关于span的解释可以详看:
http://stackoverflow.com/questions/10458572/what-does-the-skiplistnode-variable-span-mean-in-redis-h
三、简单列出跳跃表的API,他们的作用以及算法复杂度:
约定O(N)表示对于元素个数的表达,O(L)表示对于跳表层数的表达
函数名作用复杂度
zslCreateNode新建并返回一个跳表节点O(1)
zslCreate新建并初始化一个跳跃表O(L)
zslFreeNode释放给定的节点O(1)
zslFree释放给定的跳跃表O(N)
zslRandomLevel得到新节点的层数(抛硬币法的改进)O(1)
zslInsert将给定的score与member新建节点并添加到跳表中O(logN)
zslDeleteNode删除给定的跳表节点O(L)
zslDelete删除给定的score与member在跳表中匹配的节点O(logN)
zslIsInRange检查跳表中的元素score值是否在给定的范围内O(1)
zslFirstInRange查找第一个符合给定范围的节点O(logN)
zslLastInRange查找最后一个符合给定范围的节点O(logN)
zslDeleteRangeByScore删除score值在给定范围内的节点O(logN)+O(M)
zslDeleteRangeByRank删除排名在给定范围内的节点O(logN)+O(M)
zslGetRank返回给定score与member在集合中的排名O(logN)
zslGetElementByRank根据给定的rank来查找元素O(logN)


四、上述API源码的简单解析
4.1 zslCreateNode
//建立一个skiplist节点,需要传入所在的level,score,以及保存的数值obj
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->obj = obj;
    return zn;
}
zmalloc是Redis在系统函数malloc上自己封装的函数,主要为了方便对内存使用情况的计算。

4.2zslCreate
//创建skiplist,header不存储任何数据
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//<span style="font-family: Arial, Helvetica, sans-serif;">ZSKIPLIST_MAXLEVEL = </span>32
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
      zsl->header->level.forward = NULL;//后继
      zsl->header->level.span = 0;
    }
    zsl->header->backward = NULL;//前驱
    zsl->tail = NULL;//尾指针
    return zsl;
}

4.3 zslRandomLevel
int zslRandomLevel(void) {//为新的skiplist节点生成该节点level数目
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))//0.25
      level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

4.4 zslInsert
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update, *x;
    unsigned int rank;
    int i, level;

    redisAssert(!isnan(score));
    x = zsl->header;//header不存储数据
    //从高向低
    for (i = zsl->level-1; i >= 0; i--) {
      /* store rank that is crossed to reach the insert position */
      //rank用来记录第i层达到插入位置的所跨越的节点总数,也就是该层最接近(小于)给定score的排名
      //rank初始化为上一层所跨越的节点总数
      rank = i == (zsl->level-1) ? 0 : rank;
      //后继节点不为空,并且后继节点的score比给定的score小
      while (x->level.forward &&
            (x->level.forward->score < score ||
                //score相同,但节点的obj比给定的obj小
                (x->level.forward->score == score &&
                compareStringObjects(x->level.forward->obj,obj) < 0))) {
            rank += x->level.span;//记录跨越了多少个节点
            x = x->level.forward;//继续向右走
      }
      update = x;//保存访问的节点,并且将当前x移动到下一层
    }
    /* we assume the key is not already inside, since we allow duplicated
   * scores, and the re-insertion of score and redis object should never
   * happen since the caller of zslInsert() should test in the hash table
   * if the element is already inside or not. */
    level = zslRandomLevel();//计算新的level
    if (level > zsl->level) {//新的level > zsl->level,需要进行升级
      for (i = zsl->level; i < level; i++) {
            rank = 0;
            update = zsl->header;//需要更新的节点就是header
            update->level.span = zsl->length;
            //在未添加新节点之前,需要更新的节点跨越的节点数目自然就是zsl->length,
      }
      zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);//建立新节点
    //开始插入节点
    for (i = 0; i < level; i++) {
      //新节点的后继就是插入位置节点的后继
      x->level.forward = update->level.forward;
      //插入位置节点的后继就是新节点
      update->level.forward = x;

      /* update span covered by update as x is inserted here */
      /**
            rank: 在第i层,update->score的排名
            rank - rank: update->score与update->score之间间隔了几个数,即span数目
            对于update->level.span值的更新由于在update与update->level->forward之间又添加了x,
            update->level.span = 从update到x的span数目,
            由于update后面肯定是新添加的x,所以自然新的update->level.span = (rank - rank) + 1;
            x->level.span = 从x到update->forword的span数目,
            原来的update->level.span = 从update到update->level->forward的span数目
            所以x->level.span = 原来的update->level.span - (rank - rank);

            另外需要注意当level > zsl->level时,update = zsl->header的span处理
      */
      x->level.span = update->level.span - (rank - rank);
      update->level.span = (rank - rank) + 1;
    }

    /* increment span for untouched levels */
    //如果新节点的level小于原来skiplist的level,那么在上层没有insert新节点的span需要加1
    for (i = level; i < zsl->level; i++) {
      update->level.span++;
    }

    x->backward = (update == zsl->header) ? NULL : update;//前驱指针
    if (x->level.forward)
      x->level.forward->backward = x;
    else
      zsl->tail = x;
    zsl->length++;
    return x;
}

4.5 zslDeleteNode
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
      if (update->level.forward == x) {//update->level的后继等于要删除节点x
            //原来的update->level.span == 1,所以新的span很好计算,就应该直接等于x->level.span
            update->level.span += x->level.span - 1;
            update->level.forward = x->level.forward;
      } else {
            update->level.span -= 1;
      }
    }
    if (x->level.forward) {//处理前驱节点
      x->level.forward->backward = x->backward;
    } else {//否则,更新tail
      zsl->tail = x->backward;
    }
    //收缩level
    while(zsl->level > 1 && zsl->header->level.forward == NULL)
      zsl->level--;
    zsl->length--;
}

4.6 zslDelete
/* Delete an element with matching score/object from the skiplist. */
//根据score, obj来删除节点
int zslDelete(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update, *x;
    int i;

    x = zsl->header;
    // 遍历所有层,记录删除节点后需要被修改的节点到 update 数组
    for (i = zsl->level-1; i >= 0; i--) {
      while (x->level.forward &&
            (x->level.forward->score < score ||
                (x->level.forward->score == score &&
                compareStringObjects(x->level.forward->obj,obj) < 0)))
            x = x->level.forward;
      update = x;
    }
    /* We may have multiple elements with the same score, what we need
   * is to find the element with both the right score and object. */
    x = x->level.forward;
    // 因为多个不同的 member 可能有相同的 score
    // 所以要确保 x 的 member 和 score 都匹配时,才进行删除
    if (x && score == x->score && equalStringObjects(x->obj,obj)) {
      zslDeleteNode(zsl, x, update);
      zslFreeNode(x);
      return 1;
    } else {
      return 0; /* not found */
    }
    return 0; /* not found */
}

4.8 zslFirstInRange
/* Find the first node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
//找到跳跃表中第一个符合给定范围的元素
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
    zskiplistNode *x;
    int i;

    /* If everything is out of range, return early. */
    if (!zslIsInRange(zsl,&range)) return NULL;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
      /* Go forward while *OUT* of range. */
      while (x->level.forward &&
            !zslValueGteMin(x->level.forward->score,&range))
                x = x->level.forward;
    }

    /* This is an inner range, so the next node cannot be NULL. */
    x = x->level.forward;
    redisAssert(x != NULL);

    /* Check if score <= max. */
    if (!zslValueLteMax(x->score,&range)) return NULL;
    return x;
}

4.9 zslLastInRange
/* Find the last node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
//找到跳跃表中最后一个符合给定范围的元素
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
    zskiplistNode *x;
    int i;

    /* If everything is out of range, return early. */
    if (!zslIsInRange(zsl,&range)) return NULL;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
      /* Go forward while *IN* range. */
      while (x->level.forward &&
            zslValueLteMax(x->level.forward->score,&range))
                x = x->level.forward;
    }

    /* This is an inner range, so this node cannot be NULL. */
    redisAssert(x != NULL);

    /* Check if score >= min. */
    if (!zslValueGteMin(x->score,&range)) return NULL;
    return x;
}

4.10 zslDeleteRangeByScore
/* Delete all the elements with score between min and max from the skiplist.
* Min and max are inclusive, so a score >= min || score <= max is deleted.
* Note that this function takes the reference to the hash table view of the
* sorted set, in order to remove the elements from the hash table too. */
//删除给定范围内的 score 的元素。
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
    zskiplistNode *update, *x;
    unsigned long removed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
      while (x->level.forward && (range.minex ?//是否是闭区间
            x->level.forward->score <= range.min :
            x->level.forward->score < range.min))
                x = x->level.forward;
      update = x;
    }

    /* Current node is the last with score < or <= min. */
    x = x->level.forward;

    /* Delete nodes while in range. */
    while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
      zskiplistNode *next = x->level.forward;
      zslDeleteNode(zsl,x,update);//删除节点x
      dictDelete(dict,x->obj);//在字典中也删除
      zslFreeNode(x);
      removed++;
      x = next;
    }
    return removed;//删除的节点个数
}

4.11 zslDeleteRangeByRank
/* Delete all the elements with rank between start and end from the skiplist.
* Start and end are inclusive. Note that start and end need to be 1-based */
//删除给定排序范围内的所有节点
unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
    zskiplistNode *update, *x;
    unsigned long traversed = 0, removed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
      while (x->level.forward && (traversed + x->level.span) < start) {
            traversed += x->level.span;//排名
            x = x->level.forward;
      }
      update = x;
    }

    traversed++;
    x = x->level.forward;
    while (x && traversed <= end) {
      zskiplistNode *next = x->level.forward;
      zslDeleteNode(zsl,x,update);//删除节点x
      dictDelete(dict,x->obj);
      zslFreeNode(x);
      removed++;
      traversed++;
      x = next;
    }
    return removed;
}

4.12 zslGetRank
//得到score在skiplist中的排名,如果元素不在skiplist中,返回0
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
      while (x->level.forward &&
            (x->level.forward->score < score ||
                (x->level.forward->score == score &&
                compareStringObjects(x->level.forward->obj,o) <= 0))) {
            rank += x->level.span;//排名,加上该层跨越的节点数目
            x = x->level.forward;
      }

      /* x might be equal to zsl->header, so test if obj is non-NULL */
      if (x->obj && equalStringObjects(x->obj,o)) {// 找到目标元素
            return rank;
      }
    }
    return 0;
}

4.13 zslGetElementByRank
//根据给定的 rank 查找元素
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
      while (x->level.forward && (traversed + x->level.span) <= rank)
      {
            traversed += x->level.span;//排名
            x = x->level.forward;
      }
      if (traversed == rank) {
            return x;
      }
    }
    return NULL;
}

上述13个API函数中可能内部调用的一些函数没有列出,相关代码请查看Redis 2.8.2源码。

五、小结
跳跃表(Skiplist)是一种随机化数据结构,它在查找、插入、删除等操作的期望时间复杂度都能达到对数级,并且编码相对简单许多,跳跃表目前是Redis中用于存储有序集合的底层数据结构,另外可以存储有序集的数据结构是字典,Redis中还有一种底层数据结构intset可以用来存储有序整数集。
Redis作者通过对原有的跳跃表进行修改,包括span的设计、score值可以重复、添加tail与backward指针等,从而实现了排序功能,从尾至头反向遍历的功能等。
最后感谢黄健宏(huangz1990)的Redis设计与实现及其他对Redis2.6源码的相关注释对我在研究Redis2.8源码方面的帮助。


devil20 发表于 2013-12-20 02:26:50

留不住的过去、抓不住的未来 卍

jmton 发表于 2013-12-20 22:47:59

究竟是拥有更难还是舍弃更让人痛心,经历过才知道

zz775520666 发表于 2013-12-20 23:28:38

孤单是美好的爱情开始的原因,也是不幸爱情的结局

死siua11 发表于 2013-12-21 03:27:02

相当不错,感谢无私分享精神!

爱在莫斯科 发表于 2013-12-21 22:42:01

一个男人如果有太多的采集心里,那他就不配有美好的爱情

lyl801013 发表于 2013-12-22 05:49:29

世界那么大,为什么还要执着于某一个人念念不忘呢
页: [1]
查看完整版本: Redis内部数据结构详解之跳跃表(skiplist)