设为首页 收藏本站
查看: 1503|回复: 0

[经验分享] Redis源码分析(九)--- t_list,t_string的分析

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-11-5 09:15:23 | 显示全部楼层 |阅读模式
       今天我是看完了t_list和t_string的代码,从名字知道,这也是和之前的t_hash非常类似的,无非就是各种形式,转化来转化去的。先讲讲t_list,对比于昨天的t_hash,t_hash是ziplist和dict之间的转换,t_list则是描述的是ziplist压缩表和linedlist普通链表,换句话说,当client这个robj作为参数传入的时候,都分为ENCODING_ZIIPLIST和ENCODING_LINKEDLIST 2类编码方式处理。还有一个在t_list出现的一个 比较新颖的东西是出现了锁的概念,操作系统的东西在这里也会碰到了,这里的代码也非常值得学习。下面,我们看看一般的t_list中包括的一些方法和命令:



    /* List方法 */  
    /* list的操作分为2种,LINKEDLIST普通链表和ZIPLIST压缩列表的相关操作 */  
    void listTypeTryConversion(robj *subject, robj *value) /* 判断是否需要将ziplist转为linkedlist */  
    void listTypePush(robj *subject, robj *value, int where) /* 在头部或尾部插入value元素 */  
    robj *listTypePop(robj *subject, int where)  /* 在列表的头部或尾弹出元素 */  
    unsigned long listTypeLength(robj *subject) /* 列表的长度 */  
    listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) /* 返回列表迭代器,方向有头尾之分 */  
    void listTypeReleaseIterator(listTypeIterator *li) /* 释放列表迭代器 */  
    int listTypeNext(listTypeIterator *li, listTypeEntry *entry) /* 根据列表迭代器,获取下一个元素 */  
    robj *listTypeGet(listTypeEntry *entry) /* 获取listType元素,有ziplist和linkedlist */  
    void listTypeInsert(listTypeEntry *entry, robj *value, int where) /* listType了类型插入元素操作 */  
    int listTypeEqual(listTypeEntry *entry, robj *o) /* 判断2个元素是否相等 */  
    void listTypeDelete(listTypeEntry *entry) /* listType类型删除元素 */  
    void listTypeConvert(robj *subject, int enc) /* listType类型的转换操作,这里指的是往linkedList上转 */  
         
    /* List的相关命令 */  
    void pushGenericCommand(redisClient *c, int where) /* 插入操作命令的原始操作 */  
    void lpushCommand(redisClient *c) /* 左边插入元素命令 */  
    void rpushCommand(redisClient *c) /* 右边插入元素命令 */  
    void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) /* 有返回状态的插入操作命令,假设首先都是能够实现插入命令的 */  
    void lpushxCommand(redisClient *c) /* 左边插入元素有返回消息的命令 */  
    void rpushxCommand(redisClient *c) /* 右边插入元素有返回消息的命令 */  
    void linsertCommand(redisClient *c) /* 列表指定位置插入元素操作命令 */  
    void llenCommand(redisClient *c) /* 列表返回长度命令 */  
    void lindexCommand(redisClient *c) /* 获取index位置的上的元素 */  
    void lsetCommand(redisClient *c) /* listType类型设置value命令 */  
    void popGenericCommand(redisClient *c, int where) /* 实现弹出操作的原始命令 */  
    void lpopCommand(redisClient *c) /* 左边弹出元素命令 */  
    void rpopCommand(redisClient *c) /* 右边弹出操作命令 */  
    void lrangeCommand(redisClient *c) /* 移动listType位置操作 */  
    void ltrimCommand(redisClient *c) /* listType实现截取操作,把多余范围的元素删除 */  
    void lremCommand(redisClient *c) /* 移除在listType中出现的与指定的元素相等的元素 */  
    void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) /* 元素从一个obj右边弹出,在从左侧推入另一个obj列表操作 */  
    void rpoplpushCommand(redisClient *c) /* 右边弹出,左边推入元素的命令 */  
    void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) >/* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */  
    void unblockClientWaitingData(redisClient *c) /* 客户端解锁操作 */  
    void signalListAsReady(redisDb *db, robj *key) /* 将key存入server中,后续可以用于客户端的存取 */  
    int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where) /* 根据server,Client的key,value情况,判断server此时能否服务于Client,否则Client将被阻塞 */  
    void handleClientsBlockedOnLists(void) /* 服务端解除阻塞住的Client */  
    int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) /* 获取超时时间 */   
    void blockingPopGenericCommand(redisClient *c, int where) /* 阻塞弹出命令的原始操作 */  
    void blpopCommand(redisClient *c) /* 左边弹出数据的阻塞式命令 */  
    void brpopCommand(redisClient *c) /* 右边弹出数据的阻塞式命令 */  
    void brpoplpushCommand(redisClient *c) /* 弹出推入阻塞式命令 */  


