设为首页 收藏本站
查看: 1135|回复: 0

[经验分享] Memcached源码阅读之线程交互

[复制链接]

尚未签到

发表于 2015-11-18 15:25:42 | 显示全部楼层 |阅读模式
  Memcached按之前的分析可以知道,其是典型的Master-Worker线程模型,这种模型很典型,其工作模型是Master绑定端口,监听网络连接,接受网络连接之后,通过线程间通信来唤醒Worker线程,Worker线程已经连接的描述符执行读写操作,这种模型简化了整个通信模型,下面分析下这个过程。
  

case conn_listening:
addrlen = sizeof(addr);
                //Master线程(main)进入状态机之后执行accept操作,这个操作也是非阻塞的。
                if ((sfd = accept(c->sfd, (struct sockaddr *) &addr, &addrlen)) == -1)
{
//非阻塞模型,这个错误码继续等待
                        if (errno == EAGAIN || errno == EWOULDBLOCK)
{
stop = true;
}
                        //连接超载
                        else if (errno == EMFILE)
{
if (settings.verbose > 0)
fprintf(stderr, "Too many open connections\n");
accept_new_conns(false);
stop = true;
}
else
{
perror("accept()");
stop = true;
}
break;
}
                //已经accept成功,将accept之后的描述符设置为非阻塞的
                if ((flags = fcntl(sfd, F_GETFL, 0)) < 0
|| fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)
{
perror(&quot;setting O_NONBLOCK&quot;);
close(sfd);
break;
}
                //判断是否超过最大连接数
if (settings.maxconns_fast
&& stats.curr_conns + stats.reserved_fds
>= settings.maxconns - 1)
{
str = &quot;ERROR Too many open connections\r\n&quot;;
res = write(sfd, str, strlen(str));
close(sfd);
STATS_LOCK();
stats.rejected_conns++;
STATS_UNLOCK();
}
else
{       //直线连接分发   
                        dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
这个是TCP的连接建立过程,由于UDP不需要建立连接,所以直接分发给Worker线程,让Worker线程进行读写操作,而TCP在建立连接之后,也执行连接分发(和UDP的一样),下面看看dispatch_conn_new内部是如何进行链接分发的。  

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();//创建一个连接队列
char buf[1];
int tid = (last_thread + 1) % settings.num_threads;//通过round-robin算法选择一个线程
LIBEVENT_THREAD *thread = threads + tid;//thread数组存储了所有的工作线程
last_thread = tid;//缓存这次的线程编号,下次待用
item->sfd = sfd;//sfd表示accept之后的描述符
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
cq_push(thread->new_conn_queue, item);//投递item信息到Worker线程的工作队列中
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
buf[0] = 'c';
    //在Worker线程的notify_send_fd写入字符c,表示有连接   
    if (write(thread->notify_send_fd, buf, 1) != 1) {
perror(&quot;Writing to thread notify pipe&quot;);
}
}

  
  投递到子线程的连接队列之后,同时,通过忘子线程的PIPE管道写入字符c来,下面我们看看子线程是如何处理的?
  //子线程会在PIPE管道读上面建立libevent事件,事件回调函数是thread_libevent_process
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];
    if (read(fd, buf, 1) != 1)//PIPE管道读取一个字节的数据
        if (settings.verbose > 0)
            fprintf(stderr, &quot;Can't read from libevent pipe\n&quot;);
    switch (buf[0]) {
    case 'c'://如果是c,则处理网络连接
    item = cq_pop(me->new_conn_queue);//从连接队列读出Master线程投递的消息
    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);//创建连接
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, &quot;Can't listen for events on UDP socket\n&quot;);
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, &quot;Can't listen for events on fd %d\n&quot;,
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
        break;
    }
}
之前分析过conn_new的执行流程,conn_new里面会建立sfd的网络监听libevent事件,事件回调函数为event_handler。
  event_set(&c->event, sfd, event_flags, event_handler, (void *) c);
event_base_set(base, &c->event);而event_handler的执行流程最终会进入到业务处理的状态机中,关于状态机,后续分析。



版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-140821-1-1.html 上篇帖子: 当 MySQL 和 Memcached 遇到尾部空格时 下篇帖子: 数据库索引原理(2)------MemCached
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表