Redis数据类型与指令详解之集合(t_set)
集合编码方式Redis 集合(set)使用REDIS_ENCONDING_INT与REDIS_ENCONDING_HT两种编码方式
1、REDIS_ENCONDING_INT: intset.c/intset.h
2、REDIS_ENCONDING_HT: dict.c/dict.h
第一个添加到集合的元素,决定了创建集合时所使用的编码:如果第一个元素可以表示为 long long 类型值(也即是,它是一个整数),那么集合的初始编码为 REDIS_ENCODING_INTSET ;否则,集合的初始编码为 REDIS_ENCODING_HT 。
编码切换:如果一个集合使用 REDIS_ENCODING_INTSET 编码,那么当以下任何一个条件被满足时,这个集合会被转换成 REDIS_ENCODING_HT 编码:
1、 intset 保存的整数值个数超过 server.set_max_intset_entries (默认值为 512 )
2、试图往集合里添加一个新元素,并且这个元素不能被表示为 long long 类型
集合指令实现
SADD
指令格式: SADD key member
将一个或多个member元素加入到集合key当中,由于集合成员不能重复,已经存在于集合key中的member元素将被忽略。
如果key不存在,则创建一个包含被添加的member元素的新集合。
如果key不是集合类型(REDIS_SET)时,则操作出错,redis返回一个错误。
时间复杂度:O(N)
void saddCommand(redisClient *c) {
robj *set;
int j, added = 0;
set = lookupKeyWrite(c->db,c->argv);//写查找数据库中名为c->argv的集合
if (set == NULL) {//集合不存在
set = setTypeCreate(c->argv);//创建一个新的集合
dbAdd(c->db,c->argv,set);//将该集合添加到数据库中,dict中的key就是集合的名称,value就是集合元素
} else {
if (set->type != REDIS_SET) {//判断是否是集合类型
addReply(c,shared.wrongtypeerr);
return;
}
}
for (j = 2; j < c->argc; j++) {//添加集合元素
c->argv = tryObjectEncoding(c->argv);//尝试使用整型存储数据
if (setTypeAdd(set,c->argv)) added++;
}
if (added) {
signalModifiedKey(c->db,c->argv);//通知数据库哪些键key变化了,把变化的key存储到watched_keys中,只在事务操作时才用的着
notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sadd",c->argv,c->db->id);//暂不知道啥用途
}
server.dirty += added;//数据库中数据变化的数目
addReplyLongLong(c,added);
}
lookupKeyWrite函数在object.c文件中,用来在数据库中查找指定key的value值。
setTypeCreate函数在创建一个新的Redis_Set时,根据添加的元素类型为整型还是字符串会创建不同的存储数据结构
//创建一个集合对象,如果value是整型,那么使用intset,否则使用dict
robj *setTypeCreate(robj *value) {
if (isObjectRepresentableAsLongLong(value,NULL) == REDIS_OK)
return createIntsetObject();//intset
return createSetObject();//dict
}
robj *createSetObject(void) {
dict *d = dictCreate(&setDictType,NULL);
robj *o = createObject(REDIS_SET,d);
o->encoding = REDIS_ENCODING_HT;
return o;
}
robj *createIntsetObject(void) {
intset *is = intsetNew();
robj *o = createObject(REDIS_SET,is);
o->encoding = REDIS_ENCODING_INTSET;
return o;
}
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
o->encoding = REDIS_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;
/* Set the LRU to the current lruclock (minutes resolution). */
o->lru = server.lruclock;
return o;
}
SCARD
SCARD key
返回集合key中元素的个数
时间复杂度:O(1)
简单根据集合的编码类型:如果是HT编码,那么直接通过dictSize函数得到字典中元素的个数;如果是intset编码,那么直接通过intsetLen函数得到结果.
SISMEMBER
SISMEMBER key member
判断member元素是否集合key的中的元素
时间复杂度: O(1)
简单根据集合的编码类型:如果是HT编码,那么直接通过dictFind函数查找字典中是否存在该member;如果是intset编码,那么直接通过intsetFind函数查找是否存在该member.
SMEMBERS
SMEMBERS key
返回集合key中的所有元素,不存在的key被视为空集合。
时间复杂度: O(N)
SMOVE
SMOVE source destination member
将member元素从集合source转移到集合destination,如果集合source中不存在member元素,那么SMOVE指令不执行任何操作,返回0。若存在member元素,将member元素从集合source中删除,并添加到集合destination,如果集合destination中已存在member元素,那么仅仅从集合source中删除元素member。
void smoveCommand(redisClient *c) {//将member元素从source集合移动到destination集合
robj *srcset, *dstset, *ele;
srcset = lookupKeyWrite(c->db,c->argv);//从数据库中查找集合source
dstset = lookupKeyWrite(c->db,c->argv);
ele = c->argv = tryObjectEncoding(c->argv);//member元素
/* If the source key does not exist return 0 */
if (srcset == NULL) {
addReply(c,shared.czero);
return;
}
/* If the source key has the wrong type, or the destination key
* is set and has the wrong type, return with an error. */
if (checkType(c,srcset,REDIS_SET) ||
(dstset && checkType(c,dstset,REDIS_SET))) return;//类型检查
/* If srcset and dstset are equal, SMOVE is a no-op */
if (srcset == dstset) {//source与dest相同
addReply(c,shared.cone);
return;
}
/* If the element cannot be removed from the src set, return 0. */
if (!setTypeRemove(srcset,ele)) {//从源集合中删除member元素
addReply(c,shared.czero);
return;
}
notifyKeyspaceEvent(REDIS_NOTIFY_SET,"srem",c->argv,c->db->id);
/* Remove the src set from the database when empty */
if (setTypeSize(srcset) == 0) {//移除member元素后,源集合为空,删除
dbDelete(c->db,c->argv);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv,c->db->id);
}
signalModifiedKey(c->db,c->argv);
signalModifiedKey(c->db,c->argv);
server.dirty++;
/* Create the destination set when it doesn't exist */
if (!dstset) {//目标集合不存在,则新建
dstset = setTypeCreate(ele);
dbAdd(c->db,c->argv,dstset);
}
/* An extra key has changed when ele was successfully added to dstset */
if (setTypeAdd(dstset,ele)) {//添加member元素到目标集合
server.dirty++;
notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sadd",c->argv,c->db->id);
}
addReply(c,shared.cone);
}
SPOP
SPOP key
移除并返回集合key中的一个随机元素。
时间复杂度:O(1)
由于是随机删除一个元素,那么对复制与AOF肯定有影响,因此该操作后,需要将该指令改变为SREM即将该元素删除,参考rewriteClientCommandVector函数。
SRANDMEMBER
SRANDMEMBER key
参数可选,如果没有count,那么返回集合中的一个随机元素。
如果count为正数,且小于集合元素个数,那么命令返回一个包含count个元素的数组,数组中的元素各不相同,如果count大于等于集合基数,那么返回整个集合;如果 count 为负数,那么命令返回一个数组,数组中的元素可能会重复出现多次,而数组的长度为count的绝对值。
SREM
SREM key member
移除集合key中的一个或多个member元素,不存在的member元素会被忽略。
集合求交算法
集合求交的指令有两个:SINTER与SINTERSTORE
SINTER key
返回所有给定集合的交集,不存在的 key 被视为空集,当给定集合当中有一个空集时,结果也为空集.
时间复杂度: O(N * M),N 为给定集合中元素数目最小的集合,M 为给定集合的个数
SINTERSTORE destination key
与SINTER指令类似,不同的仅仅将结果集存储到目标集合destination中,如果集合destination已存在,那么将其覆盖
void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;//迭代器
robj *eleobj, *dstset = NULL;
int64_t intobj;
void *replylen = NULL;
unsigned long j, cardinality = 0;
int encoding;
for (j = 0; j < setnum; j++) {//得到所有的集合
robj *setobj = dstkey ?
lookupKeyWrite(c->db,setkeys) :
lookupKeyRead(c->db,setkeys);
if (!setobj) {//任何一个集合不存在,那么总的交集就为空
zfree(sets);
if (dstkey) {
if (dbDelete(c->db,dstkey)) {
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
addReply(c,shared.czero);
} else {
addReply(c,shared.emptymultibulk);
}
return;
}
if (checkType(c,setobj,REDIS_SET)) {//类型检查
zfree(sets);
return;
}
sets = setobj;
}
/* Sort sets from the smallest to largest, this will improve our
* algorithm's performance */
//按照集合元素个数从小到大排序
qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);
/* The first thing we should output is the total number of elements...
* since this is a multi-bulk write, but at this stage we don't know
* the intersection set size, so we use a trick, append an empty object
* to the output list and save the pointer to later modify it with the
* right length */
if (!dstkey) {
replylen = addDeferredMultiBulkLength(c);
} else {
/* If we have a target key where to store the resulting set
* create this key with an empty set inside */
dstset = createIntsetObject();
}
/* Iterate all the elements of the first (smallest) set, and test
* the element against all the other sets, if at least one set does
* not include the element it is discarded */
/**
求多个集合交集的算法思想:
首先按照集合元素个数对集合进行qsort,然后遍历排序后的第一个集合中的元素,查看该元素在
其他集合中是否存在,如果在其他集合中都存在,那么该元素为一个结果
*/
si = setTypeInitIterator(sets);
while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) {
for (j = 1; j < setnum; j++) {
if (sets == sets) continue;//这段代码没意义啊
if (encoding == REDIS_ENCODING_INTSET) {//intset
/* intset with intset is simple... and fast */
//集合sets编码为intset
if (sets->encoding == REDIS_ENCODING_INTSET &&
!intsetFind((intset*)sets->ptr,intobj))//在集合sets中没有找到集合sets的intobj
{
break;
/* in order to compare an integer with an object we
* have to use the generic function, creating an object
* for this */
} else if (sets->encoding == REDIS_ENCODING_HT) {//集合sets编码为HT,sets为INTSET
eleobj = createStringObjectFromLongLong(intobj);//将sets中的intobj转换为sds
if (!setTypeIsMember(sets,eleobj)) {//如果eleobj不在集合sets中
decrRefCount(eleobj);
break;
}
decrRefCount(eleobj);
}
} else if (encoding == REDIS_ENCODING_HT) {//HT
/* Optimization... if the source object is integer
* encoded AND the target set is an intset, we can get
* a much faster path. */
if (eleobj->encoding == REDIS_ENCODING_INT &&
sets->encoding == REDIS_ENCODING_INTSET &&
!intsetFind((intset*)sets->ptr,(long)eleobj->ptr))
{
break;
/* else... object to object check is easy as we use the
* type agnostic API here. */
} else if (!setTypeIsMember(sets,eleobj)) {
break;
}
}
}
/* Only take action when all sets contain the member */
if (j == setnum) {
if (!dstkey) {
if (encoding == REDIS_ENCODING_HT)
addReplyBulk(c,eleobj);
else
addReplyBulkLongLong(c,intobj);
cardinality++;
} else {//添加到临时目标集合
if (encoding == REDIS_ENCODING_INTSET) {
eleobj = createStringObjectFromLongLong(intobj);
setTypeAdd(dstset,eleobj);
decrRefCount(eleobj);
} else {
setTypeAdd(dstset,eleobj);
}
}
}
}
setTypeReleaseIterator(si);
if (dstkey) {
/* Store the resulting set into the target, if the intersection
* is not an empty set. */
int deleted = dbDelete(c->db,dstkey);//覆盖原来的目标集合
if (setTypeSize(dstset) > 0) {
dbAdd(c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sinterstore",
dstkey,c->db->id);
} else {//空集
decrRefCount(dstset);
addReply(c,shared.czero);
if (deleted)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
dstkey,c->db->id);
}
signalModifiedKey(c->db,dstkey);
server.dirty++;
} else {
setDeferredMultiBulkLength(c,replylen,cardinality);
}
zfree(sets);
}
/*SINTER key */
void sinterCommand(redisClient *c) {//计算所有给定集合的交集
sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
}
/*SINTER destination key */
void sinterstoreCommand(redisClient *c) {//计算所有给定集合的交集,但存储在集合destination中
sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv);
}
集合求差算法
集合求差有两个指令:SDIFF与SDIFFSTORE
SDIFF key
返回所有给定集合之间的差集,不存在的集合key视为空集。
时间复杂度: O(N),N 是所有给定集合的成员数量之和。
SDIFFSTORE destination key
与SDIFF 类似,但它将差集的结果保存到集合destination,如果集合destination已经存在,则将其覆盖。
集合求并算法
集合求并有两个指令:SUNION与SUNIONSTORE
SUNION key
返回所有给定集合的并集,不存在的集合key被视为空集。
时间复杂度: O(N),N 是所有给定集合的成员数量之和。
SUNIONSTORE destination key
与SUNION类似,但它将并集结果保存到集合destination,如果集合destination已经存在,则将其覆盖。
#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
#define REDIS_OP_INTER 2
void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *ele, *dstset = NULL;
int j, cardinality = 0;
int diff_algo = 1;
for (j = 0; j < setnum; j++) {//取出所有集合
robj *setobj = dstkey ?
lookupKeyWrite(c->db,setkeys) :
lookupKeyRead(c->db,setkeys);
if (!setobj) {
sets = NULL;
continue;
}
if (checkType(c,setobj,REDIS_SET)) {
zfree(sets);
return;
}
sets = setobj;
}
/* Select what DIFF algorithm to use.
*
* Algorithm 1 is O(N*M) where N is the size of the element first set
* and M the total number of sets.
*
* Algorithm 2 is O(N) where N is the total number of elements in all
* the sets.
*
* We compute what is the best bet with the current input here. */
//对于SDIFF指令选择最优算法
if (op == REDIS_OP_DIFF && sets) {
long long algo_one_work = 0, algo_two_work = 0;
for (j = 0; j < setnum; j++) {
if (sets == NULL) continue;
algo_one_work += setTypeSize(sets);
algo_two_work += setTypeSize(sets);
}
/* Algorithm 1 has better constant times and performs less operations
* if there are elements in common. Give it some advantage. */
algo_one_work /= 2;//算法1可能不需要全部比较,因此除2来降低常数时间
diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;
if (diff_algo == 1 && setnum > 1) {
/* With algorithm 1 it is better to order the sets to subtract
* by decreasing size, so that we are more likely to find
* duplicated elements ASAP. */
//对sets至sets按照集合元素的个数从大到小排序
qsort(sets+1,setnum-1,sizeof(robj*),
qsortCompareSetsByRevCardinality);
}
}
/* We need a temp set object to store our union. If the dstkey
* is not NULL (that is, we are inside an SUNIONSTORE operation) then
* this set object will be the resulting object to set into the target key*/
dstset = createIntsetObject();
if (op == REDIS_OP_UNION) {//并集操作,把所有元素不重复的操作即可
/* Union is trivial, just add every element of every set to the
* temporary set. */
for (j = 0; j < setnum; j++) {
if (!sets) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets);
while((ele = setTypeNextObject(si)) != NULL) {
// 已有的元素不会被计数
if (setTypeAdd(dstset,ele)) cardinality++;
decrRefCount(ele);
}
setTypeReleaseIterator(si);
}
} else if (op == REDIS_OP_DIFF && sets && diff_algo == 1) {//选择算法1
/* DIFF Algorithm 1:
*
* We perform the diff by iterating all the elements of the first set,
* and only adding it to the target set if the element does not exist
* into all the other sets.
*
* This way we perform at max N*M operations, where N is the size of
* the first set, and M the number of sets. */
/** 遍历 sets ,对于其中的每个元素ele,
只有ele在set至set的每个集合中均不存在,该元素ele才是一个结果
算法复杂度: O(MlogM) + O(sum(size(sets) * size(sets))) j =
M = setnum - 1
*/
si = setTypeInitIterator(sets);
while((ele = setTypeNextObject(si)) != NULL) {
for (j = 1; j < setnum; j++) {
if (!sets) continue; /* no key is an empty set. *///空集合
if (setTypeIsMember(sets,ele)) break;
}
if (j == setnum) {
/* There is no other set with this element. Add it. */
setTypeAdd(dstset,ele);
cardinality++;
}
decrRefCount(ele);
}
setTypeReleaseIterator(si);
} else if (op == REDIS_OP_DIFF && sets && diff_algo == 2) {//选择算法2
/* DIFF Algorithm 2:
*
* Add all the elements of the first set to the auxiliary set.
* Then remove all the elements of all the next sets from it.
*
* This is O(N) where N is the sum of all the elements in every
* set. */
/**将 sets 的所有元素保存到临时目标集合dstset中
遍历set至set的每个集合,如果被遍历集合和 dstset 有相同的元素,
那么从dstset中删除那个元素
算法复杂度:O(sum(size(sets))) j =
*/
for (j = 0; j < setnum; j++) {
if (!sets) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets);
while((ele = setTypeNextObject(si)) != NULL) {
if (j == 0) {
if (setTypeAdd(dstset,ele)) cardinality++;
} else {
if (setTypeRemove(dstset,ele)) cardinality--;
}
decrRefCount(ele);
}
setTypeReleaseIterator(si);
/* Exit if result set is empty as any additional removal
* of elements will have no effect. */
if (cardinality == 0) break;
}
}
/* Output the content of the resulting set, if not in STORE mode */
if (!dstkey) {
addReplyMultiBulkLen(c,cardinality);
si = setTypeInitIterator(dstset);
while((ele = setTypeNextObject(si)) != NULL) {
addReplyBulk(c,ele);
decrRefCount(ele);
}
setTypeReleaseIterator(si);
decrRefCount(dstset);
} else {
/* If we have a target key where to store the resulting set
* create this key with the result set inside */
int deleted = dbDelete(c->db,dstkey);//dstkey已存在直接删除
if (setTypeSize(dstset) > 0) {
dbAdd(c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(REDIS_NOTIFY_SET,
op == REDIS_OP_UNION ? "sunionstore" : "sdiffstore",
dstkey,c->db->id);
} else {
decrRefCount(dstset);
addReply(c,shared.czero);
if (deleted)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
dstkey,c->db->id);
}
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
zfree(sets);
}
/*SUNION key */
void sunionCommand(redisClient *c) {//计算所有给定集合的并集
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
}
/*SUNIONSTORE destination key */
void sunionstoreCommand(redisClient *c) {//计算所有给定集合的并集,当将结果存储在destination中
sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv,REDIS_OP_UNION);
}
/*SDIFF key */
void sdiffCommand(redisClient *c) {//计算第一个集合与另外所有集合的差集
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
}
/*SDIFFSTORE destination key */
void sdiffstoreCommand(redisClient *c) {//与SDIFF类似,但将结果存储在destination中
sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv,REDIS_OP_DIFF);
}
小结
集合是Redis中重要的数据类型,其存储使用intset与hash table(dict)两种数据结构,集合的所有指令都比较简单易懂,集合求差算法的两种优化方式可以学习。
集合所有指令的注解http://redis.io/commands#set
感谢黄健宏(huangz1990)的Redis设计与实现及其他对Redis2.6源码的相关注释对我在研究Redis2.8源码方面的帮助。
夜里不适合听那首歌因为了解思念是哪种颜色 我 們 有 太 多 的 無 奈 ,可 我 已 經 無 力 訴 說 。 双方都属神经质的星座.彼此疑神疑鬼,最后可说是身心俱疲,两败俱伤,只有由爱生恨而已.
页:
[1]