看到这么多API,是否有被吓到的感觉,但里面其实很多的方法都很简单,我只挑几个比较典型的方法做一下分析,向什么返回长度了,获取,设置键值的大家可以自己看具体的方法,不难的。一般的t_list的处理流程,比如获取迭代器的方法:



    /* Stores pointer to current the entry in the provided entry structure
     * and advances the position of the iterator. Returns 1 when the current
     * entry is in fact an entry, 0 otherwise. */  
    int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {  
        /* Protect from converting when iterating */  
        redisAssert(li->subject->encoding == li->encoding);  
      
        entry->li = li;  
        if (li->encoding == REDIS_ENCODING_ZIPLIST) {  
            entry->zi = li->zi;  
            if (entry->zi != NULL) {  
                if (li->direction == REDIS_TAIL)  
                    //根据方向调用pre或next的方法  
                    li->zi = ziplistNext(li->subject->ptr,li->zi);  
                else  
                    li->zi = ziplistPrev(li->subject->ptr,li->zi);  
                return 1;  
            }  
        } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {  
            entry->ln = li->ln;  
            if (entry->ln != NULL) {  
                if (li->direction == REDIS_TAIL)  
                    //普通的列表的调用方式同上  
                    li->ln = li->ln->next;  
                else  
                    li->ln = li->ln->prev;  
                return 1;  
            }  
        } else {  
            redisPanic("Unknown list encoding");  
        }  
        return 0;  
    }  


很多API和t_hash的方法都是极其类似的,我就不多说了,我们将关注点,放在列表的锁控制上,先讲个前提,在redis客户端执行操作的时候,会有个请求超时时间,在这个请求的时间内,客户端如果没有找到key对应的数据,是会被阻塞的,什么意思呢,比如:



    /* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */  
    /* 这个客户端阻塞的意思:当客户端请求list中某个特定key值时,如果key存在且列表非空,当然 */  
    /* 当然不会阻塞,正常返回数据,如果当客户端请求某个key不存在,或列表为empty的时候,客户端将被阻塞 */  
    /* 只有当有这个key被写入的时候,客户端才会被解锁 *  

这个其实就是操作系统中的PV操作类似的原理,跟java里的wait(),signal()方法的模型是一样的,他的实现代码如下:



    /* This is how the current blocking POP works, we use BLPOP as example:
     * - If the user calls BLPOP and the key exists and contains a non empty list
     *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
     *   if blocking is not required.
     * - If instead BLPOP is called and the key does not exists or the list is
     *   empty we need to block. In order to do so we remove the notification for
     *   new data to read in the client socket (so that we'll not serve new
     *   requests if the blocking request is not served). Also we put the client
     *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
     *   blocking for this keys.
     * - If a PUSH operation against a key with blocked clients waiting is
     *   performed, we mark this key as "ready", and after the current command,
     *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
     *   for this list, from the one that blocked first, to the last, accordingly
     *   to the number of elements we have in the ready list.
     */  
      
    /* Set a client in blocking mode for the specified key, with the specified
     * timeout */  
    /* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */  
    /* 这个客户端阻塞的意思:当客户端请求list中某个特定key值时,如果key存在且列表非空,当然 */  
    /* 当然不会阻塞,正常返回数据,如果当客户端请求某个key不存在,或列表为empty的时候,客户端将被阻塞 */  
    /* 只有当有这个key被写入的时候,客户端才会被解锁 */        
    void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {  
        dictEntry *de;  
        list *l;  
        int j;  
      
        //设置超时时间  
        c->bpop.timeout = timeout;  
        c->bpop.target = target;  
      
        if (target != NULL) incrRefCount(target);  
      
        for (j = 0; j < numkeys; j++) {  
            //下面为为找到的key上锁  
            /* If the key already exists in the dict ignore it. */  
            //如果此时,某些key已经存在了,直接忽略  
            if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;  
            incrRefCount(keys[j]);  
      
            /* And in the other "side", to map keys -> clients */  
            //根据key找到对于请求该key的客户端,也就是将要被阻塞的Client  
            de = dictFind(c->db->blocking_keys,keys[j]);  
            if (de == NULL) {  
                int retval;  
      
                /* For every key we take a list of clients blocked for it */  
                l = listCreate();  
                //为c客户端添加一个阻塞的key  
                retval = dictAdd(c->db->blocking_keys,keys[j],l);  
                //增加key的引用次数  
                incrRefCount(keys[j]);  
                redisAssertWithInfo(c,keys[j],retval == DICT_OK);  
            } else {  
                l = dictGetVal(de);  
            }  
            listAddNodeTail(l,c);  
        }  
      
        /* Mark the client as a blocked client */  
        /* 标记Client为阻塞的客户端 */  
        c->flags |= REDIS_BLOCKED;  
        //服务端的阻塞客户端计数递增  
        server.bpop_blocked_clients++;  
    }  


