设为首页 收藏本站
查看: 851|回复: 0

[经验分享] Redis网络监听(3)

[复制链接]

尚未签到

发表于 2016-12-19 06:24:02 | 显示全部楼层 |阅读模式
是介绍Redis网络监听的最后一篇文章,着重分析定时时间处理函数serverCron,这个函数其实已经和网络监听没多大关系了,当时因为其绑定在Redis自定义的事件库的定时事件上,所以放到一起来讲。serverCron的这个函数对Redis的正常运行来说很重要,对于Redis的使用者来说,最重要的就是能够迅速直观地看到Redis的当前的运行状况(keys,sizes,memory等),serverCron就能够使用户得知这些信息,此外,serverCron这个方法定时周期地运行,还承担了AOF Write,VM Swap,BGSAVE,Rehash的操作,使得Redis的运行更加平稳。还是来直接通过代码来分析:
C代码  收藏代码
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {  
    int j, loops = server.cronloops;  
    REDIS_NOTUSED(eventLoop);  
    REDIS_NOTUSED(id);  
    REDIS_NOTUSED(clientData);  
  
    /* We take a cached value of the unix time in the global state because
     * with virtual memory and aging there is to store the current time
     * in objects at every object access, and accuracy is not needed.
     * To access a global var is faster than calling time(NULL) */  
    server.unixtime = time(NULL);  
    /* We have just 22 bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
     * 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
     *
     * Note that even if this will wrap after 1.5 years it's not a problem,
     * everything will still work but just some object will appear younger
     * to Redis. But for this to happen a given object should never be touched
     * for 1.5 years.
     *
     * Note that you can change the resolution altering the
     * REDIS_LRU_CLOCK_RESOLUTION define.
     */  
    updateLRUClock();  
  
    /* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */  
    if (server.shutdown_asap) {  
        if (prepareForShutdown() == REDIS_OK) exit(0);  
        redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");  
    }  
  
    /* Show some info about non-empty databases */  
    for (j = 0; j < server.dbnum; j++) {  
        long long size, used, vkeys;  
  
        size = dictSlots(server.db[j].dict);  
        used = dictSize(server.db[j].dict);  
        vkeys = dictSize(server.db[j].expires);  
        if (!(loops % 50) && (used || vkeys)) {  
            redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);  
            /* dictPrintStats(server.dict); */  
        }  
    }  
  
    /* We don't want to resize the hash tables while a bacground saving
     * is in progress: the saving child is created using fork() that is
     * implemented with a copy-on-write semantic in most modern systems, so
     * if we resize the HT while there is the saving child at work actually
     * a lot of memory movements in the parent will cause a lot of pages
     * copied. */  
    if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) {  
        if (!(loops % 10)) tryResizeHashTables();  
        if (server.activerehashing) incrementallyRehash();  
    }  
  
    /* Show information about connected clients */  
    if (!(loops % 50)) {  
        redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",  
            listLength(server.clients)-listLength(server.slaves),  
            listLength(server.slaves),  
            zmalloc_used_memory());  
    }  
  
    /* Close connections of timedout clients */  
    if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)  
        closeTimedoutClients();  
  
    /* Check if a background saving or AOF rewrite in progress terminated */  
    if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {  
        int statloc;  
        pid_t pid;  
  
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {  
            if (pid == server.bgsavechildpid) {  
                backgroundSaveDoneHandler(statloc);  
            } else {  
                backgroundRewriteDoneHandler(statloc);  
            }  
            updateDictResizePolicy();  
        }  
    } else {  
        /* If there is not a background saving in progress check if
         * we have to save now */  
         time_t now = time(NULL);  
         for (j = 0; j < server.saveparamslen; j++) {  
            struct saveparam *sp = server.saveparams+j;  
  
            if (server.dirty >= sp->changes &&  
                now-server.lastsave > sp->seconds) {  
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",  
                    sp->changes, sp->seconds);  
                rdbSaveBackground(server.dbfilename);  
                break;  
            }  
         }  
    }  
  
    /* Expire a few keys per cycle, only if this is a master.
     * On slaves we wait for DEL operations synthesized by the master
     * in order to guarantee a strict consistency. */  
    if (server.masterhost == NULL) activeExpireCycle();  
  
    /* Swap a few keys on disk if we are over the memory limit and VM
     * is enbled. Try to free objects from the free list first. */  
    if (vmCanSwapOut()) {  
        while (server.vm_enabled && zmalloc_used_memory() >  
                server.vm_max_memory)  
        {  
            int retval = (server.vm_max_threads == 0) ?  
                        vmSwapOneObjectBlocking() :  
                        vmSwapOneObjectThreaded();  
            if (retval == REDIS_ERR && !(loops % 300) &&  
                zmalloc_used_memory() >  
                (server.vm_max_memory+server.vm_max_memory/10))  
            {  
                redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");  
            }  
            /* Note that when using threade I/O we free just one object,
             * because anyway when the I/O thread in charge to swap this
             * object out will finish, the handler of completed jobs
             * will try to swap more objects if we are still out of memory. */  
            if (retval == REDIS_ERR || server.vm_max_threads > 0) break;  
        }  
    }  
  
    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. */  
    if (!(loops % 10)) replicationCron();  
  
    server.cronloops++;  
    return 100;  
}  
  
