设为首页 收藏本站
查看: 1560|回复: 0

[经验分享] memcached源码分析之线程池机制(二)

[复制链接]

尚未签到

发表于 2015-8-31 13:04:28 | 显示全部楼层 |阅读模式
  在上一篇中已分析了memcached线程池的创建流程,由于上篇篇幅较长,因此将memcached线程池中线程的调度流程另立一篇。
  先让我们把目光转到主函数中,主线程在调用thread_init函数创建好线程池后,就开始创建监听套接字,memcached支持TCP,UDP,UNIX域套接字,因此相应的要创建三种监听套接字
  这里我们只分析TCP listening socket的创建(UDP与TCP的创建采用统一的接口),函数入口为:



1  errno = 0;
2         if (settings.port && server_sockets(settings.port, tcp_transport,
3                                            portnumber_file)) {
4             vperror("failed to listen on TCP port %d", settings.port);
5             exit(EX_OSERR);
6         }
  server_sockets函数即为创建TCP listening socket的入口函数。在server_sockets主要调用server_socket函数来实现,



1 /**
2  * Create a socket and bind it to a specific port number
3  * @param interface the interface to bind to
4  * @param port the port number to bind to
5  * @param transport the transport protocol (TCP / UDP)
6  * @param portnumber_file A filepointer to write the port numbers to
7  *        when they are successfully added to the list of ports we
8  *        listen on.
9  */
10 static int server_socket(const char *interface,
11                          int port,
12                          enum network_transport transport,
13                          FILE *portnumber_file);
  server_socket函数实现源码较长,以下只列出部分:



1 static int server_socket(const char *interface,
2                          int port,
3                          enum network_transport transport,
4                          FILE *portnumber_file) {
5     int sfd;
6     struct linger ling = {0, 0};
7     struct addrinfo *ai;
8     struct addrinfo *next;
9     struct addrinfo hints = { .ai_flags = AI_PASSIVE,
10                               .ai_family = AF_UNSPEC };
11     //套接字的创建过程
12     ..................
13     ..................
14         if (IS_UDP(transport)) {
15             int c;
16
17             for (c = 0; c < settings.num_threads_per_udp; c++) {
18                 /* this is guaranteed to hit all threads because we round-robin */
19                 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
20                                   UDP_READ_BUFFER_SIZE, transport);
21             }
22         } else {
23             if (!(listen_conn_add = conn_new(sfd, conn_listening,
24                                              EV_READ | EV_PERSIST, 1,
25                                              transport, main_base))) {
26                 fprintf(stderr, "failed to create listening connection\n");
27                 exit(EXIT_FAILURE);
28             }
29             listen_conn_add->next = listen_conn;
30             listen_conn = listen_conn_add;
31         }
32     }
33
34     freeaddrinfo(ai);
35
36     /* Return zero iff we detected no errors in starting up connections */
37     return success == 0;
38 }
  在server_socket中,我们只关注两个函数:
  (1)dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,UDP_READ_BUFFER_SIZE, transport);
  当创建的是UDP套接字时,使用这个函数,由于UDP是无连接的,因此直接启动settings.num_threads_per_udp个线程来服务于UDP端口。
  (2)listen_conn_add = conn_new(sfd, conn_listening,EV_READ | EV_PERSIST, 1,transport, main_base);
  当创建的是TCP套接字时,调用conn_new函数,源码如下:



1 conn *conn_new(const int sfd, enum conn_states init_state,
2                 const int event_flags,
3                 const int read_buffer_size, enum network_transport transport,
4                 struct event_base *base) {
5     conn *c = conn_from_freelist();
6
7     ........................//略去部分源码
8
9     event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
10     event_base_set(base, &c->event);
11     c->ev_flags = event_flags;
12
13     if (event_add(&c->event, 0) == -1) {
14         if (conn_add_to_freelist(c)) {
15             conn_free(c);
16         }
17         perror("event_add");
18         return NULL;
19     }
20
21     STATS_LOCK();
22     stats.curr_conns++;
23     stats.total_conns++;
24     STATS_UNLOCK();
25
26     MEMCACHED_CONN_ALLOCATE(c->sfd);
27
28     return c;
29 }
  该函数对套接字设置conn_listening监听事件,回调函数为event_handler,在事件响应函数中调用状态机。



