设为首页 收藏本站
查看: 1022|回复: 0

[经验分享] memcached源码分析之线程池机制(一)

[复制链接]

尚未签到

发表于 2015-8-31 13:05:43 | 显示全部楼层 |阅读模式
  已经个把月没有写长篇博文了,最近抽了点时间,将memcached源码分析系列文章的线程机制篇给整出来,在分析源码的过程中参考了网上的一些资源。
  该文主要集中于两个问题:(1)memcached线程池是如何创建的,(2)线程池中的线程又是如何进行调度的。一切从源码中找答案。
  memcached的线程池模型采用较典型的Master-Worker模型:
  (1)主线程负责监听客户端的建立连接请求,以及accept 连接,将连接好的套接字放入连接队列;
  (2)调度workers空闲线程来负责处理已经建立好的连接的读写等事件。
  1 关键数据抽象
  (1)memcached单个线程结构的封装



1 //memcached线程结构的封装结构
2 typedef struct {
3     pthread_t thread_id;        /* unique ID of this thread */
4     struct event_base *base;    /* libevent handle this thread uses */
5     struct event notify_event;  /* listen event for notify pipe */
6     int notify_receive_fd;      /* receiving end of notify pipe */
7     int notify_send_fd;         /* sending end of notify pipe */
8     struct thread_stats stats;  /* Stats generated by this thread */
9     struct conn_queue *new_conn_queue; /* queue of new connections to handle */
10     cache_t *suffix_cache;      /* suffix cache */
11 } LIBEVENT_THREAD;
  这是memcached里的线程结构的封装,可以看到每个线程都包含一个CQ队列,一条通知管道pipe ,% m) z( Q4 O1 P+ d6 一个libevent的实例event_base等。
  (2)线程连接队列



1 /* A connection queue. */
2 typedef struct conn_queue CQ;
3 struct conn_queue {
4     CQ_ITEM *head;
5     CQ_ITEM *tail;
6     pthread_mutex_t lock;
7     pthread_cond_t  cond;
8 };
  每个线程结构体中都指向一个CQ链表,CQ链表管理CQ_ITEM的单向链表。
  (3)连接项结构体



1 /* An item in the connection queue. */
2 typedef struct conn_queue_item CQ_ITEM;
3 struct conn_queue_item {
4     int               sfd;
5     enum conn_states  init_state;
6     int               event_flags;
7     int               read_buffer_size;
8     enum network_transport     transport;
9     CQ_ITEM          *next;
10 };
  CQ_ITEM实际上是主线程accept后返回的已建立连接的fd的封装,由主线程创建初始化并放入连接链表CQ中,共workers线程使用。
  (4)网络连接的封装结构体



1 /**
2  * The structure representing a connection into memcached.
3  */
4  //memcached表示一个conn的抽象结构
5 typedef struct conn conn;
6 struct conn {
7 ..................   
8 };
  由于这个结构太大,就略去中间的成员不展示了,与我们线程池相关的有一个成员则非常关键,那就是state,它是memcached中状态机驱动的关键(由drive_machine函数实现)。
  2 线程池的初始化:
  main()中线程池初始化函数入口为:
  /* start up worker threads if MT mode */
  thread_init(settings.num_threads, main_base);
  函数的定义在thread.c实现,源码如下所示:



1 /*
2  * Initializes the thread subsystem, creating various worker threads.
3  *
4  * nthreads  Number of worker event handler threads to spawn
5  * main_base Event base for main thread
6  */
7 void thread_init(int nthreads, struct event_base *main_base) {
8     int         i;
9
10     pthread_mutex_init(&cache_lock, NULL);
11     pthread_mutex_init(&stats_lock, NULL);
12
13     pthread_mutex_init(&init_lock, NULL);
14     pthread_cond_init(&init_cond, NULL);
15
16     pthread_mutex_init(&cqi_freelist_lock, NULL);
17     cqi_freelist = NULL;
18
19     //分配线程池结构数组
20     threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
21     if (! threads) {
22         perror("Can't allocate thread descriptors");
23         exit(1);
24     }
25
26     dispatcher_thread.base = main_base;
27     dispatcher_thread.thread_id = pthread_self();
28
29     //为线程池每个线程创建读写管道
30     for (i = 0; i < nthreads; i++) {
31         int fds[2];
32         if (pipe(fds)) {
33             perror("Can't create notify pipe");
34             exit(1);
35         }
36
37         threads.notify_receive_fd = fds[0];
38         threads.notify_send_fd = fds[1];
39
40         //填充线程结构体信息
41         setup_thread(&threads);
42     }
43
44     /* Create threads after we've done all the libevent setup. */
45     for (i = 0; i < nthreads; i++) {
46         //为线程池创建数目为nthreads的线程,worker_libevent为线程的回调函数,
47         create_worker(worker_libevent, &threads);
48     }
49
50     /* Wait for all the threads to set themselves up before returning. */
51     pthread_mutex_lock(&init_lock);
52     while (init_count < nthreads) {
53         pthread_cond_wait(&init_cond, &init_lock);
54     }
55     pthread_mutex_unlock(&init_lock);
56 }
  线程池初始化函数由主线程进行调用,该函数先初始化各互斥锁,然后使用calloc分配nthreads*sizeof(LIBEVENT_THREAD)个字节的内存块来管理线程池,返回一个全局static变量 threads(类型为LIBEVENT_THREAD *);然后为每个线程创建一个匿名管道(该pipe将在线程的调度中发挥作用),接下来的setup_thread函数为线程设置事件监听,绑定CQ链表等初始化信息,源码如下所示:



1 /*
2  * Set up a thread's information.
3  */
4 static void setup_thread(LIBEVENT_THREAD *me) {
5     me->base = event_init();
6     if (! me->base) {
7         fprintf(stderr, "Can't allocate event base\n");
8         exit(1);
9     }
10
11     /* Listen for notifications from other threads */
12     //为管道设置读事件监听,thread_libevent_process为回调函数
13     event_set(&me->notify_event, me->notify_receive_fd,
14               EV_READ | EV_PERSIST, thread_libevent_process, me);
15     event_base_set(me->base, &me->notify_event);
16
17     if (event_add(&me->notify_event, 0) == -1) {
18         fprintf(stderr, "Can't monitor libevent notify pipe\n");
19         exit(1);
20     }
21
22     //为新线程创建连接CQ链表
23     me->new_conn_queue = malloc(sizeof(struct conn_queue));
24     if (me->new_conn_queue == NULL) {
25         perror("Failed to allocate memory for connection queue");
26         exit(EXIT_FAILURE);
27     }
28     //初始化线程控制器内的CQ链表
29     cq_init(me->new_conn_queue);
30
31     if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
32         perror("Failed to initialize mutex");
33         exit(EXIT_FAILURE);
34     }
35     //创建cache
36     me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
37                                     NULL, NULL);
38     if (me->suffix_cache == NULL) {
39         fprintf(stderr, "Failed to create suffix cache\n");
40         exit(EXIT_FAILURE);
41     }
42 }
  memcached使用libevent实现事件循环,关于libevent,不熟悉的读者可以查看相关资料,这里不做介绍,源码中的这句代码:
  event_set(&me->notify_event, me->notify_receive_fd,EV_READ | EV_PERSIST, thread_libevent_process, me);
  在me->notify_receive_fd(即匿名管道的读端)设置可读事件,回调函数 为thread_libevent_process,函数定义如下:



