|
1 /* ZSETs use a specialized version of Skiplists */
2 typedef struct zskiplistNode {
3
4 /* member对象 */
5 robj *obj;
6
7 /* 分值 */
8 double score;
9
10 /* 后退指针 */
11 struct zskiplistNode *backward;
12
13 /* 层 */
14 struct zskiplistLevel {
15 /* 前进指针 */
16 struct zskiplistNode *forward;
17
18 /* 这个层跨越的节点数量 */
19 unsigned int span;
20 } level[];
21 } zskiplistNode; /* 跳跃表节点 */
22
/* Skiplist. */
typedef struct zskiplist {
    /* Head node. */
    struct zskiplistNode *header;

    /* Tail node. */
    struct zskiplistNode *tail;

    /* Number of nodes. */
    unsigned long length;

    /* Highest level currently in use by any node of the list. */
    int level;
} zskiplist;
redis.h
1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo
3 * Copyright (c) 2009-2012, Pieter Noordhuis
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 /*-----------------------------------------------------------------------------
32 * Sorted set API
33 *----------------------------------------------------------------------------*/
34
35 /* ZSETs are ordered sets using two data structures to hold the same elements
36 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
37 * data structure.
38 *
39 * The elements are added to an hash table mapping Redis objects to scores.
40 * At the same time the elements are added to a skip list mapping scores
41 * to Redis objects (so objects are sorted by scores in this "view"). */
42
43 /* This skiplist implementation is almost a C translation of the original
44 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
45 * Alternative to Balanced Trees", modified in three ways:
46 * a) this implementation allows for repeated scores.
47 * b) the comparison is not just by key (our 'score') but by satellite data.
48 * c) there is a back pointer, so it's a doubly linked list with the back
49 * pointers being only at "level 1". This allows to traverse the list
50 * from tail to head, useful for ZREVRANGE. */
51
#include "redis.h"
#include <math.h>
54
55 /* 创建并返回一个新的跳跃表节点 */
56 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
57 /* 分配层 */
58 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
59
60 /* 更新属性 */
61 zn->score = score;
62 zn->obj = obj;
63
64 return zn;
65 }
66
67 /* 创建并初始化一个新的跳跃表 */
68 zskiplist *zslCreate(void) {
69 int j;
70 zskiplist *zsl;
71
72 /* 分配空间 */
73 zsl = zmalloc(sizeof(*zsl));
74
75 /* 更新属性 */
76 zsl->level = 1;
77 zsl->length = 0;
78
79 /* 初始化头节点 */
80 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
81
82 /* 初始化层指针 */
83 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
84 zsl->header->level[j].forward = NULL; /* 前进指针 */
85 zsl->header->level[j].span = 0;
86 }
87 zsl->header->backward = NULL; /* 后退指针 */
88 zsl->tail = NULL;
89
90 return zsl;
91 }
92
93 /* 释放给定的跳跃表节点 */
94 void zslFreeNode(zskiplistNode *node) {
95 decrRefCount(node->obj);
96 zfree(node);
97 }
98
99 /* 释放整个跳跃表 */
100 void zslFree(zskiplist *zsl) {
101
102 zskiplistNode *node = zsl->header->level[0].forward, *next;
103
104 zfree(zsl->header);
105
106 /* 遍历删除 */
107 while(node) {
108 next = node->level[0].forward;
109 zslFreeNode(node);
110 node = next;
111 }
112
113 zfree(zsl);
114 }
115
116 /* Returns a random level for the new skiplist node we are going to create.
117 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
118 * (both inclusive), with a powerlaw-alike distribution where higher
119 * levels are less likely to be returned. */
120 /* 返回一个介于1和ASKIPLIST_MAXLEVEL之间的随机值,作为节点的层数。*/
121 /* 根据幂次定律(power law),数值越大,函数生成它的几率就越小 */
122 int zslRandomLevel(void) {
123 int level = 1;
124 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
125 level += 1;
126 return (levelheader;
142
143 /* 记录沿途访问的节点,并计数span等属性 */
144 for (i = zsl->level-1; i >= 0; i--) {
145 /* store rank that is crossed to reach the insert position */
146 rank = i == (zsl->level-1) ? 0 : rank[i+1];
147
148 /* 右节点不为空 */
149 while (x->level.forward &&
150 /* 右节点的score比给定score小 */
151 (x->level.forward->score < score ||
152 /* 右节点的score相同,但节点的member比输入的member要小 */
153 (x->level.forward->score == score &&
154 compareStringObjects(x->level.forward->obj,obj) < 0))) {
155
156 /* 记录跨越多少元素 */
157 rank += x->level.span;
158
159 /* 继续向右前进 */
160 x = x->level.forward;
161 }
162 /* 保存访问节点 */
163 update = x;
164 }
165 /* we assume the key is not already inside, since we allow duplicated
166 * scores, and the re-insertion of score and redis object should never
167 * happen since the caller of zslInsert() should test in the hash table
168 * if the element is already inside or not. */
169 /* 因为整个函数不可能处理两个元素的member和score都相同的情况 */
170 /* 所以直接创建新节点,不用检查存在性 */
171
172 /* 计算新的随机层数 */
173 level = zslRandomLevel();
174
175 /*
176 * 如果level比当前跳跃表的最大层还要大
177 * 那么更新zsl->level参数
178 * 并且初始化update和rank参数在相应层的数据
179 */
180 if (level > zsl->level) {
181 for (i = zsl->level; i < level; i++) {
182 rank = 0;
183 update = zsl->header;
184 update->level.span = zsl->length;
185 }
186 zsl->level = level;
187 }
188
189 /* 创建新节点 */
190 x = zslCreateNode(level,score,obj);
191
192 /* 根据update和rank两个数组的资料,初始化新节点并设置相应的指针 */
193 for (i = 0; i < level; i++) {
194
195 /* 设置指针 */
196 x->level.forward = update->level.forward;
197 update->level.forward = x;
198
199 /* update span covered by update as x is inserted here */
200 /* 设置span */
201 x->level.span = update->level.span - (rank[0] - rank);
202 update->level.span = (rank[0] - rank) + 1;
203 }
204
205 /* increment span for untouched levels */
206 /* 更新给定沿途访问节点的span */
207 for (i = level; i < zsl->level; i++) {
208 update->level.span++;
209 }
210
211 /* 设置后退指针 */
212 x->backward = (update[0] == zsl->header) ? NULL : update[0];
213
214 /* 设置x的前进指针 */
215 if (x->level[0].forward)
216 x->level[0].forward->backward = x;
217 else
218 /* 这个时新的表尾节点 */
219 zsl->tail = x;
220
221 /* 更新跳跃表节点数量 */
222 zsl->length++;
223
224 return x;
225 }
226
227 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
228 /* 删除给定的跳跃表节点 */
229 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
230 int i;
231
232 /* 修改相应的指针和span */
233 for (i = 0; i < zsl->level; i++) {
234 if (update->level.forward == x) {
235 update->level.span += x->level.span - 1;
236 update->level.forward = x->level.forward;
237 } else {
238 update->level.span -= 1;
239 }
240 }
241
242 /* 处理表头和表尾节点 */
243 if (x->level[0].forward) {
244 x->level[0].forward->backward = x->backward;
245 } else {
246 zsl->tail = x->backward;
247 }
248
249 /* 收缩level的值 */
250 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
251 zsl->level--;
252
253 zsl->length--;
254 }
255
256 /* Delete an element with matching score/object from the skiplist. */
257 /* 从跳跃表中删除和给定obj以及给定score匹配的元素 */
258 int zslDelete(zskiplist *zsl, double score, robj *obj) {
259 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
260 int i;
261
262 x = zsl->header;
263 /* 遍历所有层,记录删除节点后需要被修改的节点到update数组 */
264 for (i = zsl->level-1; i >= 0; i--) {
265 while (x->level.forward &&
266 (x->level.forward->score < score ||
267 (x->level.forward->score == score &&
268 compareStringObjects(x->level.forward->obj,obj) < 0)))
269 x = x->level.forward;
270 update = x;
271 }
272 /* We may have multiple elements with the same score, what we need
273 * is to find the element with both the right score and object. */
274 /* 因为多个不同的member可能由相同的score */
275 /* 所以要确保x的member和score都匹配时,才进行删除 */
276 x = x->level[0].forward;
277 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
278 zslDeleteNode(zsl, x, update);
279 zslFreeNode(x);
280 return 1;
281 } else {
282 return 0; /* not found */
283 }
284 return 0; /* not found */
285 }
286
287 /* 检查value是否属于spec指定的范围内 */
288 static int zslValueGteMin(double value, zrangespec *spec) {
289 return spec->minex ? (value > spec->min) : (value >= spec->min);
290 }
291
292 /* 检查value是否属于spec指定的范围内 */
293 static int zslValueLteMax(double value, zrangespec *spec) {
294 return spec->maxex ? (value < spec->max) : (value max);
295 }
296
297 /* Returns if there is a part of the zset is in range. */
298 /* 检查zset中的元素是否在给定范围之内 */
299 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
300 zskiplistNode *x;
301
302 /* Test for ranges that will always be empty. */
303 if (range->min > range->max ||
304 (range->min == range->max && (range->minex || range->maxex)))
305 return 0;
306
307 /* 如果zset的最大节点的score比范围的最小值要小 */
308 /* 那么zset不在范围之内 */
309 x = zsl->tail;
310 if (x == NULL || !zslValueGteMin(x->score,range))
311 return 0;
312
313 /* 如果zset的最小节点的score比范围的最大值要大 */
314 /* 那么zset不在范围之内 */
315 x = zsl->header->level[0].forward;
316 if (x == NULL || !zslValueLteMax(x->score,range))
317 return 0;
318
319 /* 在范围内 */
320 return 1;
321 }
322
323 /* Find the first node that is contained in the specified range.
324 * Returns NULL when no element is contained in the range. */
325 /* 找到跳跃表中的第一个符合给定范围的元素 */
326 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
327 zskiplistNode *x;
328 int i;
329
330 /* If everything is out of range, return early. */
331 if (!zslIsInRange(zsl,&range)) return NULL;
332
333 /* 找到第一个score值大于给定范围最小值的节点 */
334 x = zsl->header;
335 for (i = zsl->level-1; i >= 0; i--) {
336 /* Go forward while *OUT* of range. */
337 while (x->level.forward &&
338 !zslValueGteMin(x->level.forward->score,&range))
339 x = x->level.forward;
340 }
341
342 /* This is an inner range, so the next node cannot be NULL. */
343 x = x->level[0].forward;
344 redisAssert(x != NULL);
345
346 /* Check if score score,&range)) return NULL;
348 return x;
349 }
350
351 /* Find the last node that is contained in the specified range.
352 * Returns NULL when no element is contained in the range. */
353 /* 找到跳跃表中最后一个符合给定范围的元素 */
354 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
355 zskiplistNode *x;
356 int i;
357
358 /* If everything is out of range, return early. */
359 if (!zslIsInRange(zsl,&range)) return NULL;
360
361 x = zsl->header;
362 for (i = zsl->level-1; i >= 0; i--) {
363 /* Go forward while *IN* range. */
364 while (x->level.forward &&
365 zslValueLteMax(x->level.forward->score,&range))
366 x = x->level.forward;
367 }
368
369 /* This is an inner range, so this node cannot be NULL. */
370 redisAssert(x != NULL);
371
372 /* Check if score >= min. */
373 if (!zslValueGteMin(x->score,&range)) return NULL;
374 return x;
375 }
376
377 /* Delete all the elements with score between min and max from the skiplist.
378 * Min and max are inclusive, so a score >= min || score header;
389 for (i = zsl->level-1; i >= 0; i--) {
390 while (x->level.forward && (range.minex ?
391 x->level.forward->score level.forward->score < range.min))
393 x = x->level.forward;
394 update = x;
395 }
396
397 /* Current node is the last with score < or level[0].forward;
399
400 /* Delete nodes while in range. */
401 /* 一直向右删除,直到到达range的底为止 */
402 while (x && (range.maxex ? x->score < range.max : x->score level[0].forward;
405
406 /* 在跳跃表中删除 */
407 zslDeleteNode(zsl,x,update);
408
409 /* 在字典中删除 */
410 dictDelete(dict,x->obj);
411
412 /* 释放 */
413 zslFreeNode(x);
414
415 removed++;
416
417 x = next;
418 }
419
420 return removed;
421 }
422
423 /* Delete all the elements with rank between start and end from the skiplist.
424 * Start and end are inclusive. Note that start and end need to be 1-based */
425 /* 删除给定排序范围内的所有节点 */
426 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
427 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
428 unsigned long traversed = 0, removed = 0;
429 int i;
430
431 /* 通过计算rank,移动到删除开始的地方 */
432 x = zsl->header;
433 for (i = zsl->level-1; i >= 0; i--) {
434 while (x->level.forward && (traversed + x->level.span) < start) {
435 traversed += x->level.span;
436 x = x->level.forward;
437 }
438 update = x;
439 }
440
441 /* 算上start节点 */
442 traversed++;
443
444 /* 从start开始,删除直到到达索引end,或者末尾 */
445 x = x->level[0].forward;
446
447 while (x && traversed level[0].forward;
450
451 /* 删除skiplist节点 */
452 zslDeleteNode(zsl,x,update);
453
454 /* 删除dict节点 */
455 dictDelete(dict,x->obj);
456
457 /* 删除节点 */
458 zslFreeNode(x);
459
460 /* 删除计数 */
461 removed++;
462 traversed++;
463 x = next;
464 }
465 return removed;
466 }
467
468 /* Find the rank for an element by both score and key.
469 * Returns 0 when the element cannot be found, rank otherwise.
470 * Note that the rank is 1-based due to the span of zsl->header to the
471 * first element. */
472 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
473 zskiplistNode *x;
474 unsigned long rank = 0;
475 int i;
476
477 x = zsl->header;
478 for (i = zsl->level-1; i >= 0; i--) {
479 while (x->level.forward &&
480 (x->level.forward->score < score ||
481 (x->level.forward->score == score &&
482 compareStringObjects(x->level.forward->obj,o) level.span;
484 x = x->level.forward;
485 }
486
487 /* x might be equal to zsl->header, so test if obj is non-NULL */
488 if (x->obj && equalStringObjects(x->obj,o)) {
489 return rank;
490 }
491 }
492 return 0;
493 }
494
495 /* Finds an element by its rank. The rank argument needs to be 1-based. */
496 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
497 zskiplistNode *x;
498 unsigned long traversed = 0;
499 int i;
500
501 x = zsl->header;
502 for (i = zsl->level-1; i >= 0; i--) {
503 while (x->level.forward && (traversed + x->level.span) level.span;
506 x = x->level.forward;
507 }
508 if (traversed == rank) {
509 return x;
510 }
511 }
512 return NULL;
513 }
514
515 /* Populate the rangespec according to the objects min and max. */
516 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
517 char *eptr;
518 spec->minex = spec->maxex = 0;
519
520 /* Parse the min-max interval. If one of the values is prefixed
521 * by the "(" character, it's considered "open". For instance
522 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
523 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min min = (long)min->ptr;
526 } else {
527 if (((char*)min->ptr)[0] == '(') {
528 spec->min = strtod((char*)min->ptr+1,&eptr);
529 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
530 spec->minex = 1;
531 } else {
532 spec->min = strtod((char*)min->ptr,&eptr);
533 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
534 }
535 }
536 if (max->encoding == REDIS_ENCODING_INT) {
537 spec->max = (long)max->ptr;
538 } else {
539 if (((char*)max->ptr)[0] == '(') {
540 spec->max = strtod((char*)max->ptr+1,&eptr);
541 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
542 spec->maxex = 1;
543 } else {
544 spec->max = strtod((char*)max->ptr,&eptr);
545 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
546 }
547 }
548
549 return REDIS_OK;
550 }
551
552 /*-----------------------------------------------------------------------------
553 * Ziplist-backed sorted set API
554 *----------------------------------------------------------------------------*/
555
/* Return the score stored in the ziplist entry pointed to by 'sptr'.
 *
 * If the entry holds a string it is parsed with strtod(); if it holds an
 * integer that value is converted to double. 'sptr' must not be NULL. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* The entry is copied into a fixed stack buffer so it can be
         * NUL-terminated: refuse anything that would not fit instead of
         * overflowing the buffer. */
        redisAssert(vlen < sizeof(buf));
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
576
/* Compare element in sorted set with given element.
 *
 * The ziplist entry at 'eptr' is compared, as a byte string, with the
 * 'clen' bytes at 'cstr'. The shared prefix is compared with memcmp(); when
 * it is equal the result is the difference of the two lengths. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char *entry;
    unsigned int entrylen;
    long long entrynum;
    unsigned char numbuf[32];
    int prefix, order;

    redisAssert(ziplistGet(eptr,&entry,&entrylen,&entrynum));
    if (entry == NULL) {
        /* Integer entry: compare against its decimal string form. */
        entrylen = ll2string((char*)numbuf,sizeof(numbuf),entrynum);
        entry = numbuf;
    }

    if (entrylen < clen)
        prefix = entrylen;
    else
        prefix = clen;

    order = memcmp(entry,cstr,prefix);
    if (order != 0) return order;
    return entrylen-clen;
}
597
/* Number of elements in a ziplist-encoded sorted set: half the number of
 * ziplist entries, since each element takes a member entry and a score
 * entry. */
