设为首页 收藏本站
查看: 911|回复: 0

[经验分享] Redis源码笔记四: skiplist

[复制链接]

尚未签到

发表于 2015-7-23 10:14:46 | 显示全部楼层 |阅读模式
DSC0000.gif DSC0001.gif


1 /* ZSETs use a specialized version of Skiplists */
2 typedef struct zskiplistNode {
3
4     /* member对象 */
5     robj *obj;   
6
7     /* 分值 */
8     double score;
9
10     /* 后退指针 */
11     struct zskiplistNode *backward;
12
13     /* 层 */
14     struct zskiplistLevel {
15         /* 前进指针 */
16         struct zskiplistNode *forward;
17
18         /* 这个层跨越的节点数量 */
19         unsigned int span;
20     } level[];
21 } zskiplistNode; /* 跳跃表节点  */
22
23 typedef struct zskiplist {
24     /* 头节点,尾节点 */
25     struct zskiplistNode *header, *tail;
26
27     /* 节点数量 */
28     unsigned long length;
29
30     /* 目前表内节点的最大层数 */
31     int level;
32 } zskiplist; /* 跳跃表 */
redis.h




   1 /*
   2  * Copyright (c) 2009-2012, Salvatore Sanfilippo
   3  * Copyright (c) 2009-2012, Pieter Noordhuis
   4  * All rights reserved.
   5  *
   6  * Redistribution and use in source and binary forms, with or without
   7  * modification, are permitted provided that the following conditions are met:
   8  *
   9  *   * Redistributions of source code must retain the above copyright notice,
  10  *     this list of conditions and the following disclaimer.
  11  *   * Redistributions in binary form must reproduce the above copyright
  12  *     notice, this list of conditions and the following disclaimer in the
  13  *     documentation and/or other materials provided with the distribution.
  14  *   * Neither the name of Redis nor the names of its contributors may be used
  15  *     to endorse or promote products derived from this software without
  16  *     specific prior written permission.
  17  *
  18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  28  * POSSIBILITY OF SUCH DAMAGE.
  29  */
  30
  31 /*-----------------------------------------------------------------------------
  32  * Sorted set API
  33  *----------------------------------------------------------------------------*/
  34
  35 /* ZSETs are ordered sets using two data structures to hold the same elements
  36  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
  37  * data structure.
  38  *
  39  * The elements are added to an hash table mapping Redis objects to scores.
  40  * At the same time the elements are added to a skip list mapping scores
  41  * to Redis objects (so objects are sorted by scores in this "view"). */
  42
  43 /* This skiplist implementation is almost a C translation of the original
  44  * algorithm described by William Pugh in "Skip Lists: A Probabilistic
  45  * Alternative to Balanced Trees", modified in three ways:
  46  * a) this implementation allows for repeated scores.
  47  * b) the comparison is not just by key (our 'score') but by satellite data.
  48  * c) there is a back pointer, so it's a doubly linked list with the back
  49  * pointers being only at "level 1". This allows to traverse the list
  50  * from tail to head, useful for ZREVRANGE. */
  51
  52 #include "redis.h"
  53 #include
  54
  55 /* 创建并返回一个新的跳跃表节点 */
  56 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
  57     /* 分配层 */
  58     zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
  59
  60     /* 更新属性 */
  61     zn->score = score;
  62     zn->obj = obj;
  63
  64     return zn;
  65 }
  66
  67 /* 创建并初始化一个新的跳跃表 */
  68 zskiplist *zslCreate(void) {
  69     int j;
  70     zskiplist *zsl;
  71
  72     /* 分配空间 */
  73     zsl = zmalloc(sizeof(*zsl));
  74     
  75     /* 更新属性 */
  76     zsl->level = 1;
  77     zsl->length = 0;
  78
  79     /* 初始化头节点 */
  80     zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
  81     
  82     /* 初始化层指针 */
  83     for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
  84         zsl->header->level[j].forward = NULL;    /* 前进指针 */
  85         zsl->header->level[j].span = 0;
  86     }
  87     zsl->header->backward = NULL;    /* 后退指针 */
  88     zsl->tail = NULL;
  89
  90     return zsl;
  91 }
  92
  93 /* 释放给定的跳跃表节点 */
  94 void zslFreeNode(zskiplistNode *node) {
  95     decrRefCount(node->obj);
  96     zfree(node);
  97 }
  98
  99 /* 释放整个跳跃表 */
