Memcached源码分析--线程模型(三)
原文:http://www.iteye.com/topic/344172最后看看memcached网络事件处理的最核心部分- drive_machine
需要铭记于心的是drive_machine是多线程环境执行的,主线程和workers都会执行drive_machine
[*]static void drive_machine(conn *c) {
[*] bool stop = false;
[*] int sfd, flags = 1;
[*] socklen_t addrlen;
[*] struct sockaddr_storage addr;
[*] int res;
[*]
[*] assert(c != NULL);
[*]
[*] while (!stop) {
[*]
[*] switch(c->state) {
[*] case conn_listening:
[*] addrlen = sizeof(addr);
[*] if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
[*] //省去n多错误情况处理
[*] break;
[*] }
[*] if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
[*] fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
[*] perror("setting O_NONBLOCK");
[*] close(sfd);
[*] break;
[*] }
[*] dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
[*] DATA_BUFFER_SIZE, false);
[*] break;
[*]
[*] case conn_read:
[*] if (try_read_command(c) != 0) {
[*] continue;
[*] }
[*] ....//省略
[*] }
[*] }
首先大家不到被while循环误导(大部分做java的同学都会马上联想到是个周而复始的loop)其实while通常满足一个case后就会break了,这里用while是考虑到垂直触发方式下,必须读到EWOULDBLOCK错误才可以
言归正传,drive_machine主要是通过当前连接的state来判断该进行何种处理,因为通过libevent注册了读写时间后回调的都是这个核心函数,所以实际上我们在注册libevent相应事件时,会同时把事件状态写到该conn结构体里,libevent进行回调时会把该conn结构作为参数传递过来,就是该方法的形参
memcached里连接的状态通过一个enum声明
[*]enum conn_states {
[*] conn_listening,/** the socket which listens for connections */
[*] conn_read, /** reading in a command line */
[*] conn_write, /** writing out a simple response */
[*] conn_nread, /** reading in a fixed number of bytes */
[*] conn_swallow, /** swallowing unnecessary bytes w/o storing */
[*] conn_closing, /** closing this connection */
[*] conn_mwrite, /** writing out many items sequentially */
[*]};
实际对于case conn_listening:这种情况是主线程自己处理的,workers线程永远不会执行此分支
我们看到主线程进行了accept后调用了
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,DATA_BUFFER_SIZE, false);
这个函数就是通知workers线程的地方,看看
[*]void dispatch_conn_new(int sfd, int init_state, int event_flags,
[*] int read_buffer_size, int is_udp) {
[*] CQ_ITEM *item = cqi_new();
[*] int thread = (last_thread + 1) % settings.num_threads;
[*]
[*] last_thread = thread;
[*]
[*] item->sfd = sfd;
[*] item->init_state = init_state;
[*] item->event_flags = event_flags;
[*] item->read_buffer_size = read_buffer_size;
[*] item->is_udp = is_udp;
[*]
[*] cq_push(&threads.new_conn_queue, item);
[*]
[*] MEMCACHED_CONN_DISPATCH(sfd, threads.thread_id);
[*] if (write(threads.notify_send_fd, "", 1) != 1) {
[*] perror("Writing to thread notify pipe");
[*] }
[*]}
可以清楚的看到,主线程首先创建了一个新的CQ_ITEM,然后通过round robin策略选择了一个thread并通过cq_push将这个CQ_ITEM放入了该线程的CQ队列里,那么对应的workers线程是怎么知道的呢
就是通过这个write(threads.notify_send_fd, "", 1)向该线程管道写了1字节数据,则该线程的libevent立即回调了thread_libevent_process方法(上面已经描述过)
然后那个线程取出item,注册读时间,当该条连接上有数据时,最终也会回调drive_machine方法,也就是drive_machine方法的 case conn_read:等全部是workers处理的,主线程只处理conn_listening 建立连接这个
这部分代码确实比较多,没法全部贴出来,请大家参考源码,最新版本1.2.6,我省去了很多优化的地方
比如,每个CQ_ITEM被malloc时会一次malloc很多个,以减小碎片的产生等等细节。
Memcached源码分析--线程模型(一)
Memcached源码分析--线程模型(二)
页:
[1]