设为首页 收藏本站
查看: 1126|回复: 0

[经验分享] Memcached源码分析--线程模型(三)

[复制链接]

尚未签到

发表于 2018-12-25 11:23:28 | 显示全部楼层 |阅读模式
  原文:http://www.iteye.com/topic/344172
  最后看看memcached网络事件处理的最核心部分- drive_machine
  
需要铭记于心的是drive_machine是多线程环境执行的,主线程和workers都会执行drive_machine
  


  • static void drive_machine(conn *c) {
  •     bool stop = false;
  •     int sfd, flags = 1;
  •     socklen_t addrlen;
  •     struct sockaddr_storage addr;
  •     int res;

  •     assert(c != NULL);

  •     while (!stop) {

  •         switch(c->state) {
  •         case conn_listening:
  •             addrlen = sizeof(addr);
  •             if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
  •                 //省去n多错误情况处理
  •                 break;
  •             }
  •             if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
  •                 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
  •                 perror(&quot;setting O_NONBLOCK&quot;);
  •                 close(sfd);
  •                 break;
  •             }
  •             dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
  •                                      DATA_BUFFER_SIZE, false);
  •             break;

  •         case conn_read:
  •             if (try_read_command(c) != 0) {
  •                 continue;
  •             }
  •         ....//省略
  •      }
  • }
  

  首先大家不到被while循环误导(大部分做java的同学都会马上联想到是个周而复始的loop)其实while通常满足一个case后就会break了,这里用while是考虑到垂直触发方式下,必须读到EWOULDBLOCK错误才可以
  

  
言归正传,drive_machine主要是通过当前连接的state来判断该进行何种处理,因为通过libevent注册了读写时间后回调的都是这个核心函数,所以实际上我们在注册libevent相应事件时,会同时把事件状态写到该conn结构体里,libevent进行回调时会把该conn结构作为参数传递过来,就是该方法的形参
  

  
memcached里连接的状态通过一个enum声明
  


  • enum conn_states {
  •     conn_listening,  /** the socket which listens for connections */
  •     conn_read,       /** reading in a command line */
  •     conn_write,      /** writing out a simple response */
  •     conn_nread,      /** reading in a fixed number of bytes */
  •     conn_swallow,    /** swallowing unnecessary bytes w/o storing */
  •     conn_closing,    /** closing this connection */
  •     conn_mwrite,     /** writing out many items sequentially */
  • };
  

  实际对于case conn_listening:这种情况是主线程自己处理的,workers线程永远不会执行此分支
  
我们看到主线程进行了accept后调用了
  
  dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,DATA_BUFFER_SIZE, false);
  

  
  这个函数就是通知workers线程的地方,看看
  


  • void dispatch_conn_new(int sfd, int init_state, int event_flags,
  •                        int read_buffer_size, int is_udp) {
  •     CQ_ITEM *item = cqi_new();
  •     int thread = (last_thread + 1) % settings.num_threads;

  •     last_thread = thread;

  •     item->sfd = sfd;
  •     item->init_state = init_state;
  •     item->event_flags = event_flags;
  •     item->read_buffer_size = read_buffer_size;
  •     item->is_udp = is_udp;

  •     cq_push(&threads[thread].new_conn_queue, item);

  •     MEMCACHED_CONN_DISPATCH(sfd, threads[thread].thread_id);
  •     if (write(threads[thread].notify_send_fd, &quot;&quot;, 1) != 1) {
  •         perror(&quot;Writing to thread notify pipe&quot;);
  •     }
  • }
  

  可以清楚的看到,主线程首先创建了一个新的CQ_ITEM,然后通过round robin策略选择了一个thread并通过cq_push将这个CQ_ITEM放入了该线程的CQ队列里,那么对应的workers线程是怎么知道的呢
  

  
就是通过这个write(threads[thread].notify_send_fd, &quot;&quot;, 1)向该线程管道写了1字节数据,则该线程的libevent立即回调了thread_libevent_process方法(上面已经描述过)
  

  
然后那个线程取出item,注册读时间,当该条连接上有数据时,最终也会回调drive_machine方法,也就是drive_machine方法的 case conn_read:等全部是workers处理的,主线程只处理conn_listening 建立连接这个
  

  
这部分代码确实比较多,没法全部贴出来,请大家参考源码,最新版本1.2.6,我省去了很多优化的地方
  
比如,每个CQ_ITEM被malloc时会一次malloc很多个,以减小碎片的产生等等细节。
  Memcached源码分析--线程模型(一)
  Memcached源码分析--线程模型(二)



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-655643-1-1.html 上篇帖子: Memcached源码分析--线程模型(一) 下篇帖子: 为什么不能用memcached存储Session?
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表