设为首页 收藏本站
查看: 1134|回复: 0

[经验分享] Redis源码分析(二十)--- ae事件驱动

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-11-5 09:21:08 | 显示全部楼层 |阅读模式
    事件驱动这个名词出现的越来越频繁了,听起来非常高大上,今天本人把Redis内部的驱动模型研究了一番,感觉收获颇丰啊。一个ae.c主程序,加上4个事件类型的文件,让你彻底弄清楚,Redis是如何处理这些事件的。在Redis的事件处理中,用到了epoll,select,kqueue和evport,evport可能大家会陌生许多。前面3个都是非常常见的事件,在libevent的事件网络库中也都有出现。作者在写这个事件驱动模型的时候,也说了,这只是为了简单的复用了,设计的一个小小的处理模型:



    /* A simple event-driven programming library. Originally I wrote this code  
     * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated  
     * it in form of a library for easy reuse.  
     *  
     * ae是作者写的一个简单的事件驱动库,后面进行了转化,变得更为简单的复用  

所以不是很复杂。在了解整个事件驱动的模型前,有先了解一些定义的事件结构体,事件类型总共2个一个FileEvent,TimeEvent:



    /* File event structure */  
    /* 文件事件结构体 */  
    typedef struct aeFileEvent {  
        //只为读事件或者写事件中的1种  
        int mask; /* one of AE_(READABLE|WRITABLE) */  
        //读方法  
        aeFileProc *rfileProc;  
        //写方法  
        aeFileProc *wfileProc;  
        //客户端数据  
        void *clientData;  
    } aeFileEvent;  
      
    /* Time event structure */  
    /* 时间事件结构体 */  
    typedef struct aeTimeEvent {  
        //时间事件id  
        long long id; /* time event identifier. */  
        //时间秒数  
        long when_sec; /* seconds */  
        //时间毫秒  
        long when_ms; /* milliseconds */  
        //时间事件中的处理函数  
        aeTimeProc *timeProc;  
        //被删除的时候将会调用的方法  
        aeEventFinalizerProc *finalizerProc;  
        //客户端数据  
        void *clientData;  
        //时间结构体内的下一个结构体  
        struct aeTimeEvent *next;  
    } aeTimeEvent;  
      
    /* A fired event */  
    /* fired结构体,用来表示将要被处理的文件事件 */  
    typedef struct aeFiredEvent {  
        //文件描述符id  
        int fd;  
        int mask;  
    } aeFiredEvent;  

FireEvent只是用来标记要处理的文件Event。

这些事件都存在于一个aeEventLoop的结构体内:



    /* State of an event based program */  
    typedef struct aeEventLoop {  
        //目前创建的最高的文件描述符  
        int maxfd;   /* highest file descriptor currently registered */  
        int setsize; /* max number of file descriptors tracked */  
        //下一个时间事件id  
        long long timeEventNextId;  
        time_t lastTime;     /* Used to detect system clock skew */  
        //3种事件类型  
        aeFileEvent *events; /* Registered events */  
        aeFiredEvent *fired; /* Fired events */  
        aeTimeEvent *timeEventHead;  
        //事件停止标志符  
        int stop;  
        //这里存放的是event API的数据,包括epoll,select等事件  
        void *apidata; /* This is used for polling API specific data */  
        aeBeforeSleepProc *beforesleep;  
    } aeEventLoop;  

在每种事件内部,都有定义相应的处理函数,把函数当做变量一样存在结构体中。下面看下ae.c中的一些API的组成:



    /* Prototypes */  
    aeEventLoop *aeCreateEventLoop(int setsize); /* 创建aeEventLoop,内部的fileEvent和Fired事件的个数为setSize个 */  
    void aeDeleteEventLoop(aeEventLoop *eventLoop); /* 删除EventLoop,释放相应的事件所占的空间 */  
    void aeStop(aeEventLoop *eventLoop); /* 设置eventLoop中的停止属性为1 */  
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,  
            aeFileProc *proc, void *clientData); /* 在eventLoop中创建文件事件 */  
    void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); /* 删除文件事件 */  
    int aeGetFileEvents(aeEventLoop *eventLoop, int fd); //根据文件描述符id,找出文件的属性,是读事件还是写事件  
    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,  
            aeTimeProc *proc, void *clientData,  
            aeEventFinalizerProc *finalizerProc); /* 在eventLoop中添加时间事件,创建的时间为当前时间加上自己传入的时间 */  
    int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); //根据时间id,删除时间事件,涉及链表的操作  
    int aeProcessEvents(aeEventLoop *eventLoop, int flags); /* 处理eventLoop中的所有类型事件 */  
    int aeWait(int fd, int mask, long long milliseconds); /* 让某事件等待 */  
    void aeMain(aeEventLoop *eventLoop); /* ae事件执行主程序 */  
    char *aeGetApiName(void);  
    void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); /* 每次eventLoop事件执行完后又重新开始执行时调用 */  
    int aeGetSetSize(aeEventLoop *eventLoop); /* 获取eventLoop的大小 */  
    int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); /* EventLoop重新调整大小 */  