1 static void thread_libevent_process(int fd, short which, void *arg) {
2     LIBEVENT_THREAD *me = arg;
3     CQ_ITEM *item;
4     char buf[1];
5
6     //响应pipe可读事件,读取主线程向管道内写的1字节数据(见dispatch_conn_new()函数)
7     if (read(fd, buf, 1) != 1)
8         if (settings.verbose > 0)
9             fprintf(stderr, "Can't read from libevent pipe\n");
10
11     //从链接队列中取出一个conn
12     item = cq_pop(me->new_conn_queue);
13
14     if (NULL != item) {
15         //使用conn创建新的任务
16         conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
17                            item->read_buffer_size, item->transport, me->base);
18         if (c == NULL) {
19             if (IS_UDP(item->transport)) {
20                 fprintf(stderr, "Can't listen for events on UDP socket\n");
21                 exit(1);
22             } else {
23                 if (settings.verbose > 0) {
24                     fprintf(stderr, "Can't listen for events on fd %d\n",
25                         item->sfd);
26                 }
27                 close(item->sfd);
28             }
29         } else {
30             c->thread = me;
31         }
32         cqi_free(item);
33     }
34 }
  使用setup_thread设置线程结构体的初始化信息之后,现在我们回到thread_init函数,thread_init中接着循环调用(循环调用nthreads次)create_worker(worker_libevent, &threads); 创建真正运行的线程,create_worker是对pthread_create()简单的封装,参数worker_libevent作为每个线程的运行体,&threads为传入参数。
  worker_libevent为线程体,源码如下:



1 /*
2  * Worker thread: main event loop
3  */
4 static void *worker_libevent(void *arg) {
5     LIBEVENT_THREAD *me = arg;
6
7     /* Any per-thread setup can happen here; thread_init() will block until
8      * all threads have finished initializing.
9      */
10     pthread_mutex_lock(&init_lock);
11     init_count++;     //每创建新线程,将全局init_count加1
12     pthread_cond_signal(&init_cond);  // 发送init_cond信号
13     pthread_mutex_unlock(&init_lock);
14
15     //新创建线程阻塞于此,等待事件
16     event_base_loop(me->base, 0); //Libevent的事件主循环
17     return NULL;
18 }
  worker_libevent中给init_count加1的目的在thread_init函数的这段代码可以看出来,



1  /* Wait for all the threads to set themselves up before returning. */
2     pthread_mutex_lock(&init_lock);
3     while (init_count < nthreads) {
4         pthread_cond_wait(&init_cond, &init_lock);
5     }
6     pthread_mutex_unlock(&init_lock);
  即主线程阻塞如此,等待worker_libevent发出的init_cond信号,唤醒后检查init_count < nthreads是否为假(即创建的线程数目是否达到要求),否则继续等待。
  至此,线程池创建的代码已分析完毕,由于篇幅较长,将分析线程池中线程的调度流程另立一篇。
  
  
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-106830-1-1.html 上篇帖子: memcached源码分析之线程池机制(二) 下篇帖子: 原创:分享封装好的面向JAVA的memcached客户端操作类
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表