/* This function gets called every time Redis is entering the
* main loop of the event driven library, that is, before to sleep
* for ready file descriptors. */  
void beforeSleep(struct aeEventLoop *eventLoop) {  
    REDIS_NOTUSED(eventLoop);  
    listNode *ln;  
    redisClient *c;  
  
    /* Awake clients that got all the swapped keys they requested */  
    if (server.vm_enabled && listLength(server.io_ready_clients)) {  
        listIter li;  
  
        listRewind(server.io_ready_clients,&li);  
        while((ln = listNext(&li))) {  
            c = ln->value;  
            struct redisCommand *cmd;  
  
            /* Resume the client. */  
            listDelNode(server.io_ready_clients,ln);  
            c->flags &= (~REDIS_IO_WAIT);  
            server.vm_blocked_clients--;  
            aeCreateFileEvent(server.el, c->fd, AE_READABLE,  
                readQueryFromClient, c);  
            cmd = lookupCommand(c->argv[0]->ptr);  
            redisAssert(cmd != NULL);  
            call(c,cmd);  
            resetClient(c);  
            /* There may be more data to process in the input buffer. */  
            if (c->querybuf && sdslen(c->querybuf) > 0)  
                processInputBuffer(c);  
        }  
    }  
  
    /* Try to process pending commands for clients that were just unblocked. */  
    while (listLength(server.unblocked_clients)) {  
        ln = listFirst(server.unblocked_clients);  
        redisAssert(ln != NULL);  
        c = ln->value;  
        listDelNode(server.unblocked_clients,ln);  
        c->flags &= ~REDIS_UNBLOCKED;  
  
        /* Process remaining data in the input buffer. */  
        if (c->querybuf && sdslen(c->querybuf) > 0)  
            processInputBuffer(c);  
    }  
  
    /* Write the AOF buffer on disk */  
    flushAppendOnlyFile();  
}  
i.    首先将server.cronloops的值赋给loops,server.cronloops指的是serverCron函数的运行次数,每运行一次serverCron函数,server.cronloops++,server.cronloops的内部执行逻辑随着server.cronloops值的不同而改变;
ii.    用server.unixtime = time(NULL)来保存当前时间,因为在virtual memory and aging的时候,需要知道每次Object的access时间,但是这个时间不需要很精确,所以通过全局变量来获取时间比调用time(NULL)快多了;
iii.    记录Redis的最大内存使用量;如果收到了SIGTERM信号,则试图终止Redis
iv.    serverCron方法每运行50次显示Redis内各个非空的DB的使用情况(used,keys,sizes)及当前连接的clients,使用的内存大小;
v.    serverCron方法每运行10次,将试图进行一次Rehash,如果一个a bacground saving正在进行,则不进行rehash,以免造成部分数据丢失;
vi.    关闭timeout的clients;
vii.    如果在执行BGSAVE期间,client执行了bgrewriteaof这个命令,则在serverCron将开始执行a scheduled AOF rewrite
viii.    如果当前Redis正在进行BGSAVE或者AOF rewrite,则check BGSAVE或者AOF rewrite是否已经终止,如果终止则调用相应的函数处理(backgroundSaveDoneHandler/backgroundRewriteDoneHandler),如果当前没有BGSAVE或者AOF rewrite操作,则判断是否进行此类操作,如果需要,则触发此类操作;
ix.    如果有AOF buffer flush操作被暂停了,则每次调用serverCron的时候,恢复AOF buffer flush操作
x.    如果是Master,则周期性地使某些key(随即挑选的)过期,注意这个操作仅仅只针对Master,如果是slaves,则只有通过master的del操作来同步key,以做到强一致性;
xi.    VM的Swap操作
xii.    每运行10次,进行replicationCron,如果存在slaves的话
xiii.    返回100,表示serverCron方法每100毫秒被调用一次,这一点在processTimeEvent这个方法里得以体现:
C代码  收藏代码
if (retval != AE_NOMORE) {  
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);  
            } else {  
                aeDeleteTimeEvent(eventLoop, id);  
            }  
            
通过上面的分析,ServerCron侧重在Rehash,VM Swap, AOF write,BGSAVE等操作,而这些操作都是耗时,而且影响Redis对Clients的响应速度的,因此我们在实际应用的时候可以根据具体情况通过改变类似这样的操作:”loops % 10“来决定上述耗时操作的执行频率,有空我会测试下在不同频率下,redis在压力测试下的性能。

此次, Redis的网络监听部分都介绍完了。再回过头来看前面提到的几个问题:
1.         Redis支持 epoll, select, kquque,,通过配置文件来决定采取哪一种
2.         支持文件读写事件和定时事件
3.         采用数组来维护文件事件,链表来保存定时事件(在查找定时事件时,性能不高,有待提高)
4.         Redis Server单线程响应事件,按照先后顺序来响应事件,因此单台 Redis服务器的吞吐量会随着连接的 clients越来越多而下降,可以通过增加更多的 Redis服务器来解决这个问题
5.         Redis在很多代码里都考虑到了尽快地响应各种事件,如在 aeProcessEvent里面,轮询的 wait时间等于当前时间和最近的定时事件响应时间的差值;每次进入轮询 wait之前,在 beforesleep方法里先响应刚刚 unblock的 clients等。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-316022-1-1.html 上篇帖子: WebCollector爬虫的redis插件 下篇帖子: 四 redis学习笔记之事务(转)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表