unsigned int zzlLength(unsigned char *zl) {
    unsigned int entries = ziplistLen(zl);

    return entries/2;
}
601
/* Move to next entry based on the values in eptr and sptr. Both are set to
 * NULL when there is no next entry. */
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *nextele;
    unsigned char *nextscore = NULL;

    redisAssert(*eptr != NULL && *sptr != NULL);

    /* The next member entry directly follows the current score entry. */
    nextele = ziplistNext(zl,*sptr);
    if (nextele != NULL) {
        /* A member must always be followed by its score. */
        nextscore = ziplistNext(zl,nextele);
        redisAssert(nextscore != NULL);
    }

    *eptr = nextele;
    *sptr = nextscore;
}
620
/* Move to the previous entry based on the values in eptr and sptr. Both are
 * set to NULL when there is no previous entry. */
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *prevscore;
    unsigned char *prevele = NULL;

    redisAssert(*eptr != NULL && *sptr != NULL);

    /* The previous score entry directly precedes the current member. */
    prevscore = ziplistPrev(zl,*eptr);
    if (prevscore != NULL) {
        /* A score must always be preceded by its member. */
        prevele = ziplistPrev(zl,prevscore);
        redisAssert(prevele != NULL);
    }

    *eptr = prevele;
    *sptr = prevscore;
}
639
640 /* Returns if there is a part of the zset is in range. Should only be used
641 * internally by zzlFirstInRange and zzlLastInRange. */
642 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
643 unsigned char *p;
644 double score;
645
646 /* Test for ranges that will always be empty. */
647 if (range->min > range->max ||
648 (range->min == range->max && (range->minex || range->maxex)))
649 return 0;
650
651 p = ziplistIndex(zl,-1); /* Last score. */
652 if (p == NULL) return 0; /* Empty sorted set */
653 score = zzlGetScore(p);
654 if (!zslValueGteMin(score,range))
655 return 0;
656
657 p = ziplistIndex(zl,1); /* First score. */
658 redisAssert(p != NULL);
659 score = zzlGetScore(p);
660 if (!zslValueLteMax(score,range))
661 return 0;
662
663 return 1;
664 }
665
666 /* Find pointer to the first element contained in the specified range.
667 * Returns NULL when no element is contained in the range. */
668 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
669 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
670 double score;
671
672 /* If everything is out of range, return early. */
673 if (!zzlIsInRange(zl,&range)) return NULL;
674
675 while (eptr != NULL) {
676 sptr = ziplistNext(zl,eptr);
677 redisAssert(sptr != NULL);
678
679 score = zzlGetScore(sptr);
680 if (zslValueGteMin(score,&range)) {
681 /* Check if score = min. */
710 if (zslValueGteMin(score,&range))
711 return eptr;
712 return NULL;
713 }
714
715 /* Move to previous element by moving to the score of previous element.
716 * When this returns NULL, we know there also is no element. */
717 sptr = ziplistPrev(zl,eptr);
718 if (sptr != NULL)
719 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
720 else
721 eptr = NULL;
722 }
723
724 return NULL;
725 }
726
727 unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
728 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
729
730 ele = getDecodedObject(ele);
731 while (eptr != NULL) {
732 sptr = ziplistNext(zl,eptr);
733 redisAssertWithInfo(NULL,ele,sptr != NULL);
734
735 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
736 /* Matching element, pull out score. */
737 if (score != NULL) *score = zzlGetScore(sptr);
738 decrRefCount(ele);
739 return eptr;
740 }
741
742 /* Move to next element. */
743 eptr = ziplistNext(zl,sptr);
744 }
745
746 decrRefCount(ele);
747 return NULL;
748 }
749
/* Delete (element,score) pair from ziplist. Use local copy of eptr because we
 * don't want to modify the one given as argument.
 *
 * Returns the ziplist pointer produced by the last deletion. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;
    int remaining;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    /* Two entries go away: the member, then its score. */
    for (remaining = 2; remaining > 0; remaining--)
        zl = ziplistDelete(zl,&cursor);

    return zl;
}
760
761 unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
762 unsigned char *sptr;
763 char scorebuf[128];
764 int scorelen;
765 size_t offset;
766
767 redisAssertWithInfo(NULL,ele,ele->encoding == REDIS_ENCODING_RAW);
768 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
769 if (eptr == NULL) {
770 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
771 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
772 } else {
773 /* Keep offset relative to zl, as it might be re-allocated. */
774 offset = eptr-zl;
775 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
776 eptr = zl+offset;
777
778 /* Insert score after the element. */
779 redisAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
780 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
781 }
782
783 return zl;
784 }
785
786 /* Insert (element,score) pair in ziplist. This function assumes the element is
787 * not yet present in the list. */
788 unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
789 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
790 double s;
791
792 ele = getDecodedObject(ele);
793 while (eptr != NULL) {
794 sptr = ziplistNext(zl,eptr);
795 redisAssertWithInfo(NULL,ele,sptr != NULL);
796 s = zzlGetScore(sptr);
797
798 if (s > score) {
799 /* First element with score larger than score for element to be
800 * inserted. This means we should take its spot in the list to
801 * maintain ordering. */
802 zl = zzlInsertAt(zl,eptr,ele,score);
803 break;
804 } else if (s == score) {
805 /* Ensure lexicographical ordering for elements. */
806 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
807 zl = zzlInsertAt(zl,eptr,ele,score);
808 break;
809 }
810 }
811
812 /* Move to next element. */
813 eptr = ziplistNext(zl,sptr);
814 }
815
816 /* Push on tail of list when it was not yet inserted. */
817 if (eptr == NULL)
818 zl = zzlInsertAt(zl,NULL,ele,score);
819
820 decrRefCount(ele);
821 return zl;
822 }
823
824 unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
825 unsigned char *eptr, *sptr;
826 double score;
827 unsigned long num = 0;
828
829 if (deleted != NULL) *deleted = 0;
830
831 eptr = zzlFirstInRange(zl,range);
832 if (eptr == NULL) return zl;
833
834 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
835 * byte and ziplistNext will return NULL. */
836 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
837 score = zzlGetScore(sptr);
838 if (zslValueLteMax(score,&range)) {
839 /* Delete both the element and the score. */
840 zl = ziplistDelete(zl,&eptr);
841 zl = ziplistDelete(zl,&eptr);
842 num++;
843 } else {
844 /* No longer in range. */
845 break;
846 }
847 }
848
849 if (deleted != NULL) *deleted = num;
850 return zl;
851 }
852
/* Delete all the elements with rank between start and end from the ziplist.
 * Start and end are inclusive. Note that start and end need to be 1-based.
 *
 * When 'deleted' is not NULL it receives (end-start)+1. Returns the
 * resulting ziplist pointer. */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int count = (end-start)+1;

    if (deleted) *deleted = count;

    /* Each element takes two ziplist entries (member and score). */
    return ziplistDeleteRange(zl,2*(start-1),2*count);
}
861
862 /*-----------------------------------------------------------------------------
863 * Common sorted set API
864 *----------------------------------------------------------------------------*/
865
866 unsigned int zsetLength(robj *zobj) {
867 int length = -1;
868 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
869 length = zzlLength(zobj->ptr);
870 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
871 length = ((zset*)zobj->ptr)->zsl->length;
872 } else {
873 redisPanic("Unknown sorted set encoding");
874 }
875 return length;
876 }
877
878 void zsetConvert(robj *zobj, int encoding) {
879 zset *zs;
880 zskiplistNode *node, *next;
881 robj *ele;
882 double score;
883
884 if (zobj->encoding == encoding) return;
885 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
886 unsigned char *zl = zobj->ptr;
887 unsigned char *eptr, *sptr;
888 unsigned char *vstr;
889 unsigned int vlen;
890 long long vlong;
891
892 if (encoding != REDIS_ENCODING_SKIPLIST)
893 redisPanic("Unknown target encoding");
894
895 zs = zmalloc(sizeof(*zs));
896 zs->dict = dictCreate(&zsetDictType,NULL);
897 zs->zsl = zslCreate();
898
899 eptr = ziplistIndex(zl,0);
900 redisAssertWithInfo(NULL,zobj,eptr != NULL);
901 sptr = ziplistNext(zl,eptr);
902 redisAssertWithInfo(NULL,zobj,sptr != NULL);
903
904 while (eptr != NULL) {
905 score = zzlGetScore(sptr);
906 redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
907 if (vstr == NULL)
908 ele = createStringObjectFromLongLong(vlong);
909 else
910 ele = createStringObject((char*)vstr,vlen);
911
912 /* Has incremented refcount since it was just created. */
913 node = zslInsert(zs->zsl,score,ele);
914 redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
915 incrRefCount(ele); /* Added to dictionary. */
916 zzlNext(zl,&eptr,&sptr);
917 }
918
919 zfree(zobj->ptr);
920 zobj->ptr = zs;
921 zobj->encoding = REDIS_ENCODING_SKIPLIST;
922 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
923 unsigned char *zl = ziplistNew();
924
925 if (encoding != REDIS_ENCODING_ZIPLIST)
926 redisPanic("Unknown target encoding");
927
928 /* Approach similar to zslFree(), since we want to free the skiplist at
929 * the same time as creating the ziplist. */
930 zs = zobj->ptr;
931 dictRelease(zs->dict);
932 node = zs->zsl->header->level[0].forward;
933 zfree(zs->zsl->header);
934 zfree(zs->zsl);
935
936 while (node) {
937 ele = getDecodedObject(node->obj);
938 zl = zzlInsertAt(zl,NULL,ele,node->score);
939 decrRefCount(ele);
940
941 next = node->level[0].forward;
942 zslFreeNode(node);
943 node = next;
944 }
945
946 zfree(zs);
947 zobj->ptr = zl;
948 zobj->encoding = REDIS_ENCODING_ZIPLIST;
949 } else {
950 redisPanic("Unknown sorted set encoding");
951 }
952 }
953
954 /*-----------------------------------------------------------------------------
955 * Sorted set commands
956 *----------------------------------------------------------------------------*/
957
958 /* This generic command implements both ZADD and ZINCRBY. */
959 void zaddGenericCommand(redisClient *c, int incr) {
960 static char *nanerr = "resulting score is not a number (NaN)";
961 robj *key = c->argv[1];
962 robj *ele;
963 robj *zobj;
964 robj *curobj;
965 double score = 0, *scores, curscore = 0.0;
966 int j, elements = (c->argc-2)/2;
967 int added = 0;
968
969 if (c->argc % 2) {
970 addReply(c,shared.syntaxerr);
971 return;
972 }
973
974 /* Start parsing all the scores, we need to emit any syntax error
975 * before executing additions to the sorted set, as the command should
976 * either execute fully or nothing at all. */
977 scores = zmalloc(sizeof(double)*elements);
978 for (j = 0; j < elements; j++) {
979 if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
980 != REDIS_OK)
981 {
982 zfree(scores);
983 return;
984 }
985 }
986
987 /* Lookup the key and create the sorted set if does not exist. */
988 zobj = lookupKeyWrite(c->db,key);
989 if (zobj == NULL) {
990 if (server.zset_max_ziplist_entries == 0 ||
991 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
992 {
993 zobj = createZsetObject();
994 } else {
995 zobj = createZsetZiplistObject();
996 }
997 dbAdd(c->db,key,zobj);
998 } else {
999 if (zobj->type != REDIS_ZSET) {
1000 addReply(c,shared.wrongtypeerr);
1001 zfree(scores);
1002 return;
1003 }
1004 }
1005
1006 for (j = 0; j < elements; j++) {
1007 score = scores[j];
1008
1009 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1010 unsigned char *eptr;
1011
1012 /* Prefer non-encoded element when dealing with ziplists. */
1013 ele = c->argv[3+j*2];
1014 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
1015 if (incr) {
1016 score += curscore;
1017 if (isnan(score)) {
1018 addReplyError(c,nanerr);
1019 /* Don't need to check if the sorted set is empty
1020 * because we know it has at least one element. */
1021 zfree(scores);
1022 return;
1023 }
1024 }
1025
1026 /* Remove and re-insert when score changed. */
1027 if (score != curscore) {
1028 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1029 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1030
1031 signalModifiedKey(c->db,key);
1032 server.dirty++;
1033 }
1034 } else {
1035 /* Optimize: check if the element is too large or the list
1036 * becomes too long *before* executing zzlInsert. */
1037 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1038 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
1039 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
1040 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
1041 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
1042
1043 signalModifiedKey(c->db,key);
1044 server.dirty++;
1045 if (!incr) added++;
1046 }
1047 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1048 zset *zs = zobj->ptr;
1049 zskiplistNode *znode;
1050 dictEntry *de;
1051
1052 ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
1053 de = dictFind(zs->dict,ele);
1054 if (de != NULL) {
1055 curobj = dictGetKey(de);
1056 curscore = *(double*)dictGetVal(de);
1057
1058 if (incr) {
1059 score += curscore;
1060 if (isnan(score)) {
1061 addReplyError(c,nanerr);
1062 /* Don't need to check if the sorted set is empty
1063 * because we know it has at least one element. */
1064 zfree(scores);
1065 return;
1066 }
1067 }
1068
1069 /* Remove and re-insert when score changed. We can safely
1070 * delete the key object from the skiplist, since the
1071 * dictionary still has a reference to it. */
1072 if (score != curscore) {
1073 redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
1074 znode = zslInsert(zs->zsl,score,curobj);
1075 incrRefCount(curobj); /* Re-inserted in skiplist. */
1076 dictGetVal(de) = &znode->score; /* Update score ptr. */
1077
1078 signalModifiedKey(c->db,key);
1079 server.dirty++;
1080 }
1081 } else {
1082 znode = zslInsert(zs->zsl,score,ele);
1083 incrRefCount(ele); /* Inserted in skiplist. */
1084 redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
1085 incrRefCount(ele); /* Added to dictionary. */
1086
1087 signalModifiedKey(c->db,key);
1088 server.dirty++;
1089 if (!incr) added++;
1090 }
1091 } else {
1092 redisPanic("Unknown sorted set encoding");
1093 }
1094 }
1095 zfree(scores);
1096 if (incr) /* ZINCRBY */
1097 addReplyDouble(c,score);
1098 else /* ZADD */
1099 addReplyLongLong(c,added);
1100 }
1101
1102 void zaddCommand(redisClient *c) {
1103 zaddGenericCommand(c,0);
1104 }
1105
1106 void zincrbyCommand(redisClient *c) {
1107 zaddGenericCommand(c,1);
1108 }
1109
1110 void zremCommand(redisClient *c) {
1111 robj *key = c->argv[1];
1112 robj *zobj;
1113 int deleted = 0, j;
1114
1115 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1116 checkType(c,zobj,REDIS_ZSET)) return;
1117
1118 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1119 unsigned char *eptr;
1120
1121 for (j = 2; j < c->argc; j++) {
1122 if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
1123 deleted++;
1124 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1125 if (zzlLength(zobj->ptr) == 0) {
1126 dbDelete(c->db,key);
1127 break;
1128 }
1129 }
1130 }
1131 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1132 zset *zs = zobj->ptr;
1133 dictEntry *de;
1134 double score;
1135
1136 for (j = 2; j < c->argc; j++) {
1137 de = dictFind(zs->dict,c->argv[j]);
1138 if (de != NULL) {
1139 deleted++;
1140
1141 /* Delete from the skiplist */
1142 score = *(double*)dictGetVal(de);
1143 redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
1144
1145 /* Delete from the hash table */
1146 dictDelete(zs->dict,c->argv[j]);
1147 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1148 if (dictSize(zs->dict) == 0) {
1149 dbDelete(c->db,key);
1150 break;
1151 }
1152 }
1153 }
1154 } else {
1155 redisPanic("Unknown sorted set encoding");
1156 }
1157
1158 if (deleted) {
1159 signalModifiedKey(c->db,key);
1160 server.dirty += deleted;
1161 }
1162 addReplyLongLong(c,deleted);
1163 }
1164
1165 void zremrangebyscoreCommand(redisClient *c) {
1166 robj *key = c->argv[1];
1167 robj *zobj;
1168 zrangespec range;
1169 unsigned long deleted;
1170
1171 /* Parse the range arguments. */
1172 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1173 addReplyError(c,"min or max is not a float");
1174 return;
1175 }
1176
1177 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1178 checkType(c,zobj,REDIS_ZSET)) return;
1179
1180 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1181 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
1182 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
1183 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1184 zset *zs = zobj->ptr;
1185 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1186 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1187 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1188 } else {
1189 redisPanic("Unknown sorted set encoding");
1190 }
1191
1192 if (deleted) signalModifiedKey(c->db,key);
1193 server.dirty += deleted;
1194 addReplyLongLong(c,deleted);
1195 }
1196
1197 void zremrangebyrankCommand(redisClient *c) {
1198 robj *key = c->argv[1];
1199 robj *zobj;
1200 long start;
1201 long end;
1202 int llen;
1203 unsigned long deleted;
1204
1205 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1206 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1207
1208 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1209 checkType(c,zobj,REDIS_ZSET)) return;
1210
1211 /* Sanitize indexes. */
1212 llen = zsetLength(zobj);
1213 if (start < 0) start = llen+start;
1214 if (end < 0) end = llen+end;
1215 if (start < 0) start = 0;
1216
1217 /* Invariant: start >= 0, so this test will be true when end < 0.
1218 * The range is empty when start > end or start >= length. */
1219 if (start > end || start >= llen) {
1220 addReply(c,shared.czero);
1221 return;
1222 }
1223 if (end >= llen) end = llen-1;
1224
1225 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1226 /* Correct for 1-based rank. */
1227 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1228 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
1229 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1230 zset *zs = zobj->ptr;
1231
1232 /* Correct for 1-based rank. */
1233 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1234 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1235 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1236 } else {
1237 redisPanic("Unknown sorted set encoding");
1238 }
1239
1240 if (deleted) signalModifiedKey(c->db,key);
1241 server.dirty += deleted;
1242 addReplyLongLong(c,deleted);
1243 }
1244
1245 typedef struct {
1246 robj *subject;
1247 int type; /* Set, sorted set */
1248 int encoding;
1249 double weight;
1250
1251 union {
1252 /* Set iterators. */
1253 union _iterset {
1254 struct {
1255 intset *is;
1256 int ii;
1257 } is;
1258 struct {
1259 dict *dict;
1260 dictIterator *di;
1261 dictEntry *de;
1262 } ht;
1263 } set;
1264
1265 /* Sorted set iterators. */
1266 union _iterzset {
1267 struct {
1268 unsigned char *zl;
1269 unsigned char *eptr, *sptr;
1270 } zl;
1271 struct {
1272 zset *zs;
1273 zskiplistNode *node;
1274 } sl;
1275 } zset;
1276 } iter;
1277 } zsetopsrc;
1278
1279
/* Flags stored in zsetopval.flags.
 *
 * A "dirty" flag marks something that has to be cleaned up before the
 * zsetopval is reused for the next element. The long long flag is the odd
 * one out, because a long long needs no cleanup: OPVAL_DIRTY_LL only records
 * that we already checked whether "ell" holds a long long, or already tried
 * to convert another representation into one. OPVAL_VALID_LL is set on top
 * of it when that check or conversion succeeded. */
