设为首页 收藏本站
查看: 1207|回复: 0

[经验分享] memcache 多线程 并发模型

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-11-18 10:34:02 | 显示全部楼层 |阅读模式
memcached,相信我们搞linux后端的农民工都知道!这里简单的分析一下memcached是如何处理大量并发的连接的。
如题,memcached是个单进程程序,单进程多线程的程序(linuxer可能会会心一笑,这不就是多进程嘛)。memcached底层是用的libevent来管理事件的,下面我们就来看看这个libevent的经典应用是如何运转的。其实一开始memcached是个正宗的单进程程序,其实使用了异步技术后基本能把cpu和网卡的性能发挥到极限了(这种情况下硬是多线程反而会使程序性能下降),只不过后来随着多核cpu的普及,为了榨光cpu的性能,引入多线程也是顺势而为。
memcached的源码结构非常简单,其中线程相关的代码基本都在Thread.c中。简单的说,memcached的众多线程就是个Master-Worker的模型,其中主线程负责接收连接,然后将连接分给各个worker线程,在各个worker线程中完成命令的接收,处理和返回结果。上个图:

OK,让我们从main函数开始,一步一步来。
main函数中,线程相关的代码基本就下面几行:
123456789101112131415161718192021222324case't':    //此处处理-t参数,设置线程数。    //注意下面的WARNING,线程数超过了cpu核的个数其实没有意义了,只会有负作用    settings.num_threads= atoi(optarg);    if(settings.num_threads <= 0) {        fprintf(stderr,&quot;Numberof threads must be greater than 0\n&quot;);        return1;    }    /*There're other problems when you get above 64 threads.        *In the future we should portably detect # of cores for the        *default.        */    if(settings.num_threads > 64) {        fprintf(stderr,&quot;WARNING:Setting a high number of worker&quot;                        &quot;threadsis not recommended.\n&quot;                        &quot;Set this value to the number of cores in&quot;                        &quot;your machine or less.\n&quot;);    }    break; //此处调用线程初始化函数,main_base是主线程的libevent句柄,//由于libevent不支持多线程共享句柄,所以每个线程都有一个libevent句柄/*start up worker threads if MT mode */thread_init(settings.num_threads,main_base);下面,进入线程的初始化环节,在看thread_init这个函数之前,先看几个结构体:
123456789101112131415161718//worker线程结构体typedefstruct {    pthread_tthread_id;        /*线程ID */    structevent_base *base;    /*此线程的libevent句柄 */    structevent notify_event;  /*通知事件,主线程通过这个事件通知worker线程有新连接 */    intnotify_receive_fd;      /*通知事件关联的读fd,这和下面的notify_send_fd是一对管道,具体使用后面讲 */    intnotify_send_fd;         /*通知事件关联的写fd,后面讲 */    structthread_stats stats;  /*线程相关统计相信 */    structconn_queue *new_conn_queue; /*由主线程分配过来还没来得及处理的连接(客户端)的队列 */    cache_t*suffix_cache;      /*suffix cache */}LIBEVENT_THREAD; //这是主线程的结构体,就比较简单了//这个结构体的实例只有一个全局的dispatcher_threadtypedefstruct {    pthread_tthread_id;        /*主线程ID */    structevent_base *base;    /*libevent句柄 */}LIBEVENT_DISPATCHER_THREAD;下面进入线程初始化函数:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960voidthread_init(intnthreads, structevent_base *main_base) {    int        i;     //先是初始化一堆锁     //这是主锁,用来同步key-value缓存的存取    pthread_mutex_init(&cache_lock,NULL);      //这是缓存状态锁,用来同步memcached的一些统计数据的存取    pthread_mutex_init(&stats_lock,NULL);     //这个锁是用来同步init_count(已初始化完的线程数)变量的存取    pthread_mutex_init(&init_lock,NULL);     //这是用来通知所有线程都初始化完成的条件变量    pthread_cond_init(&init_cond,NULL);     //这个锁是用来同步空闲连接链表的存取    pthread_mutex_init(&cqi_freelist_lock,NULL);    cqi_freelist= NULL;     //分配worker线程结构体内存    threads= calloc(nthreads,sizeof(LIBEVENT_THREAD));    if(! threads) {        perror(&quot;Can'tallocate thread descriptors&quot;);        exit(1);    }     //把主线程先设置好    dispatcher_thread.base= main_base;    dispatcher_thread.thread_id= pthread_self();     //设置所有worker线程与主线程之间的管道    for(i = 0; i < nthreads; i&#43;&#43;) {        intfds[2];        if(pipe(fds)) {            perror(&quot;Can'tcreate notify pipe&quot;);            exit(1);        }         threads.notify_receive_fd= fds[0];        threads.notify_send_fd= fds[1];         //这个函数进行worker线程的初始化工作        //比如libevent句柄,连接队列等的初始化        setup_thread(&threads);    }     //这里就是真正调用pthread_create创建线程的地方了    for(i = 0; i < nthreads; i&#43;&#43;) {        create_worker(worker_libevent,&threads);    }     //主线程等所有的worker线程都跑起来了之后再跑后面的代码(接受连接)    pthread_mutex_lock(&init_lock);    while(init_count < nthreads) {        pthread_cond_wait(&init_cond,&init_lock);    }    pthread_mutex_unlock(&init_lock);}好了,初始化完成,各个线程(包括主线程)都跑了起来,下面我们看看具体的连接是怎么处理的。
先看主线程,在thread_init返回(所有线程初始化完成)之后,main函数做了一些其他的初始化之后就调用了event_base_loop(main_base, 0);这个函数开始处理网络事件,接受连接了。在此之前,main函数在绑定监听端口的时候就已经把监听socket的事件加到了main_base中了(参看server_socket函数,不多说)。监听事件的回调函数是memcached中所有网络事件公用的回调函数event_handler,而这个event_handler也是基本什么都不干,直接又调用drive_machine,这个函数是由一个大大是switch组成的大状态机。这里就是memcached所有网络事件的处理中枢,我们来看看:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556staticvoid drive_machine(conn *c) {    boolstop = false;    intsfd, flags = 1;    socklen_taddrlen;    structsockaddr_storage addr;    intnreqs = settings.reqs_per_event;    intres;     while(!stop) {         switch(c->state){        //这个监听状态只有主线程的监听fd才会有,而主线程也就基本就这么一个状态        caseconn_listening:            //到这,说明有新连接来了            //accept新连接            addrlen= sizeof(addr);            if((sfd = accept(c->sfd, (structsockaddr *)&addr, &addrlen)) == -1) {                if(errno== EAGAIN || errno== EWOULDBLOCK) {                    /*these are transient, so don't log anything */                    stop= true;                }elseif (errno== EMFILE) {                    if(settings.verbose > 0)                        fprintf(stderr,&quot;Toomany open connections\n&quot;);                    accept_new_conns(false);                    stop= true;                }else{                    perror(&quot;accept()&quot;);                    stop= true;                }                break;            }             //设置套接字非阻塞            if((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||                fcntl(sfd,F_SETFL, flags | O_NONBLOCK) < 0) {                perror(&quot;settingO_NONBLOCK&quot;);                close(sfd);                break;            }             //将新连接分给worker线程            dispatch_conn_new(sfd,conn_new_cmd, EV_READ | EV_PERSIST,                                     DATA_BUFFER_SIZE,tcp_transport);            stop= true;            break;             //下面是worker线程的一些事件,此处略        caseconn_waiting:            //...         caseconn_read:            //...    }     return;}接着看dispatch_conn_new这个函数:
12345678910111213141516171819202122232425262728voiddispatch_conn_new(intsfd, enumconn_states init_state, intevent_flags,                       intread_buffer_size, enumnetwork_transport transport) {    //分配一个连接队列item,此item将会由主线程塞到worker线程的连接队列中    CQ_ITEM*item = cqi_new();     //RR轮询得到这个连接的目标线程    inttid = (last_thread &#43; 1) % settings.num_threads;    LIBEVENT_THREAD*thread= threads &#43; tid;    last_thread= tid;     //初始化item    item->sfd= sfd;    item->init_state= init_state;    item->event_flags= event_flags;    item->read_buffer_size= read_buffer_size;    item->transport= transport;     //将item塞到worker线程的队列中    cq_push(thread->new_conn_queue,item);     MEMCACHED_CONN_DISPATCH(sfd,thread->thread_id);    //向worker线程的通知写fd中写一个字节,如此notify_receive_fd就会有一个字节可读    //这样worker线程的notify_event就会收到一个可读的事件    //memcached就是这样来达到线程间异步通知的目的,很tricky    if(write(thread->notify_send_fd,&quot;&quot;,1) != 1) {        perror(&quot;Writingto thread notify pipe&quot;);    }}好了,自此主线程处理连接的逻辑基本就没了,下面看看worker线程的相关代码。worker线程初始化完成后将notify_event的libevent事件的回调注册到了thread_libevent_process上,来看看:
1234567891011121314151617181920212223242526272829303132333435staticvoid thread_libevent_process(intfd, shortwhich, void*arg) {    LIBEVENT_THREAD*me = arg;    CQ_ITEM*item;    charbuf[1];     //将主线程写入的一个字节读掉,一个字节代表一个连接    if(read(fd, buf, 1) != 1)        if(settings.verbose > 0)            fprintf(stderr,&quot;Can'tread from libevent pipe\n&quot;);     //将主线程塞到队列中的连接pop出来    item= cq_pop(me->new_conn_queue);     if(NULL != item) {        //初始化新连接,注册事件监听,回调到前面提到的event_handler上        conn*c = conn_new(item->sfd, item->init_state, item->event_flags,                           item->read_buffer_size,item->transport, me->base);        if(c == NULL) {            if(IS_UDP(item->transport)) {                fprintf(stderr,&quot;Can'tlisten for events on UDP socket\n&quot;);                exit(1);            }else{                if(settings.verbose > 0) {                    fprintf(stderr,&quot;Can'tlisten for events on fd %d\n&quot;,                        item->sfd);                }                close(item->sfd);            }        }else{            c->thread= me;        }        //回收item        cqi_free(item);    }}好了,这样worker线程就多了一个连接了,后面worker线程就是不断的监听notify事件添加连接和客户端连接socket的网络IO事件处理业务了。
memcached的这套多线程libevent机制几乎成了高性能服务器的一本教材。linux后端农民工必读。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-140627-1-1.html 上篇帖子: 在tomcat日志里打印memcache日志,并实现session共享 下篇帖子: 基于 自定义注解 和 aop 实现使用memcache 对数据库的缓存 示例
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表