ny3259 发表于 2018-12-25 11:23:28

Memcached源码分析--线程模型(三)

  原文:http://www.iteye.com/topic/344172
  最后看看memcached网络事件处理的最核心部分- drive_machine
  
需要铭记于心的是drive_machine是多线程环境执行的,主线程和workers都会执行drive_machine
  


[*]static void drive_machine(conn *c) {
[*]    bool stop = false;
[*]    int sfd, flags = 1;
[*]    socklen_t addrlen;
[*]    struct sockaddr_storage addr;
[*]    int res;
[*]
[*]    assert(c != NULL);
[*]
[*]    while (!stop) {
[*]
[*]      switch(c->state) {
[*]      case conn_listening:
[*]            addrlen = sizeof(addr);
[*]            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
[*]                //省去n多错误情况处理
[*]                break;
[*]            }
[*]            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
[*]                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
[*]                perror(&quot;setting O_NONBLOCK&quot;);
[*]                close(sfd);
[*]                break;
[*]            }
[*]            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
[*]                                     DATA_BUFFER_SIZE, false);
[*]            break;
[*]
[*]      case conn_read:
[*]            if (try_read_command(c) != 0) {
[*]                continue;
[*]            }
[*]      ....//省略
[*]   }
[*] }
  

  首先大家不到被while循环误导(大部分做java的同学都会马上联想到是个周而复始的loop)其实while通常满足一个case后就会break了,这里用while是考虑到垂直触发方式下,必须读到EWOULDBLOCK错误才可以
  

  
言归正传,drive_machine主要是通过当前连接的state来判断该进行何种处理,因为通过libevent注册了读写时间后回调的都是这个核心函数,所以实际上我们在注册libevent相应事件时,会同时把事件状态写到该conn结构体里,libevent进行回调时会把该conn结构作为参数传递过来,就是该方法的形参
  

  
memcached里连接的状态通过一个enum声明
  


[*]enum conn_states {
[*]    conn_listening,/** the socket which listens for connections */
[*]    conn_read,       /** reading in a command line */
[*]    conn_write,      /** writing out a simple response */
[*]    conn_nread,      /** reading in a fixed number of bytes */
[*]    conn_swallow,    /** swallowing unnecessary bytes w/o storing */
[*]    conn_closing,    /** closing this connection */
[*]    conn_mwrite,   /** writing out many items sequentially */
[*]};
  

  实际对于case conn_listening:这种情况是主线程自己处理的,workers线程永远不会执行此分支
  
我们看到主线程进行了accept后调用了
  
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,DATA_BUFFER_SIZE, false);
  

  
这个函数就是通知workers线程的地方,看看
  


[*]void dispatch_conn_new(int sfd, int init_state, int event_flags,
[*]                     int read_buffer_size, int is_udp) {
[*]    CQ_ITEM *item = cqi_new();
[*]    int thread = (last_thread + 1) % settings.num_threads;
[*]
[*]    last_thread = thread;
[*]
[*]    item->sfd = sfd;
[*]    item->init_state = init_state;
[*]    item->event_flags = event_flags;
[*]    item->read_buffer_size = read_buffer_size;
[*]    item->is_udp = is_udp;
[*]
[*]    cq_push(&threads.new_conn_queue, item);
[*]
[*]    MEMCACHED_CONN_DISPATCH(sfd, threads.thread_id);
[*]    if (write(threads.notify_send_fd, &quot;&quot;, 1) != 1) {
[*]      perror(&quot;Writing to thread notify pipe&quot;);
[*]    }
[*]}
  

  可以清楚的看到,主线程首先创建了一个新的CQ_ITEM,然后通过round robin策略选择了一个thread并通过cq_push将这个CQ_ITEM放入了该线程的CQ队列里,那么对应的workers线程是怎么知道的呢
  

  
就是通过这个write(threads.notify_send_fd, &quot;&quot;, 1)向该线程管道写了1字节数据,则该线程的libevent立即回调了thread_libevent_process方法(上面已经描述过)
  

  
然后那个线程取出item,注册读时间,当该条连接上有数据时,最终也会回调drive_machine方法,也就是drive_machine方法的 case conn_read:等全部是workers处理的,主线程只处理conn_listening 建立连接这个
  

  
这部分代码确实比较多,没法全部贴出来,请大家参考源码,最新版本1.2.6,我省去了很多优化的地方
  
比如,每个CQ_ITEM被malloc时会一次malloc很多个,以减小碎片的产生等等细节。
  Memcached源码分析--线程模型(一)
  Memcached源码分析--线程模型(二)


页: [1]
查看完整版本: Memcached源码分析--线程模型(三)