所有的key未来都会存在于server.readykeys里面,客户端判断自己所请求的key是否存在于服务端中的readykeys中,如果不存在就会被阻塞了。将key存入服务单的数据库的操作:



    /* If the specified key has clients blocked waiting for list pushes, this
     * function will put the key reference into the server.ready_keys list.
     * Note that db->ready_keys is a hash table that allows us to avoid putting
     * the same key again and again in the list in case of multiple pushes
     * made by a script or in the context of MULTI/EXEC.
     *
     * The list will be finally processed by handleClientsBlockedOnLists() */  
    /* 将key存入server中,后续可以用于客户端的存取 */  
    void signalListAsReady(redisDb *db, robj *key) {  
        readyList *rl;  
      
        /* No clients blocking for this key? No need to queue it. */  
        /* 如果没有客户端为此key所阻塞的,直接不添加 */  
        if (dictFind(db->blocking_keys,key) == NULL) return;  
      
        /* Key was already signaled? No need to queue it again. */  
        /* 如果key已经存在,不不要重复添加 */  
        if (dictFind(db->ready_keys,key) != NULL) return;  
      
        /* Ok, we need to queue this key into server.ready_keys. */  
        rl = zmalloc(sizeof(*rl));  
        rl->key = key;  
        rl->db = db;  
        incrRefCount(key);  
        //添加到list列表尾部  
        listAddNodeTail(server.ready_keys,rl);  
      
        /* We also add the key in the db->ready_keys dictionary in order
         * to avoid adding it multiple times into a list with a simple O(1)
         * check. */  
        incrRefCount(key);  
        redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);  
    }  


其实这里我不免会有个小问题,为什么list出现Client被阻塞的操作,而在t_hash中不会出现类似的操作呢?略疑惑。

      下面说说t_string的用法,t_string里面没有涉及什么特定的结构体,就是最直接的键值对的设置,直接改变数据库中的键值,先列出里面的主要方法:



    /* 方法 API */  
    static int checkStringLength(redisClient *c, long long size) /* 检查字符串长度是否超出限制最大长度,512*1024*1024个字节长度 */  
    void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) /* 设置的泛型指令操作 */  
    void setCommand(redisClient *c) /* 设置的综合方法,设置都是根据Client中的参数而定 */  
    void setnxCommand(redisClient *c) /* key不存在的时候才设置值,flag为REDIS_SET_NX时 */  
    void setexCommand(redisClient *c) /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为秒 */  
    void psetexCommand(redisClient *c) /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为毫秒 */  
    int getGenericCommand(redisClient *c) /* 获取命令的泛型命令 */  
    void getCommand(redisClient *c) /* 获取命令的操作 */  
    void getsetCommand(redisClient *c) /* 获取set命令的操作 */  
    void setrangeCommand(redisClient *c) /* 设置rangge命令的操作 */  
    void getrangeCommand(redisClient *c) /* 获取range命令的操作 */  
    void mgetCommand(redisClient *c) /* 执行多次getCommand指令 */  
    void msetGenericCommand(redisClient *c, int nx) /* 一次运行多个设置操作泛型命令 */  
    void msetCommand(redisClient *c) /* 多次设置指令的非NX模式 */  
    void msetnxCommand(redisClient *c)  /* 多次设置指令的NX模式,即只在不存在的时候才设置 */  
    void incrDecrCommand(redisClient *c, long long incr) /* 增加或减少固定值的操作,减少其实就是增加-1 */  
    void incrCommand(redisClient *c) /* 递增1操作,incr递增设置为1 */  
    void decrCommand(redisClient *c) /* 递减1操作,incr递增设置为-1 */  
    void incrbyCommand(redisClient *c) /* 增加指令,从Client中获取递增值 */  
    void decrbyCommand(redisClient *c) /* 减少指令,从Client中获取减少值 */  
    void incrbyfloatCommand(redisClient *c) /* 增加float类型值的操作命令,在获取原始值的时候,获取的方法不一样,为getLongDoubleFromObjectOrReply*/  
    void appendCommand(redisClient *c) /* 追加命令操作,其实也是key:value的形式 */  
    void strlenCommand(redisClient *c) /* 获取命令长度 */  