无非涉及一些文件,时间事件的添加,修改等,都是在eventLoop内部的修改,我们来看下最主要,最核心的方法:



    /* ae事件执行主程序 */  
    void aeMain(aeEventLoop *eventLoop) {  
        eventLoop->stop = 0;  
        //如果eventLoop中的stop标志位不为1,就循环处理  
        while (!eventLoop->stop) {  
            //每次eventLoop事件执行完后又重新开始执行时调用  
            if (eventLoop->beforesleep != NULL)  
                eventLoop->beforesleep(eventLoop);  
            //while循环处理所有的evetLoop的事件  
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);  
        }  
    }  

道理很简单通过,while循环,处理eventLoop中的所有类型事件,截取部分processEvents()代码:



    numevents = aeApiPoll(eventLoop, tvp);  
           for (j = 0; j < numevents; j++) {  
               aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];  
               int mask = eventLoop->fired[j].mask;  
               int fd = eventLoop->fired[j].fd;  
               int rfired = 0;  
      
        /* note the fe->mask & mask & ... code: maybe an already processed
                * event removed an element that fired and we still didn't
                * processed, so we check if the event is still valid. */  
               if (fe->mask & mask & AE_READABLE) {  
                   rfired = 1;  
                   //根据掩码计算判断是否为ae读事件,调用时间中的读的处理方法  
                   fe->rfileProc(eventLoop,fd,fe->clientData,mask);  
               }  
               if (fe->mask & mask & AE_WRITABLE) {  
                   if (!rfired || fe->wfileProc != fe->rfileProc)  
                       fe->wfileProc(eventLoop,fd,fe->clientData,mask);  
               }  
               processed++;  
           }  
       }  

ae中创建时间事件都是以当前时间为基准创建的;



    /* 在eventLoop中添加时间事件,创建的时间为当前时间加上自己传入的时间 */  
    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,  
            aeTimeProc *proc, void *clientData,  
            aeEventFinalizerProc *finalizerProc)  
    {  
        long long id = eventLoop->timeEventNextId++;  
        aeTimeEvent *te;  
      
        te = zmalloc(sizeof(*te));  
        if (te == NULL) return AE_ERR;  
        te->id = id;  
        aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);  
        te->timeProc = proc;  
        te->finalizerProc = finalizerProc;  
        te->clientData = clientData;  
        //新加的变为timeEvent的头部  
        te->next = eventLoop->timeEventHead;  
        eventLoop->timeEventHead = te;  
         
        //返回新创建的时间事件的id  
        return id;  
    }  

    下面说说如何调用事件API库里的方法呢。首先隆重介绍什么是epoll,poll,select,kqueu和evport。这些都是一种事件模型。

select事件的模型

(1)创建所关注的事件的描述符集合(fd_set),对于一个描述符,可以关注其上面的读(read)、写(write)、异常(exception)事件,所以通常,要创建三个fd_set, 一个用来收集关注读事件的描述符,一个用来收集关注写事件的描述符,另外一个用来收集关注 异常事件的描述符集合。
(2)轮询所有fd_set中的每一个fd ,检查是否有相应的事件发生,如果有,就进行处理。
   
poll和上面的区别是可以复用文件描述符,上面对一个文件需要轮询3个文件描述符集合,而poll只需要一个,效率更高
epoll是poll的升级版本,把描述符列表交给内核,一旦有事件发生,内核把发生事件的描述符列表通知给进程,这样就避免了轮询整个描述符列表。效率极大提高

evport这个出现的比较少,大致意思是evport将某一个对象的特定 event 与 Event port 相关联:

在了解了3种事件模型的原理之后,我们看看ae.c在Redis中是如何调用的呢,



    //这里存放的是event API的数据,包括epoll,select等事件  
        void *apidata; /* This is used for polling API specific data */  

就是上面这个属性,在上面的4种事件中,分别对应着3个文件,分别为ae_poll.c,ae_select.c,但是他们的API结构是类似的,我举其中一个例子,epoll的例子,首先都会有此事件特定的结构体:



    typedef struct aeApiState {  
        int epfd;  
        struct epoll_event *events;  
    } aeApiState;  

还有共同套路的模板方法:



    static int aeApiCreate(aeEventLoop *eventLoop)  
    static int aeApiResize(aeEventLoop *eventLoop, int setsize)  
    static void aeApiFree(aeEventLoop *eventLoop)  
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)  
    static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)  
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)  
    static char *aeApiName(void)  

在创建的时候赋值到eventloop的API data里面去:



    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */  
       if (state->epfd == -1) {  
           zfree(state->events);  
           zfree(state);  
           return -1;  
       }  
       //最后将state的数据赋值到eventLoop的API data中  
       eventLoop->apidata = state;  
       return 0;  

在取出事件的poll方法的时候是这些方法的一个区分点:



    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {  
        aeApiState *state = eventLoop->apidata;  
        int retval, numevents = 0;  
      
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,  
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);  
        if (retval > 0) {  
    .....  

而在select中的poll方法是这样的:



    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {  
        aeApiState *state = eventLoop->apidata;  
        int retval, j, numevents = 0;  
      
        memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));  
        memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));  
      
        retval = select(eventLoop->maxfd+1,  
                    &state->_rfds,&state->_wfds,NULL,tvp);  
    ......  

最后都是基于state中的事件和eventLoop之间的转化实现操作。传入eventLoop中的信息,传入state的信息,经过内部的处理得出终的事件结果。调用就这么简单。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-27099-1-1.html 上篇帖子: Redis源码分析(十九)--- replication主从数据复制的实现 下篇帖子: Redis源码分析(二十一)--- anet网络通信的封装
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表