|
从main函数开始,位于memcached.c
1 int main (int argc, char **argv) {
2 //.......................
3 //.......................
4 //.......................
5
6
7 /* handle SIGINT 注册信号处理函数,目前sig_handler是空函数*/
8 signal(SIGINT, sig_handler);
9
10 settings_init();
11 /* init settings
12 初始化默认设置,其中
13 settings.port = 11211; 默认端口
14 settings.maxbytes = 64 * 1024 * 1024; 默认使用64MB内存
15 settings.num_threads = 4; 默认启动4个工作线程
16 settings.item_size_max = 1024 * 1024; 默认单个item(key、value连同item头部)最大1MB
17 */
18
19
20 //.......................
21 //.......................
22 //.......................
23
24
25 /* 初始化主线程的libevent实例 */
26 main_base = event_init();
27
28
29 //.......................
30 //.......................
31 //.......................
32
33
34 /* 启动工作线程 */
35 thread_init(settings.num_threads, main_base);
36 /*
37 // 主线程是分配线程,分配工作给工作线程
38 dispatcher_thread.base = main_base;
39 dispatcher_thread.thread_id = pthread_self();
40 //设置工作线程的属性以及它们各自的libevent实例初始化
41 for (i = 0; i < nthreads; i++) {
42 int fds[2];
43 pipe(fds);
44 threads[i].notify_receive_fd = fds[0];
45 threads[i].notify_send_fd = fds[1];
46 setup_thread(&threads[i]);
47 }
48 //启动线程,线程处理函数为worker_libevent, 每个线程有各自的event_base_loop
49 for (i = 0; i < nthreads; i++) {
50 create_worker(worker_libevent, &threads[i]);
51 }
52 */
53
54
55 //.......................
56 //.......................
57 //.......................
58
59
60 /* 创建服务端的socket */
61 server_sockets(settings.port, tcp_transport, portnumber_file);
62
63 //.......................
64 //.......................
65 //.......................
66
67
68 /* 主线程的libevent消息循环 */
69 if (event_base_loop(main_base, 0) != 0) {
70 retval = EXIT_FAILURE;
71 }
72
73
74 //.......................
75 //.......................
76 //.......................
77
78
79 return retval;
80 }
主线程
1 server_sockets(settings.port, tcp_transport, portnumber_file)
2 --->
3 server_socket(settings.inter, port, transport, portnumber_file);
4 --->
5 listen(sfd, settings.backlog)
6 listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base)
7 --->
8 //设置sfd的消息响应函数event_handler
9 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
10 event_base_set(base, &c->event);
11 --->
12 void event_handler(const int fd, const short which, void *arg) {
13 conn *c;
14 c = (conn *)arg;
15
16 //.......................
17 //.......................
18 //.......................
19
20 drive_machine(c);
21
22 return;
23 }
24 --->
25 //drive_machine
26 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)
27 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
28 DATA_BUFFER_SIZE, tcp_transport);
29
30 --->
31 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
32 int read_buffer_size, enum network_transport transport) {
33 char buf[1];
34 int tid = (last_thread + 1) % settings.num_threads;
35
36 LIBEVENT_THREAD *thread = threads + tid;
37
38 //往其中一个worker线程的管道写,把这个连接分配给该worker
39 buf[0] = 'c';
40 if (write(thread->notify_send_fd, buf, 1) != 1) {
41 perror("Writing to thread notify pipe");
42 }
43 }
函数补充:
1 static void setup_thread(LIBEVENT_THREAD *me) {
2 me->base = event_init();
3 if (! me->base) {
4 fprintf(stderr, "Can't allocate event base\n");
5 exit(1);
6 }
7
8 // 把管道的读事件注册到libevent对象中
9 // 分配线程会往工作线程的 notify_send_fd 写数据,然后触发工作线程的 notify_receive_fd 的读事件
10 event_set(&me->notify_event, me->notify_receive_fd,
11 EV_READ | EV_PERSIST, thread_libevent_process, me);
12 event_base_set(me->base, &me->notify_event);
13
14 if (event_add(&me->notify_event, 0) == -1) {
15 fprintf(stderr, "Can't monitor libevent notify pipe\n");
16 exit(1);
17 }
18
19 //.......................
20 //.......................
21 //.......................
22 }
工作线程
thread_libevent_process -> conn_new -> event_set(&c->event, sfd, event_flags, event_handler, (void *)c);event_base_set(base, &c->event);
->event_handler -> drive_machine
工作线程的libevent事件处理就由drive_machine负责
尝试去理解它的工作流程
drive_machine
-->
case conn_read:
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
-->
case conn_parse_cmd :
if (try_read_command(c) == 0) {
-->
static int try_read_command(conn *c) {
//..............
if (c->protocol == binary_prot) {
}
else {
//............
process_command(c, c->rcurr);
}
-->
static void process_command(conn *c, char *command) {
//假设是set
} else if ((ntokens == 6 || ntokens == 7) &&
((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
process_update_command(c, tokens, ntokens, comm, false);
-->
process_update_command(c, tokens, ntokens, comm, false);
{
//...........
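//其实我不懂为什么要分配一个出来
//(补充:此时只解析了命令行,value数据还没有读进来;这里先按vlen分配好item,
// 随后conn_nread状态把value读进这个item,读完后再由store_item把它存起来)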
it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
//................
conn_set_state(c, conn_nread);
}
-->
//drive_machine
case conn_nread:
if (c->rlbytes == 0) {
complete_nread(c);
-->
complete_nread_ascii(c);
{
ret = store_item(it, comm, c);
}
不知道理解得对不对
-----------------------------------------------------------------------------------------------------------------
用户连接memcached成功后,主要逻辑在drive_machine这个函数
我尝试打日志,理解它的流程

A:表示memcached B:表示用户
A: ./memcached -m 512 -p 14444 -vv
B: telnet 127.0.0.1 14444
A: ############################## state = conn_listening
B: add id 1 0 4
A:
############################## state = conn_new_cmd
############################## state = conn_waiting
############################## state = conn_read
############################## state = conn_parse_cmd
32: Client using the ascii protocol
<32 add id 1 0 4
############################## state = conn_nread
B: 1234
A:
############################## state = conn_nread
############################## state = conn_nread
>32 STORED
############################## state = conn_write
############################## state = conn_mwrite
############################## state = conn_write
############################## state = conn_mwrite
############################## state = conn_new_cmd
############################## state = conn_waiting
B: 收到
STORED
B: get id
A:
############################## state = conn_read
############################## state = conn_parse_cmd
<32 get id
>32 sending key id
>32 END
############################## state = conn_mwrite
############################## state = conn_mwrite
############################## state = conn_new_cmd
############################## state = conn_waiting
B: 收到
VALUE id 1 4
1234
END
B: quit
A:
############################## state = conn_read
############################## state = conn_parse_cmd
<32 quit
############################## state = conn_closing
<32 connection closed.
|
|