100 void zslFree(zskiplist *zsl) {
101
102     zskiplistNode *node = zsl->header->level[0].forward, *next;
103
104     zfree(zsl->header);
105
106     /* 遍历删除 */
107     while(node) {
108         next = node->level[0].forward;
109         zslFreeNode(node);
110         node = next;
111     }
112
113     zfree(zsl);
114 }
115
116 /* Returns a random level for the new skiplist node we are going to create.
117  * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
118  * (both inclusive), with a powerlaw-alike distribution where higher
119  * levels are less likely to be returned. */
120 /* 返回一个介于1和ASKIPLIST_MAXLEVEL之间的随机值,作为节点的层数。*/
121 /* 根据幂次定律(power law),数值越大,函数生成它的几率就越小 */
122 int zslRandomLevel(void) {
123     int level = 1;
124     while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
125         level += 1;
126     return (levelheader;
142
143     /* 记录沿途访问的节点,并计数span等属性 */
144     for (i = zsl->level-1; i >= 0; i--) {
145         /* store rank that is crossed to reach the insert position */
146         rank = i == (zsl->level-1) ? 0 : rank[i+1];
147
148         /* 右节点不为空 */
149         while (x->level.forward &&
150             /* 右节点的score比给定score小 */
151             (x->level.forward->score < score ||
152                 /* 右节点的score相同,但节点的member比输入的member要小 */
153                 (x->level.forward->score == score &&
154                 compareStringObjects(x->level.forward->obj,obj) < 0))) {
155         
156             /* 记录跨越多少元素 */
157             rank += x->level.span;
158     
159             /* 继续向右前进 */
160             x = x->level.forward;
161         }
162         /* 保存访问节点 */
163         update = x;
164     }
165     /* we assume the key is not already inside, since we allow duplicated
166      * scores, and the re-insertion of score and redis object should never
167      * happen since the caller of zslInsert() should test in the hash table
168      * if the element is already inside or not. */
169     /* 因为整个函数不可能处理两个元素的member和score都相同的情况 */
170     /* 所以直接创建新节点,不用检查存在性 */
171
172     /* 计算新的随机层数 */
173     level = zslRandomLevel();
174
175     /*
176      * 如果level比当前跳跃表的最大层还要大
177      * 那么更新zsl->level参数
178      * 并且初始化update和rank参数在相应层的数据
179     */
180     if (level > zsl->level) {
181         for (i = zsl->level; i < level; i++) {
182             rank = 0;
183             update = zsl->header;
184             update->level.span = zsl->length;
185         }
186         zsl->level = level;
187     }
188
189     /* 创建新节点 */
190     x = zslCreateNode(level,score,obj);
191
192     /* 根据update和rank两个数组的资料,初始化新节点并设置相应的指针 */
193     for (i = 0; i < level; i++) {
194
195         /* 设置指针 */
196         x->level.forward = update->level.forward;
197         update->level.forward = x;
198
199         /* update span covered by update as x is inserted here */
200         /* 设置span */
201         x->level.span = update->level.span - (rank[0] - rank);
202         update->level.span = (rank[0] - rank) + 1;
203     }
204
205     /* increment span for untouched levels */
206     /* 更新给定沿途访问节点的span */
207     for (i = level; i < zsl->level; i++) {
208         update->level.span++;
209     }
210     
211     /* 设置后退指针 */
212     x->backward = (update[0] == zsl->header) ? NULL : update[0];
213
214     /* 设置x的前进指针 */
215     if (x->level[0].forward)
216         x->level[0].forward->backward = x;
217     else
218     /* 这个时新的表尾节点 */
219         zsl->tail = x;
220
221     /* 更新跳跃表节点数量 */
222     zsl->length++;
223
224     return x;
225 }
226
227 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
228 /* 删除给定的跳跃表节点 */
229 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
230     int i;
231
232     /* 修改相应的指针和span */
233     for (i = 0; i < zsl->level; i++) {
234         if (update->level.forward == x) {
235             update->level.span += x->level.span - 1;
236             update->level.forward = x->level.forward;
237         } else {
238             update->level.span -= 1;
239         }
240     }
241
242     /* 处理表头和表尾节点 */
243     if (x->level[0].forward) {
244         x->level[0].forward->backward = x->backward;
245     } else {
246         zsl->tail = x->backward;
247     }
248
249     /* 收缩level的值 */
250     while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
251         zsl->level--;
252
253     zsl->length--;
254 }
255
256 /* Delete an element with matching score/object from the skiplist. */
257 /* 从跳跃表中删除和给定obj以及给定score匹配的元素 */
258 int zslDelete(zskiplist *zsl, double score, robj *obj) {
259     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
260     int i;
261
262     x = zsl->header;
263     /* 遍历所有层,记录删除节点后需要被修改的节点到update数组 */
264     for (i = zsl->level-1; i >= 0; i--) {
265         while (x->level.forward &&
266             (x->level.forward->score < score ||
267                 (x->level.forward->score == score &&
268                 compareStringObjects(x->level.forward->obj,obj) < 0)))
269             x = x->level.forward;
270         update = x;
271     }
272     /* We may have multiple elements with the same score, what we need
273      * is to find the element with both the right score and object. */
274     /* 因为多个不同的member可能由相同的score */
275     /* 所以要确保x的member和score都匹配时,才进行删除 */
276     x = x->level[0].forward;
277     if (x && score == x->score && equalStringObjects(x->obj,obj)) {
278         zslDeleteNode(zsl, x, update);
279         zslFreeNode(x);
280         return 1;
281     } else {
282         return 0; /* not found */
283     }
284     return 0; /* not found */
285 }
286
287 /* 检查value是否属于spec指定的范围内 */
288 static int zslValueGteMin(double value, zrangespec *spec) {
289     return spec->minex ? (value > spec->min) : (value >= spec->min);
290 }
291
292 /* 检查value是否属于spec指定的范围内 */
293 static int zslValueLteMax(double value, zrangespec *spec) {
294     return spec->maxex ? (value < spec->max) : (value max);
295 }
296
297 /* Returns if there is a part of the zset is in range. */
298 /* 检查zset中的元素是否在给定范围之内 */
299 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
300     zskiplistNode *x;
301
302     /* Test for ranges that will always be empty. */
303     if (range->min > range->max ||
304             (range->min == range->max && (range->minex || range->maxex)))
305         return 0;
306
307     /* 如果zset的最大节点的score比范围的最小值要小 */
308     /* 那么zset不在范围之内 */
309     x = zsl->tail;
310     if (x == NULL || !zslValueGteMin(x->score,range))
311         return 0;
312
313     /* 如果zset的最小节点的score比范围的最大值要大 */
314     /* 那么zset不在范围之内 */
315     x = zsl->header->level[0].forward;
316     if (x == NULL || !zslValueLteMax(x->score,range))
317         return 0;
318
319     /* 在范围内 */
320     return 1;
321 }
322
323 /* Find the first node that is contained in the specified range.
324  * Returns NULL when no element is contained in the range. */
325 /* 找到跳跃表中的第一个符合给定范围的元素 */
326 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
327     zskiplistNode *x;
328     int i;
329
330     /* If everything is out of range, return early. */
331     if (!zslIsInRange(zsl,&range)) return NULL;
332
333     /* 找到第一个score值大于给定范围最小值的节点 */
334     x = zsl->header;
335     for (i = zsl->level-1; i >= 0; i--) {
336         /* Go forward while *OUT* of range. */
337         while (x->level.forward &&
338             !zslValueGteMin(x->level.forward->score,&range))
339                 x = x->level.forward;
340     }
341
342     /* This is an inner range, so the next node cannot be NULL. */
343     x = x->level[0].forward;
344     redisAssert(x != NULL);
345
346     /* Check if score score,&range)) return NULL;
348     return x;
349 }
350
351 /* Find the last node that is contained in the specified range.
352  * Returns NULL when no element is contained in the range. */
353 /* 找到跳跃表中最后一个符合给定范围的元素 */
354 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
355     zskiplistNode *x;
356     int i;
357
358     /* If everything is out of range, return early. */
359     if (!zslIsInRange(zsl,&range)) return NULL;
360
361     x = zsl->header;
362     for (i = zsl->level-1; i >= 0; i--) {
363         /* Go forward while *IN* range. */
364         while (x->level.forward &&
365             zslValueLteMax(x->level.forward->score,&range))
366                 x = x->level.forward;
367     }
368
369     /* This is an inner range, so this node cannot be NULL. */
370     redisAssert(x != NULL);
371
372     /* Check if score >= min. */
373     if (!zslValueGteMin(x->score,&range)) return NULL;
374     return x;
375 }
376
377 /* Delete all the elements with score between min and max from the skiplist.
378  * Min and max are inclusive, so a score >= min || score header;
389     for (i = zsl->level-1; i >= 0; i--) {
390         while (x->level.forward && (range.minex ?
391             x->level.forward->score level.forward->score < range.min))
393                 x = x->level.forward;
394         update = x;
395     }
396
397     /* Current node is the last with score < or level[0].forward;
399
400     /* Delete nodes while in range. */
401     /* 一直向右删除,直到到达range的底为止 */
402     while (x && (range.maxex ? x->score < range.max : x->score level[0].forward;
405
406         /* 在跳跃表中删除 */
407         zslDeleteNode(zsl,x,update);
408
409         /* 在字典中删除 */
410         dictDelete(dict,x->obj);
411
412         /* 释放 */
413         zslFreeNode(x);
414
415         removed++;
416
417         x = next;
418     }
419
420     return removed;
421 }
422
423 /* Delete all the elements with rank between start and end from the skiplist.
424  * Start and end are inclusive. Note that start and end need to be 1-based */
425 /* 删除给定排序范围内的所有节点 */
426 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
427     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
428     unsigned long traversed = 0, removed = 0;
429     int i;
430
431     /* 通过计算rank,移动到删除开始的地方 */
432     x = zsl->header;
433     for (i = zsl->level-1; i >= 0; i--) {
434         while (x->level.forward && (traversed + x->level.span) < start) {
435             traversed += x->level.span;
436             x = x->level.forward;
437         }
438         update = x;
439     }
440     
441     /* 算上start节点 */
442     traversed++;
443
444     /* 从start开始,删除直到到达索引end,或者末尾 */
445     x = x->level[0].forward;
446
447     while (x && traversed level[0].forward;
450
451         /* 删除skiplist节点 */
452         zslDeleteNode(zsl,x,update);
453
454         /* 删除dict节点 */
455         dictDelete(dict,x->obj);
456
457         /* 删除节点 */
458         zslFreeNode(x);
459
460         /* 删除计数 */
461         removed++;
462         traversed++;
463         x = next;
464     }
465     return removed;
466 }
467
468 /* Find the rank for an element by both score and key.
469  * Returns 0 when the element cannot be found, rank otherwise.
470  * Note that the rank is 1-based due to the span of zsl->header to the
471  * first element. */
472 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
473     zskiplistNode *x;
474     unsigned long rank = 0;
475     int i;
476
477     x = zsl->header;
478     for (i = zsl->level-1; i >= 0; i--) {
479         while (x->level.forward &&
480             (x->level.forward->score < score ||
481                 (x->level.forward->score == score &&
482                 compareStringObjects(x->level.forward->obj,o) level.span;
484             x = x->level.forward;
485         }
486
487         /* x might be equal to zsl->header, so test if obj is non-NULL */
488         if (x->obj && equalStringObjects(x->obj,o)) {
489             return rank;
490         }
491     }
492     return 0;
493 }
494
495 /* Finds an element by its rank. The rank argument needs to be 1-based. */
496 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
497     zskiplistNode *x;
498     unsigned long traversed = 0;
499     int i;
500
501     x = zsl->header;
502     for (i = zsl->level-1; i >= 0; i--) {
503         while (x->level.forward && (traversed + x->level.span) level.span;
506             x = x->level.forward;
507         }
508         if (traversed == rank) {
509             return x;
510         }
511     }
512     return NULL;
513 }
514
515 /* Populate the rangespec according to the objects min and max. */
516 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
517     char *eptr;
518     spec->minex = spec->maxex = 0;
519
520     /* Parse the min-max interval. If one of the values is prefixed
521      * by the "(" character, it's considered "open". For instance
522      * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
523      * ZRANGEBYSCORE zset 1.5 2.5 will instead match min min = (long)min->ptr;
526     } else {
527         if (((char*)min->ptr)[0] == '(') {
528             spec->min = strtod((char*)min->ptr+1,&eptr);
529             if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
530             spec->minex = 1;
531         } else {
532             spec->min = strtod((char*)min->ptr,&eptr);
533             if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
534         }
535     }
536     if (max->encoding == REDIS_ENCODING_INT) {
537         spec->max = (long)max->ptr;
538     } else {
539         if (((char*)max->ptr)[0] == '(') {
540             spec->max = strtod((char*)max->ptr+1,&eptr);
541             if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
542             spec->maxex = 1;
543         } else {
544             spec->max = strtod((char*)max->ptr,&eptr);
545             if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
546         }
547     }
548
549     return REDIS_OK;
550 }
551
552 /*-----------------------------------------------------------------------------
553  * Ziplist-backed sorted set API
554  *----------------------------------------------------------------------------*/
555
556 double zzlGetScore(unsigned char *sptr) {
557     unsigned char *vstr;
558     unsigned int vlen;
559     long long vlong;
560     char buf[128];
561     double score;
562
563     redisAssert(sptr != NULL);
564     redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
565
566     if (vstr) {
567         memcpy(buf,vstr,vlen);
568         buf[vlen] = '\0';
569         score = strtod(buf,NULL);
570     } else {
571         score = vlong;
572     }
573
574     return score;
575 }
576
577 /* Compare element in sorted set with given element. */
578 int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
579     unsigned char *vstr;
580     unsigned int vlen;
581     long long vlong;
582     unsigned char vbuf[32];
583     int minlen, cmp;
584
585     redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
586     if (vstr == NULL) {
587         /* Store string representation of long long in buf. */
588         vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
589         vstr = vbuf;
590     }
591
592     minlen = (vlen < clen) ? vlen : clen;
593     cmp = memcmp(vstr,cstr,minlen);
594     if (cmp == 0) return vlen-clen;
595     return cmp;
596 }
597
598 unsigned int zzlLength(unsigned char *zl) {
599     return ziplistLen(zl)/2;
600 }
601
602 /* Move to next entry based on the values in eptr and sptr. Both are set to
603  * NULL when there is no next entry. */
604 void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
605     unsigned char *_eptr, *_sptr;
606     redisAssert(*eptr != NULL && *sptr != NULL);
607
608     _eptr = ziplistNext(zl,*sptr);
609     if (_eptr != NULL) {
610         _sptr = ziplistNext(zl,_eptr);
611         redisAssert(_sptr != NULL);
612     } else {
613         /* No next entry. */
614         _sptr = NULL;
615     }
616
617     *eptr = _eptr;
618     *sptr = _sptr;
619 }
620
621 /* Move to the previous entry based on the values in eptr and sptr. Both are
622  * set to NULL when there is no next entry. */
623 void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
624     unsigned char *_eptr, *_sptr;
625     redisAssert(*eptr != NULL && *sptr != NULL);
626
627     _sptr = ziplistPrev(zl,*eptr);
628     if (_sptr != NULL) {
629         _eptr = ziplistPrev(zl,_sptr);
630         redisAssert(_eptr != NULL);
631     } else {
632         /* No previous entry. */
633         _eptr = NULL;
634     }
635
636     *eptr = _eptr;
637     *sptr = _sptr;
638 }
639
640 /* Returns if there is a part of the zset is in range. Should only be used
641  * internally by zzlFirstInRange and zzlLastInRange. */
642 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
643     unsigned char *p;
644     double score;
645
646     /* Test for ranges that will always be empty. */
647     if (range->min > range->max ||
648             (range->min == range->max && (range->minex || range->maxex)))
649         return 0;
650
651     p = ziplistIndex(zl,-1); /* Last score. */
652     if (p == NULL) return 0; /* Empty sorted set */
653     score = zzlGetScore(p);
654     if (!zslValueGteMin(score,range))
655         return 0;
656
657     p = ziplistIndex(zl,1); /* First score. */
658     redisAssert(p != NULL);
659     score = zzlGetScore(p);
660     if (!zslValueLteMax(score,range))
661         return 0;
662
663     return 1;
664 }
665
666 /* Find pointer to the first element contained in the specified range.
667  * Returns NULL when no element is contained in the range. */
668 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
669     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
670     double score;
671
672     /* If everything is out of range, return early. */
673     if (!zzlIsInRange(zl,&range)) return NULL;
674
675     while (eptr != NULL) {
676         sptr = ziplistNext(zl,eptr);
677         redisAssert(sptr != NULL);
678
679         score = zzlGetScore(sptr);
680         if (zslValueGteMin(score,&range)) {
681             /* Check if score = min. */
710             if (zslValueGteMin(score,&range))
711                 return eptr;
712             return NULL;
713         }
714
715         /* Move to previous element by moving to the score of previous element.
716          * When this returns NULL, we know there also is no element. */
717         sptr = ziplistPrev(zl,eptr);
718         if (sptr != NULL)
719             redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
720         else
721             eptr = NULL;
722     }
723
724     return NULL;
725 }
726
727 unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
728     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
729
730     ele = getDecodedObject(ele);
731     while (eptr != NULL) {
732         sptr = ziplistNext(zl,eptr);
733         redisAssertWithInfo(NULL,ele,sptr != NULL);
734
735         if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
736             /* Matching element, pull out score. */
737             if (score != NULL) *score = zzlGetScore(sptr);
738             decrRefCount(ele);
739             return eptr;
740         }
741
742         /* Move to next element. */
743         eptr = ziplistNext(zl,sptr);
744     }
745
746     decrRefCount(ele);
747     return NULL;
748 }
749
750 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
751  * don't want to modify the one given as argument. */
752 unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
753     unsigned char *p = eptr;
754
755     /* TODO: add function to ziplist API to delete N elements from offset. */
756     zl = ziplistDelete(zl,&p);
757     zl = ziplistDelete(zl,&p);
758     return zl;
759 }
760
761 unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
762     unsigned char *sptr;
763     char scorebuf[128];
764     int scorelen;
765     size_t offset;
766
767     redisAssertWithInfo(NULL,ele,ele->encoding == REDIS_ENCODING_RAW);
768     scorelen = d2string(scorebuf,sizeof(scorebuf),score);
769     if (eptr == NULL) {
770         zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
771         zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
772     } else {
773         /* Keep offset relative to zl, as it might be re-allocated. */
774         offset = eptr-zl;
775         zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
776         eptr = zl+offset;
777
778         /* Insert score after the element. */
779         redisAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
780         zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
781     }
782
783     return zl;
784 }
785
786 /* Insert (element,score) pair in ziplist. This function assumes the element is
787  * not yet present in the list. */
788 unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
789     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
790     double s;
791
792     ele = getDecodedObject(ele);
793     while (eptr != NULL) {
794         sptr = ziplistNext(zl,eptr);
795         redisAssertWithInfo(NULL,ele,sptr != NULL);
796         s = zzlGetScore(sptr);
797
798         if (s > score) {
799             /* First element with score larger than score for element to be
800              * inserted. This means we should take its spot in the list to
801              * maintain ordering. */
802             zl = zzlInsertAt(zl,eptr,ele,score);
803             break;
804         } else if (s == score) {
805             /* Ensure lexicographical ordering for elements. */
806             if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
807                 zl = zzlInsertAt(zl,eptr,ele,score);
808                 break;
809             }
810         }
811
812         /* Move to next element. */
813         eptr = ziplistNext(zl,sptr);
814     }
815
816     /* Push on tail of list when it was not yet inserted. */
817     if (eptr == NULL)
818         zl = zzlInsertAt(zl,NULL,ele,score);
819
820     decrRefCount(ele);
821     return zl;
822 }
823
824 unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
825     unsigned char *eptr, *sptr;
826     double score;
827     unsigned long num = 0;
828
829     if (deleted != NULL) *deleted = 0;
830
831     eptr = zzlFirstInRange(zl,range);
832     if (eptr == NULL) return zl;
833
834     /* When the tail of the ziplist is deleted, eptr will point to the sentinel
835      * byte and ziplistNext will return NULL. */
836     while ((sptr = ziplistNext(zl,eptr)) != NULL) {
837         score = zzlGetScore(sptr);
838         if (zslValueLteMax(score,&range)) {
839             /* Delete both the element and the score. */
840             zl = ziplistDelete(zl,&eptr);
841             zl = ziplistDelete(zl,&eptr);
842             num++;
843         } else {
844             /* No longer in range. */
845             break;
846         }
847     }
848
849     if (deleted != NULL) *deleted = num;
850     return zl;
851 }
852
853 /* Delete all the elements with rank between start and end from the skiplist.
854  * Start and end are inclusive. Note that start and end need to be 1-based */
855 unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
856     unsigned int num = (end-start)+1;
857     if (deleted) *deleted = num;
858     zl = ziplistDeleteRange(zl,2*(start-1),2*num);
859     return zl;
860 }
861
862 /*-----------------------------------------------------------------------------
863  * Common sorted set API
864  *----------------------------------------------------------------------------*/
865
866 unsigned int zsetLength(robj *zobj) {
867     int length = -1;
868     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
869         length = zzlLength(zobj->ptr);
870     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
871         length = ((zset*)zobj->ptr)->zsl->length;
872     } else {
873         redisPanic("Unknown sorted set encoding");
874     }
875     return length;
876 }
877
878 void zsetConvert(robj *zobj, int encoding) {
879     zset *zs;
880     zskiplistNode *node, *next;
881     robj *ele;
882     double score;
883
884     if (zobj->encoding == encoding) return;
885     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
886         unsigned char *zl = zobj->ptr;
887         unsigned char *eptr, *sptr;
888         unsigned char *vstr;
889         unsigned int vlen;
890         long long vlong;
891
892         if (encoding != REDIS_ENCODING_SKIPLIST)
893             redisPanic("Unknown target encoding");
894
895         zs = zmalloc(sizeof(*zs));
896         zs->dict = dictCreate(&zsetDictType,NULL);
897         zs->zsl = zslCreate();
898
899         eptr = ziplistIndex(zl,0);
900         redisAssertWithInfo(NULL,zobj,eptr != NULL);
901         sptr = ziplistNext(zl,eptr);
902         redisAssertWithInfo(NULL,zobj,sptr != NULL);
903
904         while (eptr != NULL) {
905             score = zzlGetScore(sptr);
906             redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
907             if (vstr == NULL)
908                 ele = createStringObjectFromLongLong(vlong);
909             else
910                 ele = createStringObject((char*)vstr,vlen);
911
912             /* Has incremented refcount since it was just created. */
913             node = zslInsert(zs->zsl,score,ele);
914             redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
915             incrRefCount(ele); /* Added to dictionary. */
916             zzlNext(zl,&eptr,&sptr);
917         }
918
919         zfree(zobj->ptr);
920         zobj->ptr = zs;
921         zobj->encoding = REDIS_ENCODING_SKIPLIST;
922     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
923         unsigned char *zl = ziplistNew();
924
925         if (encoding != REDIS_ENCODING_ZIPLIST)
926             redisPanic("Unknown target encoding");
927
928         /* Approach similar to zslFree(), since we want to free the skiplist at
929          * the same time as creating the ziplist. */
930         zs = zobj->ptr;
931         dictRelease(zs->dict);
932         node = zs->zsl->header->level[0].forward;
933         zfree(zs->zsl->header);
934         zfree(zs->zsl);
935
936         while (node) {
937             ele = getDecodedObject(node->obj);
938             zl = zzlInsertAt(zl,NULL,ele,node->score);
939             decrRefCount(ele);
940
941             next = node->level[0].forward;
942             zslFreeNode(node);
943             node = next;
944         }
945
946         zfree(zs);
947         zobj->ptr = zl;
948         zobj->encoding = REDIS_ENCODING_ZIPLIST;
949     } else {
950         redisPanic("Unknown sorted set encoding");
951     }
952 }
953
954 /*-----------------------------------------------------------------------------
955  * Sorted set commands
956  *----------------------------------------------------------------------------*/
957
958 /* This generic command implements both ZADD and ZINCRBY. */
959 void zaddGenericCommand(redisClient *c, int incr) {
960     static char *nanerr = "resulting score is not a number (NaN)";
961     robj *key = c->argv[1];
962     robj *ele;
963     robj *zobj;
964     robj *curobj;
965     double score = 0, *scores, curscore = 0.0;
966     int j, elements = (c->argc-2)/2;
967     int added = 0;
968
969     if (c->argc % 2) {
970         addReply(c,shared.syntaxerr);
971         return;
972     }
973
974     /* Start parsing all the scores, we need to emit any syntax error
975      * before executing additions to the sorted set, as the command should
976      * either execute fully or nothing at all. */
977     scores = zmalloc(sizeof(double)*elements);
978     for (j = 0; j < elements; j++) {
979         if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
980             != REDIS_OK)
981         {
982             zfree(scores);
983             return;
984         }
985     }
986
987     /* Lookup the key and create the sorted set if does not exist. */
988     zobj = lookupKeyWrite(c->db,key);
989     if (zobj == NULL) {
990         if (server.zset_max_ziplist_entries == 0 ||
991             server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
992         {
993             zobj = createZsetObject();
994         } else {
995             zobj = createZsetZiplistObject();
996         }
997         dbAdd(c->db,key,zobj);
998     } else {
999         if (zobj->type != REDIS_ZSET) {
1000             addReply(c,shared.wrongtypeerr);
1001             zfree(scores);
1002             return;
1003         }
1004     }
1005
1006     for (j = 0; j < elements; j++) {
1007         score = scores[j];
1008
1009         if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1010             unsigned char *eptr;
1011
1012             /* Prefer non-encoded element when dealing with ziplists. */
1013             ele = c->argv[3+j*2];
1014             if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
1015                 if (incr) {
1016                     score += curscore;
1017                     if (isnan(score)) {
1018                         addReplyError(c,nanerr);
1019                         /* Don't need to check if the sorted set is empty
1020                          * because we know it has at least one element. */
1021                         zfree(scores);
1022                         return;
1023                     }
1024                 }
1025
1026                 /* Remove and re-insert when score changed. */
1027                 if (score != curscore) {
1028                     zobj->ptr = zzlDelete(zobj->ptr,eptr);
1029                     zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1030
1031                     signalModifiedKey(c->db,key);
1032                     server.dirty++;
1033                 }
1034             } else {
1035                 /* Optimize: check if the element is too large or the list
1036                  * becomes too long *before* executing zzlInsert. */
1037                 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1038                 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
1039                     zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
1040                 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
1041                     zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
1042
1043                 signalModifiedKey(c->db,key);
1044                 server.dirty++;
1045                 if (!incr) added++;
1046             }
1047         } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1048             zset *zs = zobj->ptr;
1049             zskiplistNode *znode;
1050             dictEntry *de;
1051
1052             ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
1053             de = dictFind(zs->dict,ele);
1054             if (de != NULL) {
1055                 curobj = dictGetKey(de);
1056                 curscore = *(double*)dictGetVal(de);
1057
1058                 if (incr) {
1059                     score += curscore;
1060                     if (isnan(score)) {
1061                         addReplyError(c,nanerr);
1062                         /* Don't need to check if the sorted set is empty
1063                          * because we know it has at least one element. */
1064                         zfree(scores);
1065                         return;
1066                     }
1067                 }
1068
1069                 /* Remove and re-insert when score changed. We can safely
1070                  * delete the key object from the skiplist, since the
1071                  * dictionary still has a reference to it. */
1072                 if (score != curscore) {
1073                     redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
1074                     znode = zslInsert(zs->zsl,score,curobj);
1075                     incrRefCount(curobj); /* Re-inserted in skiplist. */
1076                     dictGetVal(de) = &znode->score; /* Update score ptr. */
1077
1078                     signalModifiedKey(c->db,key);
1079                     server.dirty++;
1080                 }
1081             } else {
1082                 znode = zslInsert(zs->zsl,score,ele);
1083                 incrRefCount(ele); /* Inserted in skiplist. */
1084                 redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
1085                 incrRefCount(ele); /* Added to dictionary. */
1086
1087                 signalModifiedKey(c->db,key);
1088                 server.dirty++;
1089                 if (!incr) added++;
1090             }
1091         } else {
1092             redisPanic("Unknown sorted set encoding");
1093         }
1094     }
1095     zfree(scores);
1096     if (incr) /* ZINCRBY */
1097         addReplyDouble(c,score);
1098     else /* ZADD */
1099         addReplyLongLong(c,added);
1100 }
1101
1102 void zaddCommand(redisClient *c) {
1103     zaddGenericCommand(c,0);
1104 }
1105
1106 void zincrbyCommand(redisClient *c) {
1107     zaddGenericCommand(c,1);
1108 }
1109
1110 void zremCommand(redisClient *c) {
1111     robj *key = c->argv[1];
1112     robj *zobj;
1113     int deleted = 0, j;
1114
1115     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1116         checkType(c,zobj,REDIS_ZSET)) return;
1117
1118     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1119         unsigned char *eptr;
1120
1121         for (j = 2; j < c->argc; j++) {
1122             if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
1123                 deleted++;
1124                 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1125                 if (zzlLength(zobj->ptr) == 0) {
1126                     dbDelete(c->db,key);
1127                     break;
1128                 }
1129             }
1130         }
1131     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1132         zset *zs = zobj->ptr;
1133         dictEntry *de;
1134         double score;
1135
1136         for (j = 2; j < c->argc; j++) {
1137             de = dictFind(zs->dict,c->argv[j]);
1138             if (de != NULL) {
1139                 deleted++;
1140
1141                 /* Delete from the skiplist */
1142                 score = *(double*)dictGetVal(de);
1143                 redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
1144
1145                 /* Delete from the hash table */
1146                 dictDelete(zs->dict,c->argv[j]);
1147                 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1148                 if (dictSize(zs->dict) == 0) {
1149                     dbDelete(c->db,key);
1150                     break;
1151                 }
1152             }
1153         }
1154     } else {
1155         redisPanic("Unknown sorted set encoding");
1156     }
1157
1158     if (deleted) {
1159         signalModifiedKey(c->db,key);
1160         server.dirty += deleted;
1161     }
1162     addReplyLongLong(c,deleted);
1163 }
1164
1165 void zremrangebyscoreCommand(redisClient *c) {
1166     robj *key = c->argv[1];
1167     robj *zobj;
1168     zrangespec range;
1169     unsigned long deleted;
1170
1171     /* Parse the range arguments. */
1172     if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1173         addReplyError(c,"min or max is not a float");
1174         return;
1175     }
1176
1177     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1178         checkType(c,zobj,REDIS_ZSET)) return;
1179
1180     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1181         zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
1182         if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
1183     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1184         zset *zs = zobj->ptr;
1185         deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1186         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1187         if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1188     } else {
1189         redisPanic("Unknown sorted set encoding");
1190     }
1191
1192     if (deleted) signalModifiedKey(c->db,key);
1193     server.dirty += deleted;
1194     addReplyLongLong(c,deleted);
1195 }
1196
1197 void zremrangebyrankCommand(redisClient *c) {
1198     robj *key = c->argv[1];
1199     robj *zobj;
1200     long start;
1201     long end;
1202     int llen;
1203     unsigned long deleted;
1204
1205     if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1206         (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1207
1208     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1209         checkType(c,zobj,REDIS_ZSET)) return;
1210
1211     /* Sanitize indexes. */
1212     llen = zsetLength(zobj);
1213     if (start < 0) start = llen+start;
1214     if (end < 0) end = llen+end;
1215     if (start < 0) start = 0;
1216
1217     /* Invariant: start >= 0, so this test will be true when end < 0.
1218      * The range is empty when start > end or start >= length. */
1219     if (start > end || start >= llen) {
1220         addReply(c,shared.czero);
1221         return;
1222     }
1223     if (end >= llen) end = llen-1;
1224
1225     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1226         /* Correct for 1-based rank. */
1227         zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1228         if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
1229     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1230         zset *zs = zobj->ptr;
1231
1232         /* Correct for 1-based rank. */
1233         deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1234         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1235         if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1236     } else {
1237         redisPanic("Unknown sorted set encoding");
1238     }
1239
1240     if (deleted) signalModifiedKey(c->db,key);
1241     server.dirty += deleted;
1242     addReplyLongLong(c,deleted);
1243 }
1244
1245 typedef struct {
1246     robj *subject;
1247     int type; /* Set, sorted set */
1248     int encoding;
1249     double weight;
1250
1251     union {
1252         /* Set iterators. */
1253         union _iterset {
1254             struct {
1255                 intset *is;
1256                 int ii;
1257             } is;
1258             struct {
1259                 dict *dict;
1260                 dictIterator *di;
1261                 dictEntry *de;
1262             } ht;
1263         } set;
1264
1265         /* Sorted set iterators. */
1266         union _iterzset {
1267             struct {
1268                 unsigned char *zl;
1269                 unsigned char *eptr, *sptr;
1270             } zl;
1271             struct {
1272                 zset *zs;
1273                 zskiplistNode *node;
1274             } sl;
1275         } zset;
1276     } iter;
1277 } zsetopsrc;
1278
1279
1280 /* Use dirty flags for pointers that need to be cleaned up in the next
1281  * iteration over the zsetopval. The dirty flag for the long long value is
1282  * special, since long long values don't need cleanup. Instead, it means that
1283  * we already checked that "ell" holds a long long, or tried to convert another
1284  * representation into a long long value. When this was successful,
1285  * OPVAL_VALID_LL is set as well. */
1286 #define OPVAL_DIRTY_ROBJ 1
1287 #define OPVAL_DIRTY_LL 2
1288 #define OPVAL_VALID_LL 4
1289
1290 /* Store value retrieved from the iterator. */
1291 typedef struct {
1292     int flags;
1293     unsigned char _buf[32]; /* Private buffer. */
1294     robj *ele;
1295     unsigned char *estr;
1296     unsigned int elen;
1297     long long ell;
1298     double score;
1299 } zsetopval;
1300
1301 typedef union _iterset iterset;
1302 typedef union _iterzset iterzset;
1303
1304 void zuiInitIterator(zsetopsrc *op) {
1305     if (op->subject == NULL)
1306         return;
1307
1308     if (op->type == REDIS_SET) {
1309         iterset *it = &op->iter.set;
1310         if (op->encoding == REDIS_ENCODING_INTSET) {
1311             it->is.is = op->subject->ptr;
1312             it->is.ii = 0;
1313         } else if (op->encoding == REDIS_ENCODING_HT) {
1314             it->ht.dict = op->subject->ptr;
1315             it->ht.di = dictGetIterator(op->subject->ptr);
1316             it->ht.de = dictNext(it->ht.di);
1317         } else {
1318             redisPanic("Unknown set encoding");
1319         }
1320     } else if (op->type == REDIS_ZSET) {
1321         iterzset *it = &op->iter.zset;
1322         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1323             it->zl.zl = op->subject->ptr;
1324             it->zl.eptr = ziplistIndex(it->zl.zl,0);
1325             if (it->zl.eptr != NULL) {
1326                 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1327                 redisAssert(it->zl.sptr != NULL);
1328             }
1329         } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1330             it->sl.zs = op->subject->ptr;
1331             it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1332         } else {
1333             redisPanic("Unknown sorted set encoding");
1334         }
1335     } else {
1336         redisPanic("Unsupported type");
1337     }
1338 }
1339
1340 void zuiClearIterator(zsetopsrc *op) {
1341     if (op->subject == NULL)
1342         return;
1343
1344     if (op->type == REDIS_SET) {
1345         iterset *it = &op->iter.set;
1346         if (op->encoding == REDIS_ENCODING_INTSET) {
1347             REDIS_NOTUSED(it); /* skip */
1348         } else if (op->encoding == REDIS_ENCODING_HT) {
1349             dictReleaseIterator(it->ht.di);
1350         } else {
1351             redisPanic("Unknown set encoding");
1352         }
1353     } else if (op->type == REDIS_ZSET) {
1354         iterzset *it = &op->iter.zset;
1355         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1356             REDIS_NOTUSED(it); /* skip */
1357         } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1358             REDIS_NOTUSED(it); /* skip */
1359         } else {
1360             redisPanic("Unknown sorted set encoding");
1361         }
1362     } else {
1363         redisPanic("Unsupported type");
1364     }
1365 }
1366
1367 int zuiLength(zsetopsrc *op) {
1368     if (op->subject == NULL)
1369         return 0;
1370
1371     if (op->type == REDIS_SET) {
1372         iterset *it = &op->iter.set;
1373         if (op->encoding == REDIS_ENCODING_INTSET) {
1374             return intsetLen(it->is.is);
1375         } else if (op->encoding == REDIS_ENCODING_HT) {
1376             return dictSize(it->ht.dict);
1377         } else {
1378             redisPanic("Unknown set encoding");
1379         }
1380     } else if (op->type == REDIS_ZSET) {
1381         iterzset *it = &op->iter.zset;
1382         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1383             return zzlLength(it->zl.zl);
1384         } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1385             return it->sl.zs->zsl->length;
1386         } else {
1387             redisPanic("Unknown sorted set encoding");
1388         }
1389     } else {
1390         redisPanic("Unsupported type");
1391     }
1392 }
1393
1394 /* Check if the current value is valid. If so, store it in the passed structure
1395  * and move to the next element. If not valid, this means we have reached the
1396  * end of the structure and can abort. */
1397 int zuiNext(zsetopsrc *op, zsetopval *val) {
1398     if (op->subject == NULL)
1399         return 0;
1400
1401     if (val->flags & OPVAL_DIRTY_ROBJ)
1402         decrRefCount(val->ele);
1403
1404     memset(val,0,sizeof(zsetopval));
1405
1406     if (op->type == REDIS_SET) {
1407         iterset *it = &op->iter.set;
1408         if (op->encoding == REDIS_ENCODING_INTSET) {
1409             int64_t ell;
1410
1411             if (!intsetGet(it->is.is,it->is.ii,&ell))
1412                 return 0;
1413             val->ell = ell;
1414             val->score = 1.0;
1415
1416             /* Move to next element. */
1417             it->is.ii++;
1418         } else if (op->encoding == REDIS_ENCODING_HT) {
1419             if (it->ht.de == NULL)
1420                 return 0;
1421             val->ele = dictGetKey(it->ht.de);
1422             val->score = 1.0;
1423
1424             /* Move to next element. */
1425             it->ht.de = dictNext(it->ht.di);
1426         } else {
1427             redisPanic("Unknown set encoding");
1428         }
1429     } else if (op->type == REDIS_ZSET) {
1430         iterzset *it = &op->iter.zset;
1431         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1432             /* No need to check both, but better be explicit. */
1433             if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1434                 return 0;
1435             redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1436             val->score = zzlGetScore(it->zl.sptr);
1437
1438             /* Move to next element. */
1439             zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
1440         } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1441             if (it->sl.node == NULL)
1442                 return 0;
1443             val->ele = it->sl.node->obj;
1444             val->score = it->sl.node->score;
1445
1446             /* Move to next element. */
1447             it->sl.node = it->sl.node->level[0].forward;
1448         } else {
1449             redisPanic("Unknown sorted set encoding");
1450         }
1451     } else {
1452         redisPanic("Unsupported type");
1453     }
1454     return 1;
1455 }
1456
1457 int zuiLongLongFromValue(zsetopval *val) {
1458     if (!(val->flags & OPVAL_DIRTY_LL)) {
1459         val->flags |= OPVAL_DIRTY_LL;
1460
1461         if (val->ele != NULL) {
1462             if (val->ele->encoding == REDIS_ENCODING_INT) {
1463                 val->ell = (long)val->ele->ptr;
1464                 val->flags |= OPVAL_VALID_LL;
1465             } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1466                 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1467                     val->flags |= OPVAL_VALID_LL;
1468             } else {
1469                 redisPanic("Unsupported element encoding");
1470             }
1471         } else if (val->estr != NULL) {
1472             if (string2ll((char*)val->estr,val->elen,&val->ell))
1473                 val->flags |= OPVAL_VALID_LL;
1474         } else {
1475             /* The long long was already set, flag as valid. */
1476             val->flags |= OPVAL_VALID_LL;
1477         }
1478     }
1479     return val->flags & OPVAL_VALID_LL;
1480 }
1481
1482 robj *zuiObjectFromValue(zsetopval *val) {
1483     if (val->ele == NULL) {
1484         if (val->estr != NULL) {
1485             val->ele = createStringObject((char*)val->estr,val->elen);
1486         } else {
1487             val->ele = createStringObjectFromLongLong(val->ell);
1488         }
1489         val->flags |= OPVAL_DIRTY_ROBJ;
1490     }
1491     return val->ele;
1492 }
1493
1494 int zuiBufferFromValue(zsetopval *val) {
1495     if (val->estr == NULL) {
1496         if (val->ele != NULL) {
1497             if (val->ele->encoding == REDIS_ENCODING_INT) {
1498                 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1499                 val->estr = val->_buf;
1500             } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1501                 val->elen = sdslen(val->ele->ptr);
1502                 val->estr = val->ele->ptr;
1503             } else {
1504                 redisPanic("Unsupported element encoding");
1505             }
1506         } else {
1507             val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1508             val->estr = val->_buf;
1509         }
1510     }
1511     return 1;
1512 }
1513
1514 /* Find value pointed to by val in the source pointer to by op. When found,
1515  * return 1 and store its score in target. Return 0 otherwise. */
1516 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1517     if (op->subject == NULL)
1518         return 0;
1519
1520     if (op->type == REDIS_SET) {
1521         iterset *it = &op->iter.set;
1522
1523         if (op->encoding == REDIS_ENCODING_INTSET) {
1524             if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1525                 *score = 1.0;
1526                 return 1;
1527             } else {
1528                 return 0;
1529             }
1530         } else if (op->encoding == REDIS_ENCODING_HT) {
1531             zuiObjectFromValue(val);
1532             if (dictFind(it->ht.dict,val->ele) != NULL) {
1533                 *score = 1.0;
1534                 return 1;
1535             } else {
1536                 return 0;
1537             }
1538         } else {
1539             redisPanic("Unknown set encoding");
1540         }
1541     } else if (op->type == REDIS_ZSET) {
1542         iterzset *it = &op->iter.zset;
1543         zuiObjectFromValue(val);
1544
1545         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1546             if (zzlFind(it->zl.zl,val->ele,score) != NULL) {
1547                 /* Score is already set by zzlFind. */
1548                 return 1;
1549             } else {
1550                 return 0;
1551             }
1552         } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1553             dictEntry *de;
1554             if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
1555                 *score = *(double*)dictGetVal(de);
1556                 return 1;
1557             } else {
1558                 return 0;
1559             }
1560         } else {
1561             redisPanic("Unknown sorted set encoding");
1562         }
1563     } else {
1564         redisPanic("Unsupported type");
1565     }
1566 }
1567
1568 int zuiCompareByCardinality(const void *s1, const void *s2) {
1569     return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
1570 }
1571
1572 #define REDIS_AGGR_SUM 1
1573 #define REDIS_AGGR_MIN 2
1574 #define REDIS_AGGR_MAX 3
1575 #define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
1576
1577 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1578     if (aggregate == REDIS_AGGR_SUM) {
1579         *target = *target + val;
1580         /* The result of adding two doubles is NaN when one variable
1581          * is +inf and the other is -inf. When these numbers are added,
1582          * we maintain the convention of the result being 0.0. */
1583         if (isnan(*target)) *target = 0.0;
1584     } else if (aggregate == REDIS_AGGR_MIN) {
1585         *target = val < *target ? val : *target;
1586     } else if (aggregate == REDIS_AGGR_MAX) {
1587         *target = val > *target ? val : *target;
1588     } else {
1589         /* safety net */
1590         redisPanic("Unknown ZUNION/INTER aggregate type");
1591     }
1592 }
1593
1594 void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1595     int i, j;
1596     long setnum;
1597     int aggregate = REDIS_AGGR_SUM;
1598     zsetopsrc *src;
1599     zsetopval zval;
1600     robj *tmp;
1601     unsigned int maxelelen = 0;
1602     robj *dstobj;
1603     zset *dstzset;
1604     zskiplistNode *znode;
1605     int touched = 0;
1606
1607     /* expect setnum input keys to be given */
1608     if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
1609         return;
1610
1611     if (setnum < 1) {
1612         addReplyError(c,
1613             "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1614         return;
1615     }
1616
1617     /* test if the expected number of keys would overflow */
1618     if (setnum > c->argc-3) {
1619         addReply(c,shared.syntaxerr);
1620         return;
1621     }
1622
1623     /* read keys to be used for input */
1624     src = zcalloc(sizeof(zsetopsrc) * setnum);
1625     for (i = 0, j = 3; i < setnum; i++, j++) {
1626         robj *obj = lookupKeyWrite(c->db,c->argv[j]);
1627         if (obj != NULL) {
1628             if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
1629                 zfree(src);
1630                 addReply(c,shared.wrongtypeerr);
1631                 return;
1632             }
1633
1634             src.subject = obj;
1635             src.type = obj->type;
1636             src.encoding = obj->encoding;
1637         } else {
1638             src.subject = NULL;
1639         }
1640
1641         /* Default all weights to 1. */
1642         src.weight = 1.0;
1643     }
1644
1645     /* parse optional extra arguments */
1646     if (j < c->argc) {
1647         int remaining = c->argc - j;
1648
1649         while (remaining) {
1650             if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1651                 j++; remaining--;
1652                 for (i = 0; i < setnum; i++, j++, remaining--) {
1653                     if (getDoubleFromObjectOrReply(c,c->argv[j],&src.weight,
1654                             "weight value is not a float") != REDIS_OK)
1655                     {
1656                         zfree(src);
1657                         return;
1658                     }
1659                 }
1660             } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1661                 j++; remaining--;
1662                 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1663                     aggregate = REDIS_AGGR_SUM;
1664                 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1665                     aggregate = REDIS_AGGR_MIN;
1666                 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1667                     aggregate = REDIS_AGGR_MAX;
1668                 } else {
1669                     zfree(src);
1670                     addReply(c,shared.syntaxerr);
1671                     return;
1672                 }
1673                 j++; remaining--;
1674             } else {
1675                 zfree(src);
1676                 addReply(c,shared.syntaxerr);
1677                 return;
1678             }
1679         }
1680     }
1681
1682     for (i = 0; i < setnum; i++)
1683         zuiInitIterator(&src);
1684
1685     /* sort sets from the smallest to largest, this will improve our
1686      * algorithm's performance */
1687     qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
1688
1689     dstobj = createZsetObject();
1690     dstzset = dstobj->ptr;
1691     memset(&zval, 0, sizeof(zval));
1692
1693     if (op == REDIS_OP_INTER) {
1694         /* Skip everything if the smallest input is empty. */
1695         if (zuiLength(&src[0]) > 0) {
1696             /* Precondition: as src[0] is non-empty and the inputs are ordered
1697              * by size, all src[i > 0] are non-empty too. */
1698             while (zuiNext(&src[0],&zval)) {
1699                 double score, value;
1700
1701                 score = src[0].weight * zval.score;
1702                 if (isnan(score)) score = 0;
1703
1704                 for (j = 1; j < setnum; j++) {
1705                     /* It is not safe to access the zset we are
1706                      * iterating, so explicitly check for equal object. */
1707                     if (src[j].subject == src[0].subject) {
1708                         value = zval.score*src[j].weight;
1709                         zunionInterAggregate(&score,value,aggregate);
1710                     } else if (zuiFind(&src[j],&zval,&value)) {
1711                         value *= src[j].weight;
1712                         zunionInterAggregate(&score,value,aggregate);
1713                     } else {
1714                         break;
1715                     }
1716                 }
1717
1718                 /* Only continue when present in every input. */
1719                 if (j == setnum) {
1720                     tmp = zuiObjectFromValue(&zval);
1721                     znode = zslInsert(dstzset->zsl,score,tmp);
1722                     incrRefCount(tmp); /* added to skiplist */
1723                     dictAdd(dstzset->dict,tmp,&znode->score);
1724                     incrRefCount(tmp); /* added to dictionary */
1725
1726                     if (tmp->encoding == REDIS_ENCODING_RAW)
1727                         if (sdslen(tmp->ptr) > maxelelen)
1728                             maxelelen = sdslen(tmp->ptr);
1729                 }
1730             }
1731         }
1732     } else if (op == REDIS_OP_UNION) {
1733         for (i = 0; i < setnum; i++) {
1734             if (zuiLength(&src) == 0)
1735                 continue;
1736
1737             while (zuiNext(&src,&zval)) {
1738                 double score, value;
1739
1740                 /* Skip key when already processed */
1741                 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
1742                     continue;
1743
1744                 /* Initialize score */
1745                 score = src.weight * zval.score;
1746                 if (isnan(score)) score = 0;
1747
1748                 /* Because the inputs are sorted by size, it's only possible
1749                  * for sets at larger indices to hold this element. */
1750                 for (j = (i+1); j < setnum; j++) {
1751                     /* It is not safe to access the zset we are
1752                      * iterating, so explicitly check for equal object. */
1753                     if(src[j].subject == src.subject) {
1754                         value = zval.score*src[j].weight;
1755                         zunionInterAggregate(&score,value,aggregate);
1756                     } else if (zuiFind(&src[j],&zval,&value)) {
1757                         value *= src[j].weight;
1758                         zunionInterAggregate(&score,value,aggregate);
1759                     }
1760                 }
1761
1762                 tmp = zuiObjectFromValue(&zval);
1763                 znode = zslInsert(dstzset->zsl,score,tmp);
1764                 incrRefCount(zval.ele); /* added to skiplist */
1765                 dictAdd(dstzset->dict,tmp,&znode->score);
1766                 incrRefCount(zval.ele); /* added to dictionary */
1767
1768                 if (tmp->encoding == REDIS_ENCODING_RAW)
1769                     if (sdslen(tmp->ptr) > maxelelen)
1770                         maxelelen = sdslen(tmp->ptr);
1771             }
1772         }
1773     } else {
1774         redisPanic("Unknown operator");
1775     }
1776
1777     for (i = 0; i < setnum; i++)
1778         zuiClearIterator(&src);
1779
1780     if (dbDelete(c->db,dstkey)) {
1781         signalModifiedKey(c->db,dstkey);
1782         touched = 1;
1783         server.dirty++;
1784     }
1785     if (dstzset->zsl->length) {
1786         /* Convert to ziplist when in limits. */
1787         if (dstzset->zsl->length db,dstkey);
1794         server.dirty++;
1795     } else {
1796         decrRefCount(dstobj);
1797         addReply(c,shared.czero);
1798     }
1799     zfree(src);
1800 }
1801
1802 void zunionstoreCommand(redisClient *c) {
1803     zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1804 }
1805
1806 void zinterstoreCommand(redisClient *c) {
1807     zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1808 }
1809
1810 void zrangeGenericCommand(redisClient *c, int reverse) {
1811     robj *key = c->argv[1];
1812     robj *zobj;
1813     int withscores = 0;
1814     long start;
1815     long end;
1816     int llen;
1817     int rangelen;
1818
1819     if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1820         (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1821
1822     if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1823         withscores = 1;
1824     } else if (c->argc >= 5) {
1825         addReply(c,shared.syntaxerr);
1826         return;
1827     }
1828
1829     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1830          || checkType(c,zobj,REDIS_ZSET)) return;
1831
1832     /* Sanitize indexes. */
1833     llen = zsetLength(zobj);
1834     if (start < 0) start = llen+start;
1835     if (end < 0) end = llen+end;
1836     if (start < 0) start = 0;
1837
1838     /* Invariant: start >= 0, so this test will be true when end < 0.
1839      * The range is empty when start > end or start >= length. */
1840     if (start > end || start >= llen) {
1841         addReply(c,shared.emptymultibulk);
1842         return;
1843     }
1844     if (end >= llen) end = llen-1;
1845     rangelen = (end-start)+1;
1846
1847     /* Return the result in form of a multi-bulk reply */
1848     addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1849
1850     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1851         unsigned char *zl = zobj->ptr;
1852         unsigned char *eptr, *sptr;
1853         unsigned char *vstr;
1854         unsigned int vlen;
1855         long long vlong;
1856
1857         if (reverse)
1858             eptr = ziplistIndex(zl,-2-(2*start));
1859         else
1860             eptr = ziplistIndex(zl,2*start);
1861
1862         redisAssertWithInfo(c,zobj,eptr != NULL);
1863         sptr = ziplistNext(zl,eptr);
1864
1865         while (rangelen--) {
1866             redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
1867             redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
1868             if (vstr == NULL)
1869                 addReplyBulkLongLong(c,vlong);
1870             else
1871                 addReplyBulkCBuffer(c,vstr,vlen);
1872
1873             if (withscores)
1874                 addReplyDouble(c,zzlGetScore(sptr));
1875
1876             if (reverse)
1877                 zzlPrev(zl,&eptr,&sptr);
1878             else
1879                 zzlNext(zl,&eptr,&sptr);
1880         }
1881
1882     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1883         zset *zs = zobj->ptr;
1884         zskiplist *zsl = zs->zsl;
1885         zskiplistNode *ln;
1886         robj *ele;
1887
1888         /* Check if starting point is trivial, before doing log(N) lookup. */
1889         if (reverse) {
1890             ln = zsl->tail;
1891             if (start > 0)
1892                 ln = zslGetElementByRank(zsl,llen-start);
1893         } else {
1894             ln = zsl->header->level[0].forward;
1895             if (start > 0)
1896                 ln = zslGetElementByRank(zsl,start+1);
1897         }
1898
1899         while(rangelen--) {
1900             redisAssertWithInfo(c,zobj,ln != NULL);
1901             ele = ln->obj;
1902             addReplyBulk(c,ele);
1903             if (withscores)
1904                 addReplyDouble(c,ln->score);
1905             ln = reverse ? ln->backward : ln->level[0].forward;
1906         }
1907     } else {
1908         redisPanic("Unknown sorted set encoding");
1909     }
1910 }
1911
1912 void zrangeCommand(redisClient *c) {
1913     zrangeGenericCommand(c,0);
1914 }
1915
1916 void zrevrangeCommand(redisClient *c) {
1917     zrangeGenericCommand(c,1);
1918 }
1919
1920 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
1921 void genericZrangebyscoreCommand(redisClient *c, int reverse) {
1922     zrangespec range;
1923     robj *key = c->argv[1];
1924     robj *zobj;
1925     long offset = 0, limit = -1;
1926     int withscores = 0;
1927     unsigned long rangelen = 0;
1928     void *replylen = NULL;
1929     int minidx, maxidx;
1930
1931     /* Parse the range arguments. */
1932     if (reverse) {
1933         /* Range is given as [max,min] */
1934         maxidx = 2; minidx = 3;
1935     } else {
1936         /* Range is given as [min,max] */
1937         minidx = 2; maxidx = 3;
1938     }
1939
1940     if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
1941         addReplyError(c,"min or max is not a float");
1942         return;
1943     }
1944
1945     /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1946      * 4 arguments, so we'll never enter the following code path. */
1947     if (c->argc > 4) {
1948         int remaining = c->argc - 4;
1949         int pos = 4;
1950
1951         while (remaining) {
1952             if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1953                 pos++; remaining--;
1954                 withscores = 1;
1955             } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1956                 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != REDIS_OK) ||
1957                     (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != REDIS_OK)) return;
1958                 pos += 3; remaining -= 3;
1959             } else {
1960                 addReply(c,shared.syntaxerr);
1961                 return;
1962             }
1963         }
1964     }
1965
1966     /* Ok, lookup the key and get the range */
1967     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
1968         checkType(c,zobj,REDIS_ZSET)) return;
1969
1970     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1971         unsigned char *zl = zobj->ptr;
1972         unsigned char *eptr, *sptr;
1973         unsigned char *vstr;
1974         unsigned int vlen;
1975         long long vlong;
1976         double score;
1977
1978         /* If reversed, get the last node in range as starting point. */
1979         if (reverse) {
1980             eptr = zzlLastInRange(zl,range);
1981         } else {
1982             eptr = zzlFirstInRange(zl,range);
1983         }
1984
1985         /* No "first" element in the specified interval. */
1986         if (eptr == NULL) {
1987             addReply(c, shared.emptymultibulk);
1988             return;
1989         }
1990
1991         /* Get score pointer for the first element. */
1992         redisAssertWithInfo(c,zobj,eptr != NULL);
1993         sptr = ziplistNext(zl,eptr);
1994
1995         /* We don't know in advance how many matching elements there are in the
1996          * list, so we push this object that will represent the multi-bulk
1997          * length in the output buffer, and will "fix" it later */
1998         replylen = addDeferredMultiBulkLength(c);
1999
2000         /* If there is an offset, just traverse the number of elements without
2001          * checking the score because that is done in the next loop. */
2002         while (eptr && offset--) {
2003             if (reverse) {
2004                 zzlPrev(zl,&eptr,&sptr);
2005             } else {
2006                 zzlNext(zl,&eptr,&sptr);
2007             }
2008         }
2009
2010         while (eptr && limit--) {
2011             score = zzlGetScore(sptr);
2012
2013             /* Abort when the node is no longer in range. */
2014             if (reverse) {
2015                 if (!zslValueGteMin(score,&range)) break;
2016             } else {
2017                 if (!zslValueLteMax(score,&range)) break;
2018             }
2019
2020             /* We know the element exists, so ziplistGet should always succeed */
2021             redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
2022
2023             rangelen++;
2024             if (vstr == NULL) {
2025                 addReplyBulkLongLong(c,vlong);
2026             } else {
2027                 addReplyBulkCBuffer(c,vstr,vlen);
2028             }
2029
2030             if (withscores) {
2031                 addReplyDouble(c,score);
2032             }
2033
2034             /* Move to next node */
2035             if (reverse) {
2036                 zzlPrev(zl,&eptr,&sptr);
2037             } else {
2038                 zzlNext(zl,&eptr,&sptr);
2039             }
2040         }
2041     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2042         zset *zs = zobj->ptr;
2043         zskiplist *zsl = zs->zsl;
2044         zskiplistNode *ln;
2045
2046         /* If reversed, get the last node in range as starting point. */
2047         if (reverse) {
2048             ln = zslLastInRange(zsl,range);
2049         } else {
2050             ln = zslFirstInRange(zsl,range);
2051         }
2052
2053         /* No "first" element in the specified interval. */
2054         if (ln == NULL) {
2055             addReply(c, shared.emptymultibulk);
2056             return;
2057         }
2058
2059         /* We don't know in advance how many matching elements there are in the
2060          * list, so we push this object that will represent the multi-bulk
2061          * length in the output buffer, and will "fix" it later */
2062         replylen = addDeferredMultiBulkLength(c);
2063
2064         /* If there is an offset, just traverse the number of elements without
2065          * checking the score because that is done in the next loop. */
2066         while (ln && offset--) {
2067             if (reverse) {
2068                 ln = ln->backward;
2069             } else {
2070                 ln = ln->level[0].forward;
2071             }
2072         }
2073
2074         while (ln && limit--) {
2075             /* Abort when the node is no longer in range. */
2076             if (reverse) {
2077                 if (!zslValueGteMin(ln->score,&range)) break;
2078             } else {
2079                 if (!zslValueLteMax(ln->score,&range)) break;
2080             }
2081
2082             rangelen++;
2083             addReplyBulk(c,ln->obj);
2084
2085             if (withscores) {
2086                 addReplyDouble(c,ln->score);
2087             }
2088
2089             /* Move to next node */
2090             if (reverse) {
2091                 ln = ln->backward;
2092             } else {
2093                 ln = ln->level[0].forward;
2094             }
2095         }
2096     } else {
2097         redisPanic("Unknown sorted set encoding");
2098     }
2099
2100     if (withscores) {
2101         rangelen *= 2;
2102     }
2103
2104     setDeferredMultiBulkLength(c, replylen, rangelen);
2105 }
2106
2107 void zrangebyscoreCommand(redisClient *c) {
2108     genericZrangebyscoreCommand(c,0);
2109 }
2110
2111 void zrevrangebyscoreCommand(redisClient *c) {
2112     genericZrangebyscoreCommand(c,1);
2113 }
2114
2115 void zcountCommand(redisClient *c) {
2116     robj *key = c->argv[1];
2117     robj *zobj;
2118     zrangespec range;
2119     int count = 0;
2120
2121     /* Parse the range arguments */
2122     if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
2123         addReplyError(c,"min or max is not a float");
2124         return;
2125     }
2126
2127     /* Lookup the sorted set */
2128     if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
2129         checkType(c, zobj, REDIS_ZSET)) return;
2130
2131     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2132         unsigned char *zl = zobj->ptr;
2133         unsigned char *eptr, *sptr;
2134         double score;
2135
2136         /* Use the first element in range as the starting point */
2137         eptr = zzlFirstInRange(zl,range);
2138
2139         /* No "first" element */
2140         if (eptr == NULL) {
2141             addReply(c, shared.czero);
2142             return;
2143         }
2144
2145         /* First element is in range */
2146         sptr = ziplistNext(zl,eptr);
2147         score = zzlGetScore(sptr);
2148         redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
2149
2150         /* Iterate over elements in range */
2151         while (eptr) {
2152             score = zzlGetScore(sptr);
2153
2154             /* Abort when the node is no longer in range. */
2155             if (!zslValueLteMax(score,&range)) {
2156                 break;
2157             } else {
2158                 count++;
2159                 zzlNext(zl,&eptr,&sptr);
2160             }
2161         }
2162     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2163         zset *zs = zobj->ptr;
2164         zskiplist *zsl = zs->zsl;
2165         zskiplistNode *zn;
2166         unsigned long rank;
2167
2168         /* Find first element in range */
2169         zn = zslFirstInRange(zsl, range);
2170
2171         /* Use rank of first element, if any, to determine preliminary count */
2172         if (zn != NULL) {
2173             rank = zslGetRank(zsl, zn->score, zn->obj);
2174             count = (zsl->length - (rank - 1));
2175
2176             /* Find last element in range */
2177             zn = zslLastInRange(zsl, range);
2178
2179             /* Use rank of last element, if any, to determine the actual count */
2180             if (zn != NULL) {
2181                 rank = zslGetRank(zsl, zn->score, zn->obj);
2182                 count -= (zsl->length - rank);
2183             }
2184         }
2185     } else {
2186         redisPanic("Unknown sorted set encoding");
2187     }
2188
2189     addReplyLongLong(c, count);
2190 }
2191
2192 void zcardCommand(redisClient *c) {
2193     robj *key = c->argv[1];
2194     robj *zobj;
2195
2196     if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
2197         checkType(c,zobj,REDIS_ZSET)) return;
2198
2199     addReplyLongLong(c,zsetLength(zobj));
2200 }
2201
2202 void zscoreCommand(redisClient *c) {
2203     robj *key = c->argv[1];
2204     robj *zobj;
2205     double score;
2206
2207     if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2208         checkType(c,zobj,REDIS_ZSET)) return;
2209
2210     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2211         if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
2212             addReplyDouble(c,score);
2213         else
2214             addReply(c,shared.nullbulk);
2215     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2216         zset *zs = zobj->ptr;
2217         dictEntry *de;
2218
2219         c->argv[2] = tryObjectEncoding(c->argv[2]);
2220         de = dictFind(zs->dict,c->argv[2]);
2221         if (de != NULL) {
2222             score = *(double*)dictGetVal(de);
2223             addReplyDouble(c,score);
2224         } else {
2225             addReply(c,shared.nullbulk);
2226         }
2227     } else {
2228         redisPanic("Unknown sorted set encoding");
2229     }
2230 }
2231
2232 void zrankGenericCommand(redisClient *c, int reverse) {
2233     robj *key = c->argv[1];
2234     robj *ele = c->argv[2];
2235     robj *zobj;
2236     unsigned long llen;
2237     unsigned long rank;
2238
2239     if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2240         checkType(c,zobj,REDIS_ZSET)) return;
2241     llen = zsetLength(zobj);
2242
2243     redisAssertWithInfo(c,ele,ele->encoding == REDIS_ENCODING_RAW);
2244     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2245         unsigned char *zl = zobj->ptr;
2246         unsigned char *eptr, *sptr;
2247
2248         eptr = ziplistIndex(zl,0);
2249         redisAssertWithInfo(c,zobj,eptr != NULL);
2250         sptr = ziplistNext(zl,eptr);
2251         redisAssertWithInfo(c,zobj,sptr != NULL);
2252
2253         rank = 1;
2254         while(eptr != NULL) {
2255             if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
2256                 break;
2257             rank++;
2258             zzlNext(zl,&eptr,&sptr);
2259         }
2260
2261         if (eptr != NULL) {
2262             if (reverse)
2263                 addReplyLongLong(c,llen-rank);
2264             else
2265                 addReplyLongLong(c,rank-1);
2266         } else {
2267             addReply(c,shared.nullbulk);
2268         }
2269     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2270         zset *zs = zobj->ptr;
2271         zskiplist *zsl = zs->zsl;
2272         dictEntry *de;
2273         double score;
2274
2275         ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
2276         de = dictFind(zs->dict,ele);
2277         if (de != NULL) {
2278             score = *(double*)dictGetVal(de);
2279             rank = zslGetRank(zsl,score,ele);
2280             redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
2281             if (reverse)
2282                 addReplyLongLong(c,llen-rank);
2283             else
2284                 addReplyLongLong(c,rank-1);
2285         } else {
2286             addReply(c,shared.nullbulk);
2287         }
2288     } else {
2289         redisPanic("Unknown sorted set encoding");
2290     }
2291 }
2292
2293 void zrankCommand(redisClient *c) {
2294     zrankGenericCommand(c, 0);
2295 }
2296
2297 void zrevrankCommand(redisClient *c) {
2298     zrankGenericCommand(c, 1);
2299 }
t_zset.c  参考文献: http://kenby.iteye.com/blog/1187303
  http://blog.iyunv.com/acceptedxukai/article/details/17333673
  http://www.iyunv.com/xuqiang/archive/2011/05/22/2053516.html
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-89743-1-1.html 上篇帖子: redis连接池 下篇帖子: Redis C客户端API
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表