#define OPVAL_DIRTY_ROBJ (1<<0)
#define OPVAL_DIRTY_LL (1<<1)
#define OPVAL_VALID_LL (1<<2)
1289
1290 /* Store value retrieved from the iterator. */
1291 typedef struct {
1292 int flags;
1293 unsigned char _buf[32]; /* Private buffer. */
1294 robj *ele;
1295 unsigned char *estr;
1296 unsigned int elen;
1297 long long ell;
1298 double score;
1299 } zsetopval;
1300
1301 typedef union _iterset iterset;
1302 typedef union _iterzset iterzset;
1303
1304 void zuiInitIterator(zsetopsrc *op) {
1305 if (op->subject == NULL)
1306 return;
1307
1308 if (op->type == REDIS_SET) {
1309 iterset *it = &op->iter.set;
1310 if (op->encoding == REDIS_ENCODING_INTSET) {
1311 it->is.is = op->subject->ptr;
1312 it->is.ii = 0;
1313 } else if (op->encoding == REDIS_ENCODING_HT) {
1314 it->ht.dict = op->subject->ptr;
1315 it->ht.di = dictGetIterator(op->subject->ptr);
1316 it->ht.de = dictNext(it->ht.di);
1317 } else {
1318 redisPanic("Unknown set encoding");
1319 }
1320 } else if (op->type == REDIS_ZSET) {
1321 iterzset *it = &op->iter.zset;
1322 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1323 it->zl.zl = op->subject->ptr;
1324 it->zl.eptr = ziplistIndex(it->zl.zl,0);
1325 if (it->zl.eptr != NULL) {
1326 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1327 redisAssert(it->zl.sptr != NULL);
1328 }
1329 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1330 it->sl.zs = op->subject->ptr;
1331 it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1332 } else {
1333 redisPanic("Unknown sorted set encoding");
1334 }
1335 } else {
1336 redisPanic("Unsupported type");
1337 }
1338 }
1339
1340 void zuiClearIterator(zsetopsrc *op) {
1341 if (op->subject == NULL)
1342 return;
1343
1344 if (op->type == REDIS_SET) {
1345 iterset *it = &op->iter.set;
1346 if (op->encoding == REDIS_ENCODING_INTSET) {
1347 REDIS_NOTUSED(it); /* skip */
1348 } else if (op->encoding == REDIS_ENCODING_HT) {
1349 dictReleaseIterator(it->ht.di);
1350 } else {
1351 redisPanic("Unknown set encoding");
1352 }
1353 } else if (op->type == REDIS_ZSET) {
1354 iterzset *it = &op->iter.zset;
1355 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1356 REDIS_NOTUSED(it); /* skip */
1357 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1358 REDIS_NOTUSED(it); /* skip */
1359 } else {
1360 redisPanic("Unknown sorted set encoding");
1361 }
1362 } else {
1363 redisPanic("Unsupported type");
1364 }
1365 }
1366
1367 int zuiLength(zsetopsrc *op) {
1368 if (op->subject == NULL)
1369 return 0;
1370
1371 if (op->type == REDIS_SET) {
1372 iterset *it = &op->iter.set;
1373 if (op->encoding == REDIS_ENCODING_INTSET) {
1374 return intsetLen(it->is.is);
1375 } else if (op->encoding == REDIS_ENCODING_HT) {
1376 return dictSize(it->ht.dict);
1377 } else {
1378 redisPanic("Unknown set encoding");
1379 }
1380 } else if (op->type == REDIS_ZSET) {
1381 iterzset *it = &op->iter.zset;
1382 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1383 return zzlLength(it->zl.zl);
1384 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1385 return it->sl.zs->zsl->length;
1386 } else {
1387 redisPanic("Unknown sorted set encoding");
1388 }
1389 } else {
1390 redisPanic("Unsupported type");
1391 }
1392 }
1393
1394 /* Check if the current value is valid. If so, store it in the passed structure
1395 * and move to the next element. If not valid, this means we have reached the
1396 * end of the structure and can abort. */
1397 int zuiNext(zsetopsrc *op, zsetopval *val) {
1398 if (op->subject == NULL)
1399 return 0;
1400
1401 if (val->flags & OPVAL_DIRTY_ROBJ)
1402 decrRefCount(val->ele);
1403
1404 memset(val,0,sizeof(zsetopval));
1405
1406 if (op->type == REDIS_SET) {
1407 iterset *it = &op->iter.set;
1408 if (op->encoding == REDIS_ENCODING_INTSET) {
1409 int64_t ell;
1410
1411 if (!intsetGet(it->is.is,it->is.ii,&ell))
1412 return 0;
1413 val->ell = ell;
1414 val->score = 1.0;
1415
1416 /* Move to next element. */
1417 it->is.ii++;
1418 } else if (op->encoding == REDIS_ENCODING_HT) {
1419 if (it->ht.de == NULL)
1420 return 0;
1421 val->ele = dictGetKey(it->ht.de);
1422 val->score = 1.0;
1423
1424 /* Move to next element. */
1425 it->ht.de = dictNext(it->ht.di);
1426 } else {
1427 redisPanic("Unknown set encoding");
1428 }
1429 } else if (op->type == REDIS_ZSET) {
1430 iterzset *it = &op->iter.zset;
1431 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1432 /* No need to check both, but better be explicit. */
1433 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1434 return 0;
1435 redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1436 val->score = zzlGetScore(it->zl.sptr);
1437
1438 /* Move to next element. */
1439 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
1440 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1441 if (it->sl.node == NULL)
1442 return 0;
1443 val->ele = it->sl.node->obj;
1444 val->score = it->sl.node->score;
1445
1446 /* Move to next element. */
1447 it->sl.node = it->sl.node->level[0].forward;
1448 } else {
1449 redisPanic("Unknown sorted set encoding");
1450 }
1451 } else {
1452 redisPanic("Unsupported type");
1453 }
1454 return 1;
1455 }
1456
1457 int zuiLongLongFromValue(zsetopval *val) {
1458 if (!(val->flags & OPVAL_DIRTY_LL)) {
1459 val->flags |= OPVAL_DIRTY_LL;
1460
1461 if (val->ele != NULL) {
1462 if (val->ele->encoding == REDIS_ENCODING_INT) {
1463 val->ell = (long)val->ele->ptr;
1464 val->flags |= OPVAL_VALID_LL;
1465 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1466 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1467 val->flags |= OPVAL_VALID_LL;
1468 } else {
1469 redisPanic("Unsupported element encoding");
1470 }
1471 } else if (val->estr != NULL) {
1472 if (string2ll((char*)val->estr,val->elen,&val->ell))
1473 val->flags |= OPVAL_VALID_LL;
1474 } else {
1475 /* The long long was already set, flag as valid. */
1476 val->flags |= OPVAL_VALID_LL;
1477 }
1478 }
1479 return val->flags & OPVAL_VALID_LL;
1480 }
1481
1482 robj *zuiObjectFromValue(zsetopval *val) {
1483 if (val->ele == NULL) {
1484 if (val->estr != NULL) {
1485 val->ele = createStringObject((char*)val->estr,val->elen);
1486 } else {
1487 val->ele = createStringObjectFromLongLong(val->ell);
1488 }
1489 val->flags |= OPVAL_DIRTY_ROBJ;
1490 }
1491 return val->ele;
1492 }
1493
1494 int zuiBufferFromValue(zsetopval *val) {
1495 if (val->estr == NULL) {
1496 if (val->ele != NULL) {
1497 if (val->ele->encoding == REDIS_ENCODING_INT) {
1498 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1499 val->estr = val->_buf;
1500 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1501 val->elen = sdslen(val->ele->ptr);
1502 val->estr = val->ele->ptr;
1503 } else {
1504 redisPanic("Unsupported element encoding");
1505 }
1506 } else {
1507 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1508 val->estr = val->_buf;
1509 }
1510 }
1511 return 1;
1512 }
1513
1514 /* Find value pointed to by val in the source pointer to by op. When found,
1515 * return 1 and store its score in target. Return 0 otherwise. */
1516 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1517 if (op->subject == NULL)
1518 return 0;
1519
1520 if (op->type == REDIS_SET) {
1521 iterset *it = &op->iter.set;
1522
1523 if (op->encoding == REDIS_ENCODING_INTSET) {
1524 if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1525 *score = 1.0;
1526 return 1;
1527 } else {
1528 return 0;
1529 }
1530 } else if (op->encoding == REDIS_ENCODING_HT) {
1531 zuiObjectFromValue(val);
1532 if (dictFind(it->ht.dict,val->ele) != NULL) {
1533 *score = 1.0;
1534 return 1;
1535 } else {
1536 return 0;
1537 }
1538 } else {
1539 redisPanic("Unknown set encoding");
1540 }
1541 } else if (op->type == REDIS_ZSET) {
1542 iterzset *it = &op->iter.zset;
1543 zuiObjectFromValue(val);
1544
1545 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1546 if (zzlFind(it->zl.zl,val->ele,score) != NULL) {
1547 /* Score is already set by zzlFind. */
1548 return 1;
1549 } else {
1550 return 0;
1551 }
1552 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1553 dictEntry *de;
1554 if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
1555 *score = *(double*)dictGetVal(de);
1556 return 1;
1557 } else {
1558 return 0;
1559 }
1560 } else {
1561 redisPanic("Unknown sorted set encoding");
1562 }
1563 } else {
1564 redisPanic("Unsupported type");
1565 }
1566 }
1567
1568 int zuiCompareByCardinality(const void *s1, const void *s2) {
1569 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
1570 }
1571
/* How the scores of a member found in several inputs are combined. */
#define REDIS_AGGR_SUM  1
#define REDIS_AGGR_MIN  2
#define REDIS_AGGR_MAX  3