1 void event_handler(const int fd, const short which, void *arg) {
2     conn *c;
3
4     c = (conn *)arg;
5     assert(c != NULL);
6
7     c->which = which;
8
9     /* sanity */
10     if (fd != c->sfd) {
11         if (settings.verbose > 0)
12             fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
13         conn_close(c);
14         return;
15     }
16
17     drive_machine(c);
18
19     /* wait for next event */
20     return;
21 }
  memcached中的状态机是memcached运转发动机,它根据链接的不同状态而采取不同的行为,状态枚举如下:



1 enum conn_states {
2     conn_listening,  /**< the socket which listens for connections */
3     conn_new_cmd,    /**< Prepare connection for next command */
4     conn_waiting,    /**< waiting for a readable socket */
5     conn_read,       /**< reading in a command line */
6     conn_parse_cmd,  /**< try to parse a command from the input buffer */
7     conn_write,      /**< writing out a simple response */
8     conn_nread,      /**< reading in a fixed number of bytes */
9     conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
10     conn_closing,    /**< closing this connection */
11     conn_mwrite,     /**< writing out many items sequentially */
12     conn_max_state   /**< Max state value (used for assertion) */
13 };
  状态机的实现函数为drive_machine,由于该函数的源码实现过长,这里只分析对conn_listening状态的响应。



1 static void drive_machine(conn *c) {
2     bool stop = false;
3     int sfd, flags = 1;
4     socklen_t addrlen;
5     struct sockaddr_storage addr;
6     int nreqs = settings.reqs_per_event;
7     int res;
8
9     assert(c != NULL);
10
11     while (!stop) {
12
13         switch(c->state) {
14             //监听套接字发生事件
15         case conn_listening:
16             addrlen = sizeof(addr);
17             if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
18                 if (errno == EAGAIN || errno == EWOULDBLOCK) {
19                     /* these are transient, so don't log anything */
20                     stop = true;
21                 } else if (errno == EMFILE) {
22                     if (settings.verbose > 0)
23                         fprintf(stderr, "Too many open connections\n");
24                     accept_new_conns(false);
25                     stop = true;
26                 } else {
27                     perror("accept()");
28                     stop = true;
29                 }
30                 break;
31             }
32             //新套接字设置非阻塞
33             if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
34                 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
35                 perror("setting O_NONBLOCK");
36                 close(sfd);
37                 break;
38             }
39             //调度线程来处理连接
40             dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
41                                      DATA_BUFFER_SIZE, tcp_transport);
42             stop = true;
43             break;
44 ...........................//略去其他状态的处理
45 }
  从源码中我们可以看到,当监听套接字建立新连接时,通过事件响应函数event_handler来触发状态机,再调用dispatch_conn_new调度新线程来处理这个连接的读写事件。



1 /*
2  * Dispatches a new connection to another thread. This is only ever called
3  * from the main thread, either during initialization (for UDP) or because
4  * of an incoming connection.
5  */
6 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
7                        int read_buffer_size, enum network_transport transport) {
8     CQ_ITEM *item = cqi_new();
9     int tid = (last_thread + 1) % settings.num_threads;
10
11     //以此种方式来取出线程
12     LIBEVENT_THREAD *thread = threads + tid;
13
14     last_thread = tid;
15
16     item->sfd = sfd;
17     item->init_state = init_state;
18     item->event_flags = event_flags;
19     item->read_buffer_size = read_buffer_size;
20     item->transport = transport;
21
22     //将新item放至threads的new_conn_queue队列中
23     cq_push(thread->new_conn_queue, item);
24
25     MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
26     //写一个字节启动新的线程
27     if (write(thread->notify_send_fd, "", 1) != 1) {
28         perror("Writing to thread notify pipe");
29     }
30 }
  至此,memcached的线程池调度机制已分析完毕了。
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-106829-1-1.html 上篇帖子: Velocity, 微软的Memcached?! 下篇帖子: memcached源码分析之线程池机制(一)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表