|
在上一篇中已分析了memcached线程池的创建流程,由于上篇篇幅较长,因此将memcached线程池中线程的调度流程另立一篇。
先让我们把目光转到主函数中,主线程在调用thread_init函数创建好线程池后,就开始创建监听套接字,memcached支持TCP,UDP,UNIX域套接字,因此相应的要创建三种监听套接字
这里我们只分析TCP listening socket的创建(UDP与TCP的创建采用统一的接口),函数入口为:
1 errno = 0;
2 if (settings.port && server_sockets(settings.port, tcp_transport,
3 portnumber_file)) {
4 vperror("failed to listen on TCP port %d", settings.port);
5 exit(EX_OSERR);
6 }
server_sockets函数即为创建TCP listening socket的入口函数。在server_sockets主要调用server_socket函数来实现,
1 /**
2 * Create a socket and bind it to a specific port number
3 * @param interface the interface to bind to
4 * @param port the port number to bind to
5 * @param transport the transport protocol (TCP / UDP)
6 * @param portnumber_file A filepointer to write the port numbers to
7 * when they are successfully added to the list of ports we
8 * listen on.
9 */
10 static int server_socket(const char *interface,
11 int port,
12 enum network_transport transport,
13 FILE *portnumber_file);
server_socket函数实现源码较长,以下只列出部分:
1 static int server_socket(const char *interface,
2 int port,
3 enum network_transport transport,
4 FILE *portnumber_file) {
5 int sfd;
6 struct linger ling = {0, 0};
7 struct addrinfo *ai;
8 struct addrinfo *next;
9 struct addrinfo hints = { .ai_flags = AI_PASSIVE,
10 .ai_family = AF_UNSPEC };
11 //套接字的创建过程
12 ..................
13 ..................
14 if (IS_UDP(transport)) {
15 int c;
16
17 for (c = 0; c < settings.num_threads_per_udp; c++) {
18 /* this is guaranteed to hit all threads because we round-robin */
19 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
20 UDP_READ_BUFFER_SIZE, transport);
21 }
22 } else {
23 if (!(listen_conn_add = conn_new(sfd, conn_listening,
24 EV_READ | EV_PERSIST, 1,
25 transport, main_base))) {
26 fprintf(stderr, "failed to create listening connection\n");
27 exit(EXIT_FAILURE);
28 }
29 listen_conn_add->next = listen_conn;
30 listen_conn = listen_conn_add;
31 }
32 }
33
34 freeaddrinfo(ai);
35
36 /* Return zero iff we detected no errors in starting up connections */
37 return success == 0;
38 }
在server_socket中,我们只关注两个函数:
(1)dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,UDP_READ_BUFFER_SIZE, transport);
当创建的是UDP套接字时,使用这个函数,由于UDP是无连接的,因此直接启动settings.num_threads_per_udp个线程来服务于UDP端口。
(2)listen_conn_add = conn_new(sfd, conn_listening,EV_READ | EV_PERSIST, 1,transport, main_base);
当创建的是TCP套接字时,调用conn_new函数,源码如下:
1 conn *conn_new(const int sfd, enum conn_states init_state,
2 const int event_flags,
3 const int read_buffer_size, enum network_transport transport,
4 struct event_base *base) {
5 conn *c = conn_from_freelist();
6
7 ........................//略去部分源码
8
9 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
10 event_base_set(base, &c->event);
11 c->ev_flags = event_flags;
12
13 if (event_add(&c->event, 0) == -1) {
14 if (conn_add_to_freelist(c)) {
15 conn_free(c);
16 }
17 perror("event_add");
18 return NULL;
19 }
20
21 STATS_LOCK();
22 stats.curr_conns++;
23 stats.total_conns++;
24 STATS_UNLOCK();
25
26 MEMCACHED_CONN_ALLOCATE(c->sfd);
27
28 return c;
29 }
该函数对套接字设置conn_listening监听事件,回调函数为event_handler,在事件响应函数中调用状态机。
1 void event_handler(const int fd, const short which, void *arg) {
2 conn *c;
3
4 c = (conn *)arg;
5 assert(c != NULL);
6
7 c->which = which;
8
9 /* sanity */
10 if (fd != c->sfd) {
11 if (settings.verbose > 0)
12 fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
13 conn_close(c);
14 return;
15 }
16
17 drive_machine(c);
18
19 /* wait for next event */
20 return;
21 }
memcached中的状态机是memcached运转发动机,它根据链接的不同状态而采取不同的行为,状态枚举如下:
1 enum conn_states {
2 conn_listening, /**< the socket which listens for connections */
3 conn_new_cmd, /**< Prepare connection for next command */
4 conn_waiting, /**< waiting for a readable socket */
5 conn_read, /**< reading in a command line */
6 conn_parse_cmd, /**< try to parse a command from the input buffer */
7 conn_write, /**< writing out a simple response */
8 conn_nread, /**< reading in a fixed number of bytes */
9 conn_swallow, /**< swallowing unnecessary bytes w/o storing */
10 conn_closing, /**< closing this connection */
11 conn_mwrite, /**< writing out many items sequentially */
12 conn_max_state /**< Max state value (used for assertion) */
13 };
状态机的实现函数为drive_machine,由于该函数的源码实现过长,这里只分析对conn_listening状态的响应。
1 static void drive_machine(conn *c) {
2 bool stop = false;
3 int sfd, flags = 1;
4 socklen_t addrlen;
5 struct sockaddr_storage addr;
6 int nreqs = settings.reqs_per_event;
7 int res;
8
9 assert(c != NULL);
10
11 while (!stop) {
12
13 switch(c->state) {
14 //监听套接字发生事件
15 case conn_listening:
16 addrlen = sizeof(addr);
17 if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
18 if (errno == EAGAIN || errno == EWOULDBLOCK) {
19 /* these are transient, so don't log anything */
20 stop = true;
21 } else if (errno == EMFILE) {
22 if (settings.verbose > 0)
23 fprintf(stderr, "Too many open connections\n");
24 accept_new_conns(false);
25 stop = true;
26 } else {
27 perror("accept()");
28 stop = true;
29 }
30 break;
31 }
32 //新套接字设置非阻塞
33 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
34 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
35 perror("setting O_NONBLOCK");
36 close(sfd);
37 break;
38 }
39 //调度线程来处理连接
40 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
41 DATA_BUFFER_SIZE, tcp_transport);
42 stop = true;
43 break;
44 ...........................//略去其他状态的处理
45 }
从源码中我们可以看到,当监听套接字建立新连接时,通过事件响应函数event_handler来触发状态机,再调用dispatch_conn_new调度新线程来处理这个连接的读写事件。
1 /*
2 * Dispatches a new connection to another thread. This is only ever called
3 * from the main thread, either during initialization (for UDP) or because
4 * of an incoming connection.
5 */
6 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
7 int read_buffer_size, enum network_transport transport) {
8 CQ_ITEM *item = cqi_new();
9 int tid = (last_thread + 1) % settings.num_threads;
10
11 //以此种方式来取出线程
12 LIBEVENT_THREAD *thread = threads + tid;
13
14 last_thread = tid;
15
16 item->sfd = sfd;
17 item->init_state = init_state;
18 item->event_flags = event_flags;
19 item->read_buffer_size = read_buffer_size;
20 item->transport = transport;
21
22 //将新item放至threads的new_conn_queue队列中
23 cq_push(thread->new_conn_queue, item);
24
25 MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
26 //写一个字节启动新的线程
27 if (write(thread->notify_send_fd, "", 1) != 1) {
28 perror("Writing to thread notify pipe");
29 }
30 }
至此,memcached的线程池调度机制已分析完毕了。
|
|