里面都是一些键值对的简单操作,有一个不同点是,里面的设置操作分为3种:



    /* SET操作的一些FLAG */  
    #define REDIS_SET_NO_FLAGS 0  
    #define REDIS_SET_NX (1<<0)     /* Set if key not exists. */  
    #define REDIS_SET_XX (1<<1)     /* Set if key exists. */  


如上面所说,nx模式指的是key不存在的时候才设置值,xx为存在的时候设置,这种模式设置有命令到期时间的限制,分为单位为秒级和毫秒级,而nx模式下,没有时间上的限制,调用的泛型方法:



    void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply)  

注意其中的expire,和时间单位unit:



    /* key不存在的时候才设置值,flag为REDIS_SET_NX时 */  
    void setnxCommand(redisClient *c) {  
        c->argv[2] = tryObjectEncoding(c->argv[2]);  
        setGenericCommand(c,REDIS_SET_NX,c->argv[1],c->argv[2],NULL,0,shared.cone,shared.czero);  
    }  
      
    /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为秒 */  
    void setexCommand(redisClient *c) {  
        c->argv[3] = tryObjectEncoding(c->argv[3]);  
        setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS,NULL,NULL);  
    }  
      
    /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为毫秒 */  
    void psetexCommand(redisClient *c) {  
        c->argv[3] = tryObjectEncoding(c->argv[3]);  
        setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS,NULL,NULL);  
    }  


nx模式时expire和unit参数分别为NULL,0,所以我做出了如下猜想,可能是因为,当键值对存在的时候,考虑多并发设置的情况,有些Client可能被阻塞,所以有超时时间的存在,而key不存在的时候,就看谁更快了吧,就是直接设置。下面还有一个这个文件比较有意思的方法,有点批处理的味道,换句话说,一个方法里执行多次set操作:



    /* 一次运行多个设置操作泛型命令 */  
    void msetGenericCommand(redisClient *c, int nx) {  
        int j, busykeys = 0;  
      
        /* 设置操作一定是成对出现的 */  
        if ((c->argc % 2) == 0) {  
            addReplyError(c,"wrong number of arguments for MSET");  
            return;  
        }  
        /* Handle the NX flag. The MSETNX semantic is to return zero and don't
         * set nothing at all if at least one already key exists. */  
        if (nx) {  
            for (j = 1; j < c->argc; j += 2) {  
                if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {  
                    busykeys++;  
                }  
            }  
              
            //如果key存在,返回0  
            if (busykeys) {  
                addReply(c, shared.czero);  
                return;  
            }  
        }  
      
        //隔2个执行设置key操作指令一次  
        for (j = 1; j < c->argc; j += 2) {  
            c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);  
            setKey(c->db,c->argv[j],c->argv[j+1]);  
            notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[j],c->db->id);  
        }  
        server.dirty += (c->argc-1)/2;  
        addReply(c, nx ? shared.cone : shared.ok);  
    }  


也不是什么特别的操作,应该redis编写者为了方便效率的提高吧。其他的一些方法,请读者可以学习t_string.c的文件,还有提醒一点,在redis数据库中,存储的形式都是K-V形式的,所有的获取,设置都是通过KEY的形式操作,你会看到很多这样的方法:



    dbAdd(c->db,c->argv[1],new);  
    setKey(c->db,c->argv[j],c->argv[j+1]);  


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-27088-1-1.html 上篇帖子: Redis源码分析(八)--- t_hash哈希转换 下篇帖子: Redis源码分析(十)--- testhelp.h小型测试框架和redis-check-aof.c日志检测
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表