/* Score stored in a dict entry; an entry with a NULL value counts as 1.0. */
#define zunionInterDictValue(_e) \
    (dictGetVal(_e) != NULL ? *(double*)dictGetVal(_e) : 1.0)
1576
1577 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1578 if (aggregate == REDIS_AGGR_SUM) {
1579 *target = *target + val;
1580 /* The result of adding two doubles is NaN when one variable
1581 * is +inf and the other is -inf. When these numbers are added,
1582 * we maintain the convention of the result being 0.0. */
1583 if (isnan(*target)) *target = 0.0;
1584 } else if (aggregate == REDIS_AGGR_MIN) {
1585 *target = val < *target ? val : *target;
1586 } else if (aggregate == REDIS_AGGR_MAX) {
1587 *target = val > *target ? val : *target;
1588 } else {
1589 /* safety net */
1590 redisPanic("Unknown ZUNION/INTER aggregate type");
1591 }
1592 }
1593
1594 void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1595 int i, j;
1596 long setnum;
1597 int aggregate = REDIS_AGGR_SUM;
1598 zsetopsrc *src;
1599 zsetopval zval;
1600 robj *tmp;
1601 unsigned int maxelelen = 0;
1602 robj *dstobj;
1603 zset *dstzset;
1604 zskiplistNode *znode;
1605 int touched = 0;
1606
1607 /* expect setnum input keys to be given */
1608 if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
1609 return;
1610
1611 if (setnum < 1) {
1612 addReplyError(c,
1613 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1614 return;
1615 }
1616
1617 /* test if the expected number of keys would overflow */
1618 if (setnum > c->argc-3) {
1619 addReply(c,shared.syntaxerr);
1620 return;
1621 }
1622
1623 /* read keys to be used for input */
1624 src = zcalloc(sizeof(zsetopsrc) * setnum);
1625 for (i = 0, j = 3; i < setnum; i++, j++) {
1626 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
1627 if (obj != NULL) {
1628 if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
1629 zfree(src);
1630 addReply(c,shared.wrongtypeerr);
1631 return;
1632 }
1633
1634 src.subject = obj;
1635 src.type = obj->type;
1636 src.encoding = obj->encoding;
1637 } else {
1638 src.subject = NULL;
1639 }
1640
1641 /* Default all weights to 1. */
1642 src.weight = 1.0;
1643 }
1644
1645 /* parse optional extra arguments */
1646 if (j < c->argc) {
1647 int remaining = c->argc - j;
1648
1649 while (remaining) {
1650 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1651 j++; remaining--;
1652 for (i = 0; i < setnum; i++, j++, remaining--) {
1653 if (getDoubleFromObjectOrReply(c,c->argv[j],&src.weight,
1654 "weight value is not a float") != REDIS_OK)
1655 {
1656 zfree(src);
1657 return;
1658 }
1659 }
1660 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1661 j++; remaining--;
1662 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1663 aggregate = REDIS_AGGR_SUM;
1664 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1665 aggregate = REDIS_AGGR_MIN;
1666 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1667 aggregate = REDIS_AGGR_MAX;
1668 } else {
1669 zfree(src);
1670 addReply(c,shared.syntaxerr);
1671 return;
1672 }
1673 j++; remaining--;
1674 } else {
1675 zfree(src);
1676 addReply(c,shared.syntaxerr);
1677 return;
1678 }
1679 }
1680 }
1681
1682 for (i = 0; i < setnum; i++)
1683 zuiInitIterator(&src);
1684
1685 /* sort sets from the smallest to largest, this will improve our
1686 * algorithm's performance */
1687 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
1688
1689 dstobj = createZsetObject();
1690 dstzset = dstobj->ptr;
1691 memset(&zval, 0, sizeof(zval));
1692
1693 if (op == REDIS_OP_INTER) {
1694 /* Skip everything if the smallest input is empty. */
1695 if (zuiLength(&src[0]) > 0) {
1696 /* Precondition: as src[0] is non-empty and the inputs are ordered
1697 * by size, all src[i > 0] are non-empty too. */
1698 while (zuiNext(&src[0],&zval)) {
1699 double score, value;
1700
1701 score = src[0].weight * zval.score;
1702 if (isnan(score)) score = 0;
1703
1704 for (j = 1; j < setnum; j++) {
1705 /* It is not safe to access the zset we are
1706 * iterating, so explicitly check for equal object. */
1707 if (src[j].subject == src[0].subject) {
1708 value = zval.score*src[j].weight;
1709 zunionInterAggregate(&score,value,aggregate);
1710 } else if (zuiFind(&src[j],&zval,&value)) {
1711 value *= src[j].weight;
1712 zunionInterAggregate(&score,value,aggregate);
1713 } else {
1714 break;
1715 }
1716 }
1717
1718 /* Only continue when present in every input. */
1719 if (j == setnum) {
1720 tmp = zuiObjectFromValue(&zval);
1721 znode = zslInsert(dstzset->zsl,score,tmp);
1722 incrRefCount(tmp); /* added to skiplist */
1723 dictAdd(dstzset->dict,tmp,&znode->score);
1724 incrRefCount(tmp); /* added to dictionary */
1725
1726 if (tmp->encoding == REDIS_ENCODING_RAW)
1727 if (sdslen(tmp->ptr) > maxelelen)
1728 maxelelen = sdslen(tmp->ptr);
1729 }
1730 }
1731 }
1732 } else if (op == REDIS_OP_UNION) {
1733 for (i = 0; i < setnum; i++) {
1734 if (zuiLength(&src) == 0)
1735 continue;
1736
1737 while (zuiNext(&src,&zval)) {
1738 double score, value;
1739
1740 /* Skip key when already processed */
1741 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
1742 continue;
1743
1744 /* Initialize score */
1745 score = src.weight * zval.score;
1746 if (isnan(score)) score = 0;
1747
1748 /* Because the inputs are sorted by size, it's only possible
1749 * for sets at larger indices to hold this element. */
1750 for (j = (i+1); j < setnum; j++) {
1751 /* It is not safe to access the zset we are
1752 * iterating, so explicitly check for equal object. */
1753 if(src[j].subject == src.subject) {
1754 value = zval.score*src[j].weight;
1755 zunionInterAggregate(&score,value,aggregate);
1756 } else if (zuiFind(&src[j],&zval,&value)) {
1757 value *= src[j].weight;
1758 zunionInterAggregate(&score,value,aggregate);
1759 }
1760 }
1761
1762 tmp = zuiObjectFromValue(&zval);
1763 znode = zslInsert(dstzset->zsl,score,tmp);
1764 incrRefCount(zval.ele); /* added to skiplist */
1765 dictAdd(dstzset->dict,tmp,&znode->score);
1766 incrRefCount(zval.ele); /* added to dictionary */
1767
1768 if (tmp->encoding == REDIS_ENCODING_RAW)
1769 if (sdslen(tmp->ptr) > maxelelen)
1770 maxelelen = sdslen(tmp->ptr);
1771 }
1772 }
1773 } else {
1774 redisPanic("Unknown operator");
1775 }
1776
1777 for (i = 0; i < setnum; i++)
1778 zuiClearIterator(&src);
1779
1780 if (dbDelete(c->db,dstkey)) {
1781 signalModifiedKey(c->db,dstkey);
1782 touched = 1;
1783 server.dirty++;
1784 }
1785 if (dstzset->zsl->length) {
1786 /* Convert to ziplist when in limits. */
1787 if (dstzset->zsl->length db,dstkey);
1794 server.dirty++;
1795 } else {
1796 decrRefCount(dstobj);
1797 addReply(c,shared.czero);
1798 }
1799 zfree(src);
1800 }
1801
1802 void zunionstoreCommand(redisClient *c) {
1803 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1804 }
1805
1806 void zinterstoreCommand(redisClient *c) {
1807 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1808 }
1809
1810 void zrangeGenericCommand(redisClient *c, int reverse) {
1811 robj *key = c->argv[1];
1812 robj *zobj;
1813 int withscores = 0;
1814 long start;
1815 long end;
1816 int llen;
1817 int rangelen;
1818
1819 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1820 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1821
1822 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1823 withscores = 1;
1824 } else if (c->argc >= 5) {
1825 addReply(c,shared.syntaxerr);
1826 return;
1827 }
1828
1829 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1830 || checkType(c,zobj,REDIS_ZSET)) return;
1831
1832 /* Sanitize indexes. */
1833 llen = zsetLength(zobj);
1834 if (start < 0) start = llen+start;
1835 if (end < 0) end = llen+end;
1836 if (start < 0) start = 0;
1837
1838 /* Invariant: start >= 0, so this test will be true when end < 0.
1839 * The range is empty when start > end or start >= length. */
1840 if (start > end || start >= llen) {
1841 addReply(c,shared.emptymultibulk);
1842 return;
1843 }
1844 if (end >= llen) end = llen-1;
1845 rangelen = (end-start)+1;
1846
1847 /* Return the result in form of a multi-bulk reply */
1848 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1849
1850 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1851 unsigned char *zl = zobj->ptr;
1852 unsigned char *eptr, *sptr;
1853 unsigned char *vstr;
1854 unsigned int vlen;
1855 long long vlong;
1856
1857 if (reverse)
1858 eptr = ziplistIndex(zl,-2-(2*start));
1859 else
1860 eptr = ziplistIndex(zl,2*start);
1861
1862 redisAssertWithInfo(c,zobj,eptr != NULL);
1863 sptr = ziplistNext(zl,eptr);
1864
1865 while (rangelen--) {
1866 redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
1867 redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
1868 if (vstr == NULL)
1869 addReplyBulkLongLong(c,vlong);
1870 else
1871 addReplyBulkCBuffer(c,vstr,vlen);
1872
1873 if (withscores)
1874 addReplyDouble(c,zzlGetScore(sptr));
1875
1876 if (reverse)
1877 zzlPrev(zl,&eptr,&sptr);
1878 else
1879 zzlNext(zl,&eptr,&sptr);
1880 }
1881
1882 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1883 zset *zs = zobj->ptr;
1884 zskiplist *zsl = zs->zsl;
1885 zskiplistNode *ln;
1886 robj *ele;
1887
1888 /* Check if starting point is trivial, before doing log(N) lookup. */
1889 if (reverse) {
1890 ln = zsl->tail;
1891 if (start > 0)
1892 ln = zslGetElementByRank(zsl,llen-start);
1893 } else {
1894 ln = zsl->header->level[0].forward;
1895 if (start > 0)
1896 ln = zslGetElementByRank(zsl,start+1);
1897 }
1898
1899 while(rangelen--) {
1900 redisAssertWithInfo(c,zobj,ln != NULL);
1901 ele = ln->obj;
1902 addReplyBulk(c,ele);
1903 if (withscores)
1904 addReplyDouble(c,ln->score);
1905 ln = reverse ? ln->backward : ln->level[0].forward;
1906 }
1907 } else {
1908 redisPanic("Unknown sorted set encoding");
1909 }
1910 }
1911
1912 void zrangeCommand(redisClient *c) {
1913 zrangeGenericCommand(c,0);
1914 }
1915
1916 void zrevrangeCommand(redisClient *c) {
1917 zrangeGenericCommand(c,1);
1918 }
1919
1920 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
1921 void genericZrangebyscoreCommand(redisClient *c, int reverse) {
1922 zrangespec range;
1923 robj *key = c->argv[1];
1924 robj *zobj;
1925 long offset = 0, limit = -1;
1926 int withscores = 0;
1927 unsigned long rangelen = 0;
1928 void *replylen = NULL;
1929 int minidx, maxidx;
1930
1931 /* Parse the range arguments. */
1932 if (reverse) {
1933 /* Range is given as [max,min] */
1934 maxidx = 2; minidx = 3;
1935 } else {
1936 /* Range is given as [min,max] */
1937 minidx = 2; maxidx = 3;
1938 }
1939
1940 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
1941 addReplyError(c,"min or max is not a float");
1942 return;
1943 }
1944
1945 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1946 * 4 arguments, so we'll never enter the following code path. */
1947 if (c->argc > 4) {
1948 int remaining = c->argc - 4;
1949 int pos = 4;
1950
1951 while (remaining) {
1952 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1953 pos++; remaining--;
1954 withscores = 1;
1955 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1956 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != REDIS_OK) ||
1957 (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != REDIS_OK)) return;
1958 pos += 3; remaining -= 3;
1959 } else {
1960 addReply(c,shared.syntaxerr);
1961 return;
1962 }
1963 }
1964 }
1965
1966 /* Ok, lookup the key and get the range */
1967 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
1968 checkType(c,zobj,REDIS_ZSET)) return;
1969
1970 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1971 unsigned char *zl = zobj->ptr;
1972 unsigned char *eptr, *sptr;
1973 unsigned char *vstr;
1974 unsigned int vlen;
1975 long long vlong;
1976 double score;
1977
1978 /* If reversed, get the last node in range as starting point. */
1979 if (reverse) {
1980 eptr = zzlLastInRange(zl,range);
1981 } else {
1982 eptr = zzlFirstInRange(zl,range);
1983 }
1984
1985 /* No "first" element in the specified interval. */
1986 if (eptr == NULL) {
1987 addReply(c, shared.emptymultibulk);
1988 return;
1989 }
1990
1991 /* Get score pointer for the first element. */
1992 redisAssertWithInfo(c,zobj,eptr != NULL);
1993 sptr = ziplistNext(zl,eptr);
1994
1995 /* We don't know in advance how many matching elements there are in the
1996 * list, so we push this object that will represent the multi-bulk
1997 * length in the output buffer, and will "fix" it later */
1998 replylen = addDeferredMultiBulkLength(c);
1999
2000 /* If there is an offset, just traverse the number of elements without
2001 * checking the score because that is done in the next loop. */
2002 while (eptr && offset--) {
2003 if (reverse) {
2004 zzlPrev(zl,&eptr,&sptr);
2005 } else {
2006 zzlNext(zl,&eptr,&sptr);
2007 }
2008 }
2009
2010 while (eptr && limit--) {
2011 score = zzlGetScore(sptr);
2012
2013 /* Abort when the node is no longer in range. */
2014 if (reverse) {
2015 if (!zslValueGteMin(score,&range)) break;
2016 } else {
2017 if (!zslValueLteMax(score,&range)) break;
2018 }
2019
2020 /* We know the element exists, so ziplistGet should always succeed */
2021 redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
2022
2023 rangelen++;
2024 if (vstr == NULL) {
2025 addReplyBulkLongLong(c,vlong);
2026 } else {
2027 addReplyBulkCBuffer(c,vstr,vlen);
2028 }
2029
2030 if (withscores) {
2031 addReplyDouble(c,score);
2032 }
2033
2034 /* Move to next node */
2035 if (reverse) {
2036 zzlPrev(zl,&eptr,&sptr);
2037 } else {
2038 zzlNext(zl,&eptr,&sptr);
2039 }
2040 }
2041 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2042 zset *zs = zobj->ptr;
2043 zskiplist *zsl = zs->zsl;
2044 zskiplistNode *ln;
2045
2046 /* If reversed, get the last node in range as starting point. */
2047 if (reverse) {
2048 ln = zslLastInRange(zsl,range);
2049 } else {
2050 ln = zslFirstInRange(zsl,range);
2051 }
2052
2053 /* No "first" element in the specified interval. */
2054 if (ln == NULL) {
2055 addReply(c, shared.emptymultibulk);
2056 return;
2057 }
2058
2059 /* We don't know in advance how many matching elements there are in the
2060 * list, so we push this object that will represent the multi-bulk
2061 * length in the output buffer, and will "fix" it later */
2062 replylen = addDeferredMultiBulkLength(c);
2063
2064 /* If there is an offset, just traverse the number of elements without
2065 * checking the score because that is done in the next loop. */
2066 while (ln && offset--) {
2067 if (reverse) {
2068 ln = ln->backward;
2069 } else {
2070 ln = ln->level[0].forward;
2071 }
2072 }
2073
2074 while (ln && limit--) {
2075 /* Abort when the node is no longer in range. */
2076 if (reverse) {
2077 if (!zslValueGteMin(ln->score,&range)) break;
2078 } else {
2079 if (!zslValueLteMax(ln->score,&range)) break;
2080 }
2081
2082 rangelen++;
2083 addReplyBulk(c,ln->obj);
2084
2085 if (withscores) {
2086 addReplyDouble(c,ln->score);
2087 }
2088
2089 /* Move to next node */
2090 if (reverse) {
2091 ln = ln->backward;
2092 } else {
2093 ln = ln->level[0].forward;
2094 }
2095 }
2096 } else {
2097 redisPanic("Unknown sorted set encoding");
2098 }
2099
2100 if (withscores) {
2101 rangelen *= 2;
2102 }
2103
2104 setDeferredMultiBulkLength(c, replylen, rangelen);
2105 }
2106
2107 void zrangebyscoreCommand(redisClient *c) {
2108 genericZrangebyscoreCommand(c,0);
2109 }
2110
2111 void zrevrangebyscoreCommand(redisClient *c) {
2112 genericZrangebyscoreCommand(c,1);
2113 }
2114
2115 void zcountCommand(redisClient *c) {
2116 robj *key = c->argv[1];
2117 robj *zobj;
2118 zrangespec range;
2119 int count = 0;
2120
2121 /* Parse the range arguments */
2122 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
2123 addReplyError(c,"min or max is not a float");
2124 return;
2125 }
2126
2127 /* Lookup the sorted set */
2128 if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
2129 checkType(c, zobj, REDIS_ZSET)) return;
2130
2131 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2132 unsigned char *zl = zobj->ptr;
2133 unsigned char *eptr, *sptr;
2134 double score;
2135
2136 /* Use the first element in range as the starting point */
2137 eptr = zzlFirstInRange(zl,range);
2138
2139 /* No "first" element */
2140 if (eptr == NULL) {
2141 addReply(c, shared.czero);
2142 return;
2143 }
2144
2145 /* First element is in range */
2146 sptr = ziplistNext(zl,eptr);
2147 score = zzlGetScore(sptr);
2148 redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
2149
2150 /* Iterate over elements in range */
2151 while (eptr) {
2152 score = zzlGetScore(sptr);
2153
2154 /* Abort when the node is no longer in range. */
2155 if (!zslValueLteMax(score,&range)) {
2156 break;
2157 } else {
2158 count++;
2159 zzlNext(zl,&eptr,&sptr);
2160 }
2161 }
2162 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2163 zset *zs = zobj->ptr;
2164 zskiplist *zsl = zs->zsl;
2165 zskiplistNode *zn;
2166 unsigned long rank;
2167
2168 /* Find first element in range */
2169 zn = zslFirstInRange(zsl, range);
2170
2171 /* Use rank of first element, if any, to determine preliminary count */
2172 if (zn != NULL) {
2173 rank = zslGetRank(zsl, zn->score, zn->obj);
2174 count = (zsl->length - (rank - 1));
2175
2176 /* Find last element in range */
2177 zn = zslLastInRange(zsl, range);
2178
2179 /* Use rank of last element, if any, to determine the actual count */
2180 if (zn != NULL) {
2181 rank = zslGetRank(zsl, zn->score, zn->obj);
2182 count -= (zsl->length - rank);
2183 }
2184 }
2185 } else {
2186 redisPanic("Unknown sorted set encoding");
2187 }
2188
2189 addReplyLongLong(c, count);
2190 }
2191
2192 void zcardCommand(redisClient *c) {
2193 robj *key = c->argv[1];
2194 robj *zobj;
2195
2196 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
2197 checkType(c,zobj,REDIS_ZSET)) return;
2198
2199 addReplyLongLong(c,zsetLength(zobj));
2200 }
2201
2202 void zscoreCommand(redisClient *c) {
2203 robj *key = c->argv[1];
2204 robj *zobj;
2205 double score;
2206
2207 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2208 checkType(c,zobj,REDIS_ZSET)) return;
2209
2210 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2211 if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
2212 addReplyDouble(c,score);
2213 else
2214 addReply(c,shared.nullbulk);
2215 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2216 zset *zs = zobj->ptr;
2217 dictEntry *de;
2218
2219 c->argv[2] = tryObjectEncoding(c->argv[2]);
2220 de = dictFind(zs->dict,c->argv[2]);
2221 if (de != NULL) {
2222 score = *(double*)dictGetVal(de);
2223 addReplyDouble(c,score);
2224 } else {
2225 addReply(c,shared.nullbulk);
2226 }
2227 } else {
2228 redisPanic("Unknown sorted set encoding");
2229 }
2230 }
2231
2232 void zrankGenericCommand(redisClient *c, int reverse) {
2233 robj *key = c->argv[1];
2234 robj *ele = c->argv[2];
2235 robj *zobj;
2236 unsigned long llen;
2237 unsigned long rank;
2238
2239 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2240 checkType(c,zobj,REDIS_ZSET)) return;
2241 llen = zsetLength(zobj);
2242
2243 redisAssertWithInfo(c,ele,ele->encoding == REDIS_ENCODING_RAW);
2244 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2245 unsigned char *zl = zobj->ptr;
2246 unsigned char *eptr, *sptr;
2247
2248 eptr = ziplistIndex(zl,0);
2249 redisAssertWithInfo(c,zobj,eptr != NULL);
2250 sptr = ziplistNext(zl,eptr);
2251 redisAssertWithInfo(c,zobj,sptr != NULL);
2252
2253 rank = 1;
2254 while(eptr != NULL) {
2255 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
2256 break;
2257 rank++;
2258 zzlNext(zl,&eptr,&sptr);
2259 }
2260
2261 if (eptr != NULL) {
2262 if (reverse)
2263 addReplyLongLong(c,llen-rank);
2264 else
2265 addReplyLongLong(c,rank-1);
2266 } else {
2267 addReply(c,shared.nullbulk);
2268 }
2269 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2270 zset *zs = zobj->ptr;
2271 zskiplist *zsl = zs->zsl;
2272 dictEntry *de;
2273 double score;
2274
2275 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
2276 de = dictFind(zs->dict,ele);
2277 if (de != NULL) {
2278 score = *(double*)dictGetVal(de);
2279 rank = zslGetRank(zsl,score,ele);
2280 redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
2281 if (reverse)
2282 addReplyLongLong(c,llen-rank);
2283 else
2284 addReplyLongLong(c,rank-1);
2285 } else {
2286 addReply(c,shared.nullbulk);
2287 }
2288 } else {
2289 redisPanic("Unknown sorted set encoding");
2290 }
2291 }
2292
2293 void zrankCommand(redisClient *c) {
2294 zrankGenericCommand(c, 0);
2295 }
2296
2297 void zrevrankCommand(redisClient *c) {
2298 zrankGenericCommand(c, 1);
2299 }
t_zset.c

参考文献:
http://kenby.iteye.com/blog/1187303
http://blog.iyunv.com/acceptedxukai/article/details/17333673
http://www.iyunv.com/xuqiang/archive/2011/05/22/2053516.html
|
|