loin 发表于 2015-9-1 07:40:47

Memcached源码拆分:Libevent_Thread

  个人笔记,不做详细介绍

  最近看完Memcached的源码后总是忍不住再看一遍,虽然有些地方写得有点不是那么华丽,但是总感觉想从Memcached里面挖出一些东西来,所以就再看了其他人的关于Memached的源码的一些分析,加上自己对Memcached的理解,目前觉得Memcached的源码里有下面几部分可以好好学学:
  (1)SLAB内存分配策略
  (2)线程模型
  (3)Memcached通信协议
  (4)Consistent Hashing
  (5)LRU策略
  (6)UNIX domain,UDP,TCP客户端连接
  (7)Libevent
  (8)Memcached的分布式
  今天先从线程模型开始吧,简单说说然后直接上代码(原谅我现在越来越懒了)。
  主要过程:
  1 主进程server_socket负责接受外部发起的连接,并且将这个建立后的连接交给workerThread处理,选择workerThread的方式是轮询。
  2 workerThread负责“接受外部连接”,并且将已经建立的连接封装成conn对象,然后委托给一个event_handler函数做后续处理。
  3 主线程和workerThread之间的通信是通过pipe来完成,workerThread在rev_fd上监听事件,而主线程则向fd写入一个字节表示新连接到来,同时也push一个item到全局的queue中。
  4 workerThread监听到这个事件后就从queue中拿出item封装成conn对象,其中就包含了外部连接的一些属性,这时候workerThread可以独立跟外部连接通信。
  我把Memcached中关于线程模型的代码拆剪了出来,变成一个简单可执行的程序。主要包括以下几个文件
  1 main.cpp
  初始化WorkerThread(实际上每个WorkerThread负责一个外部连接),启动主线程server connect事件监听(这里我不用socket通信,直接监听命令行),一旦命令行有命令输入,就会激发事件,然后主线程就会以轮询的方式从空闲线程中选出一条来进行后续操作,并通过pipe通知该workerThread有新item可以处理。


main.cpp


1 #include<unistd.h> //getopt
2 #include<stdlib.h> //atoi
3 #include<event.h> //libevent
4 #include<stdio.h>
5 #include<pthread.h>
6 #include<string.h> //strerror
7 #include<errno.h>
8 #include<fcntl.h> //open
9 #include"WorkerThreads.h"
10
11 //Globally Settings
12 struct Settings
13 {
14   int num_threads;   
15 };
16 struct Settings settings;
17
18 //Globally Values
19 static struct event_base *main_base;                  //main thread dispatch event_base
20 static int last_thread=-1;
21 #define DATA_BUFFER_SIZE 2048
22 WorkerThreads *workerThreads;
23
24 //Globally Functions
25 void readCommand(Settings settings){}
26
27 void thread_init(int nthreads)                  //void thread_init(int nthreads,struct event_base *main_base)
28 {   
29   workerThreads=new WorkerThreads(nthreads);
30
31   workerThreads->initiate();
32 }
33
34 void dispatch_conn_new(int sfd,int wrfd)
35 {
36   
37   CQ_ITEM *item=cqi_new();                           
38   int tid=(last_thread+1)%settings.num_threads;            //轮询选出workerThread(数组)
39   
40   LIBEVENT_THREAD *thread=workerThreads->threads+tid;
41   
42   last_thread=tid;
43
44   item->sfd=sfd;                                          //封装必要的信息到item结构,后面会利用item封装为conn
45   item->init_state=conn_read;
46   item->event_flags=EV_READ | EV_PERSIST;
47 //    item->read_buffer_size=DATA_BUFFER_SIZE;
48   item->read_buffer_size=wrfd;
49   item->transport=tcp_transport;
50   
51   cq_push(thread->new_conn_queue,item);                  //item需要插入到被选中的thread的全局queue里面
52   
53   printf("item fd is:%d\n",item->sfd);
54   int wc=write(thread->notify_send_fd,"",1);                //主线程和workerThread的通信方式,写入到notify_send_fd告诉辅助线程item准备好了,可以处理
55 }
56 void base_event_handler(const int fd, const short which,void* arg)
57 {
58   char buf;
59   int rc=read(fd,buf,sizeof(buf));
60   buf='\0';
61   printf("%s\n",buf);
62   //int testfd=open("./testfile.txt",O_RDWR);
63   int pipefds;
64   pipe(pipefds);
65   dispatch_conn_new(pipefds,pipefds);                        //这里使用pipe来模拟memcached中外部连接后带来的通信fd,注意要想在libevent上监听fd,则不能使用regular file的fd,libevent不支持
66 }
67 int main(int argc, char **argv)
68 {
69 //    readCommand(settings);                        //set threads counts
70   int c;
71   while(-1!=(c=getopt(argc,argv,"t:")))      //从命令行读取workerThread数
72   {
73         switch (c)
74         {
75         case 't':
76             settings.num_threads=atoi(optarg);
77             break;
78         }
79   }
80   
81   main_base=event_init();                        //主线程的监听外部连接(这里监听标准输入),memcached则借助自定义的协议,接收来自Unix域/tcp/udp的客户端请求。
82   struct event inputEvent;
83   event_set(&inputEvent,STDIN_FILENO,EV_READ | EV_PERSIST,base_event_handler,NULL);    //一旦有事件触发,则委托给base_event_handler处理
84   event_base_set(main_base,&inputEvent);
85   event_add(&inputEvent,0);
86   
87
88   conn_init();
89
90   thread_init(settings.num_threads);                //WorkerThread初始化,详细见workerThreads->initiate();
91
92   event_base_loop(main_base,0);                  //开始监听主线程的注册事件
93 }
  2 cq.cpp
  上面提到的CQ_ITEM和workerThread内的conn_queue的相关操作是由cq.cpp这个工具文件来完成的。我们知道每个workerThread内部都会有一个conn_queue用来做连接消息的缓冲队列,所以就需要有工具来进行初始化队列,插入,弹出。


cq.cpp(part 1)


1 void cq_init(conn_queue *cq)
2 {
3   pthread_mutex_init(&cq->lock,NULL);
4   pthread_cond_init(&cq->cond,NULL);
5   cq->head=NULL;
6   cq->tail=NULL;
7 }
8 CQ_ITEM *cq_pop(conn_queue *cq)
9 {
10   CQ_ITEM *item;
11   pthread_mutex_lock(&cq->lock);
12   item=cq->head;
13   if(NULL!=item)
14   {
15         cq->head=item->next;
16         if(NULL==cq->head)
17             cq->tail=NULL;
18   }
19   pthread_mutex_unlock(&cq->lock);
20   return item;
21 }
22 void cq_push(conn_queue *cq,CQ_ITEM *item)
23 {
24 /*
25   printf("be\n");
26   
27   if(cq==NULL)
28         printf("cq null\n");
29   if(item==NULL)
30         printf("item null\n");
31 */
32   item->next=NULL;
33   pthread_mutex_lock(&cq->lock);
34   if(NULL==cq->tail)
35   {
36         cq->head=item;
37   }
38   else
39         cq->tail->next=item;
40   cq->tail=item;
41   pthread_cond_signal(&cq->cond);
42   pthread_mutex_unlock(&cq->lock);
43
44 }
  而CQ_ITEM* item=cqi_new(),这里使用cqi_new也是避免每次请求ITEM时都分配一个item,在cqi_new中会分配多个item空间,然后每次请求item空间时则从其中取出空闲item,这里用很巧妙的数组结构来代替指针链表结构,前面说到cqi_new会分配一块空间,它的大小是item的大小的倍数,同时每个item都会有一个next域来记录下一个item在这块空间中的地址,全局的cqi_freelist指向的就是空闲item组成的list,每次free item后就可以重新组织空闲list,每次new的时候则从freelist中取出空闲item,如果freelist没有空闲item,则会再分配一块空间,虽然cqi_freelist会指向新的内存空间,但是同时free item操作还是会将所有空闲空间收集起来。


cq.cpp(part 2)


1 static pthread_mutex_t cqi_freelist_lock=PTHREAD_MUTEX_INITIALIZER;
2 static CQ_ITEM *cqi_freelist;
3 void cqi_free(CQ_ITEM *item)
4 {
5   pthread_mutex_lock(&cqi_freelist_lock);
6   item->next=cqi_freelist;
7   cqi_freelist=item;
8   pthread_mutex_unlock(&cqi_freelist_lock);
9 }
10
11 CQ_ITEM *cqi_new(void)
12 {
13   CQ_ITEM *item=NULL;
14   pthread_mutex_lock(&cqi_freelist_lock);
15   if(cqi_freelist)
16   {
17         item=cqi_freelist;
18         cqi_freelist=item->next;
19   }
20   pthread_mutex_unlock(&cqi_freelist_lock);
21
22   if(NULL==item)
23   {
24         int i;
25         item =(CQ_ITEM*) malloc(sizeof(CQ_ITEM)*ITEMS_PER_ALLOC);
26         if(NULL ==item)
27             return NULL;
28
29         for(i=2;i<ITEMS_PER_ALLOC;i++)
30             item.next=&item;
31   
32         pthread_mutex_lock(&cqi_freelist_lock);
33         item.next=cqi_freelist;
34         cqi_freelist=&item;
35         pthread_mutex_unlock(&cqi_freelist_lock);
36   }
37   return item;
38 }
  3 WorkerThread.cpp
  在main函数中执行了workerThread的initiate后,会完成LIBEVENT_THREAD(memcached将thread和libevent很好的封装在一起)数组的分配,然后设定每个thread和主线程的pipe通信,接下来对这些Libevent_thread进行设置后就可以启动线程。这里只说一下“设置”这一步。
  void WorkerThreads::setup_event_thread(LIBEVENT_THREAD *me)
  该函数就完成了libevent_thread中的libevent和thread的配合的设置:pipe事件监听并委托给thread_libevent_process处理,conn_queue初始化等。其中
thread_libevent_process则将item封装为conn,memcached中分布式客户端代理和服务器的通信就是依赖这样一个过程,最后有conn_new完成委托处理


WorkerThreads.cpp


1 /*
2 #include"libevent_thread.h"
3 #include<event.h>
4 #include<pthread.h>
5 #include"conn.h"
6 #include"cq.h"
7 #include<unistd.h>
8 #include<string.h>
9 */
10 #include"WorkerThreads.h"
11 #include<stdio.h>
12
13 /*
14   WorkerThreads::WorkerThreads(int threadCount=1):nthreads(threadCount),threads(NULL)
15   {
16         init_count=0;
17         pthread_mutex_init(&init_lock,NULL);
18         pthread_cond_init(&init_cond,NULL);
19   }
20 */
21
22   void WorkerThreads::initiate()
23   {
24         int i;
25
26         threads=(LIBEVENT_THREAD*)calloc(nthreads,sizeof(LIBEVENT_THREAD));      //LIBEVENT_THREAD,加入libevent元素的thread结构 “数组”
27         if(!threads)
28         {
29             perror("can't allocate thread des");
30             exit(1);
31         }
32
33         for(i=0;i<nthreads;i++)                                                    //设置thread和thread中的libevent所需属性
34         {
35             int fds;
36             if(pipe(fds))                                                      //thread和主线程的通信pipe
37             {
38               perror("can't create notify pipe");
39               exit(1);
40             }
41
42             threads.notify_receive_fd=fds;
43             threads.notify_send_fd=fds;
44
45             setup_event_thread(&threads);                                    //设置thread和thread中的libevent所需属性
46         }
47
48         for(i=0;i<nthreads;i++)
49         {
50             create_worker(worker_libevent,&threads);                            //启动thread
51         }
52
53         pthread_mutex_lock(&init_lock);
54         while( init_count < nthreads)
55         {
56             pthread_cond_wait(&init_cond,&init_lock);
57         }
58         pthread_mutex_unlock(&init_lock);
59
60         printf("finish\n");
61   }
62
63   void WorkerThreads::setup_event_thread(LIBEVENT_THREAD *me)
64   {
65         me->base=event_init();//every thread has its own event_base
66         
67 //2.0 has event_config
68         /*
69         struct event_config *cfg=event_config_new();
70         event_config_avoid_method(cfg,"epoll");
71         me->base=event_base_new_with_config(cfg);
72         event_config_free(cfg);
73 */
74         //in order to use libevent on file,use this method
75
76         if(!me->base)
77         {
78             fprintf(stderr,"can't allocate event base\n");
79             exit(1);
80         }
81
82         event_set(&me->notify_event,me->notify_receive_fd,                        //设置 监听事件 和 处理函数
83             EV_READ|EV_PERSIST, thread_libevent_process,me);   
84         event_base_set(me->base,&me->notify_event);
85
86         if(event_add(&me->notify_event,0)==-1)
87         {
88             fprintf(stderr,"can't monitor libevent notify pipe\n");
89             exit(1);
90         }
91
92         //why initiate conn_queue here?
93         me->new_conn_queue=(conn_queue*)malloc(sizeof(struct conn_queue));      //内部的conn_queue
94         if(me->new_conn_queue==NULL)
95         {
96             perror("Failed to allocate memory for connection queue");
97             exit(EXIT_FAILURE);
98         }
99
100         cq_init(me->new_conn_queue);
101   }
102   
103   void thread_libevent_process(int fd,short which,void *arg)                  //处理函数,即当主线程通知workerThread时,主线程会插入一个item到某个thread的queue中,queue是一个工具类
104 //workerThread将item pop出来并封装为conn,封装期间就建立了和item所指向的对象的联系,也使用libevent完成
105   {
106         LIBEVENT_THREAD *me=(LIBEVENT_THREAD*)arg;
107         CQ_ITEM *item;
108
109         char buf;
110
111         if(read(fd,buf,1)!=1)
112             fprintf(stderr,"can't read from libevent pipe\n");
113
114         item=cq_pop(me->new_conn_queue);
115         printf("item fd is:%d\n",item->sfd);
116         if(NULL!=item)
117         {
118             conn *c= conn_new (item->sfd,item->init_state,item->event_flags,
119               item->read_buffer_size,item->transport,me->base);
120   
121             if(NULL==c)
122             {
123               if( IS_UDP(item->transport))
124               {
125                     fprintf(stderr,"can't listen for events on UDP\n");
126                     exit(1);
127               }
128               else
129               {
130                     fprintf(stderr,"can't listen for events on fd %d\n",item->sfd);
131                     close(item->sfd);
132               }
133             }
134             else
135             {
136               c->thread=me;
137               //test
138               write(item->read_buffer_size,"a",1);
139             }
140             cqi_free(item);
141         }
142
143   }
144   
145   void WorkerThreads::create_worker(void *(*func)(void*),void *arg)
146   {
147         pthread_t thread;
148         pthread_attr_t attr;
149         int ret;
150
151         pthread_attr_init(&attr);
152
153         if((ret=pthread_create(&thread,&attr,func,arg))!=0)               
154         {
155             fprintf(stderr,"can't create thread: %s\n",strerror(ret));
156             exit(1);
157         }
158   }
159   
160   void* worker_libevent(void *arg)
161   {
162         LIBEVENT_THREAD *me = (LIBEVENT_THREAD*)arg;
163         pthread_mutex_lock(&init_lock);
164         init_count++;
165         pthread_cond_signal(&init_cond);
166         pthread_mutex_unlock(&init_lock);
167
168         event_base_loop(me->base,0);
169         return NULL;
170   }
171
172   
173
174   

conn.cpp


1 //#include<pthread.h>
2 //#include<event.h>
3 #include"conn.h"
4
5 #include<unistd.h>
6
7 static pthread_mutex_t conn_lock=PTHREAD_MUTEX_INITIALIZER;
8 static int freecurr;
9 static int freetotal;
10
11
12
13 static conn **freeconns;
14
15 //static                  //why can not use static
16 void conn_init(void) {                                                      //每个Worker的
17   freetotal = 200;
18   freecurr = 0;
19   if ((freeconns =(conn**)calloc(freetotal, sizeof(conn *))) == NULL) {
20         fprintf(stderr, "Failed to allocate connection structures\n");
21   }
22   return;
23 }
24
25 conn *conn_new(const int sfd,enum conn_states init_state,const int event_flags,
26                const int read_buffer_size,enum network_transport transport,
27 struct event_base *base)
28 {
29   conn *c=conn_from_freelist();
30   if(NULL==c)
31   {
32         if(!(c=(conn*)calloc(1,sizeof(conn))))
33         {
34             fprintf(stderr,"calloc()\n");
35             return NULL;
36         }
37         //initiate c
38 /*
39 {
40   MEMCACHED_CONN_CREATE(c);
41
42         c->rbuf = c->wbuf = 0;
43         c->ilist = 0;
44         c->suffixlist = 0;
45         c->iov = 0;
46         c->msglist = 0;
47         c->hdrbuf = 0;
48
49         c->rsize = read_buffer_size;
50         c->wsize = DATA_BUFFER_SIZE;
51         c->isize = ITEM_LIST_INITIAL;
52         c->suffixsize = SUFFIX_LIST_INITIAL;
53         c->iovsize = IOV_LIST_INITIAL;
54         c->msgsize = MSG_LIST_INITIAL;
55         c->hdrsize = 0;
56
57         c->rbuf = (char *)malloc((size_t)c->rsize);
58         c->wbuf = (char *)malloc((size_t)c->wsize);
59         c->ilist = (item **)malloc(sizeof(item *) * c->isize);
60         c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
61         c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
62         c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
63
64         //根据配置大小来分配,可能会分配失败
65         if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
66               c->msglist == 0 || c->suffixlist == 0) {
67             conn_free(c);
68             fprintf(stderr, "malloc()\n");
69             return NULL;
70         }
71         //end initiate c
72 }   
73 */
74   }
75   //initiate c
76 /*
77 {
78   c->transport = transport;               //传输方式
79   c->protocol = settings.binding_protocol;//传输协议
80
81   // unix socket mode doesn't need this, so zeroed out.but why
82   //is this done for every command?presumably for UDP
83   //mode.
84   if (!settings.socketpath) {
85         c->request_addr_size = sizeof(c->request_addr);
86   } else {
87         c->request_addr_size = 0;
88   }
89
90   if (settings.verbose > 1) {
91         if (init_state == conn_listening) {
92             fprintf(stderr, "<%d server listening (%s)\n", sfd,
93               prot_text(c->protocol));
94         } else if (IS_UDP(transport)) {
95             fprintf(stderr, "<%d server listening (udp)\n", sfd);
96         } else if (c->protocol == negotiating_prot) {
97             fprintf(stderr, "<%d new auto-negotiating client connection\n",
98                     sfd);
99         } else if (c->protocol == ascii_prot) {
100             fprintf(stderr, "<%d new ascii client connection.\n", sfd);
101         } else if (c->protocol == binary_prot) {
102             fprintf(stderr, "<%d new binary client connection.\n", sfd);
103         } else {
104             fprintf(stderr, "<%d new unknown (%d) client connection\n",
105               sfd, c->protocol);
106             assert(false);
107         }
108   }
109
110 //信息赋值给conn结构,conn的构造把item全用上了
111   c->sfd = sfd;
112   c->state = init_state;
113   c->rlbytes = 0;
114   c->cmd = -1;
115   c->rbytes = c->wbytes = 0;
116   c->wcurr = c->wbuf;
117   c->rcurr = c->rbuf;
118   c->ritem = 0;
119   c->icurr = c->ilist;
120   c->suffixcurr = c->suffixlist;
121   c->ileft = 0;
122   c->suffixleft = 0;
123   c->iovused = 0;
124   c->msgcurr = 0;
125   c->msgused = 0;
126
127   c->write_and_go = init_state;
128   c->write_and_free = 0;
129   c->item = 0;
130
131   c->noreply = false;
132 }
133 */
134   //end initiate c
135   
136   c->sfd=sfd;
137   printf("test fd is:%d\n",sfd);
138   event_set(&c->event,sfd,event_flags,event_handler,(void*)c);
139   event_base_set(base,&c->event);
140   c->ev_flags=event_flags;
141
142   if(event_add(&c->event,0)==-1)
143   {
144         if( conn_add_to_freelist(c))
145             conn_free(c);
146         perror("event_add");
147         return NULL;
148   }
149
150   //MEMCACHED_CONN_ALLOCATE(c->sfd);
151   
152   return c;
153 }
154
155 conn *conn_from_freelist()
156 {
157   conn *c;
158   pthread_mutex_lock(&conn_lock);
159
160   if(freecurr>0)
161         c=freeconns[--freecurr];
162   else
163         c=NULL;
164
165   pthread_mutex_unlock(&conn_lock);
166 }
167
168 void event_handler(const int fd,const short which,void *arg)
169 {
170   //real handler
171   char buf;
172   int rc=read(fd,buf,sizeof(buf));
173   buf='\0';
174   printf("%s\n",buf);
175 }
176
177 bool conn_add_to_freelist(conn *c)
178 {
179   bool ret = true;
180   pthread_mutex_lock(&conn_lock);
181
182   if (freecurr < freetotal)
183   {
184         freeconns = c;
185         ret = false;
186   }
187   else
188   {
189         // try to enlarge free connections array
190         size_t newsize = freetotal * 2;
191         conn **new_freeconns = (conn**)realloc(freeconns, sizeof(conn *) * newsize);
192         if (new_freeconns)
193         {               
194             freetotal = newsize;
195             freeconns = new_freeconns;
196             freeconns = c;      
197             ret = false;
198         }
199   }
200
201   pthread_mutex_unlock(&conn_lock);
202   return ret;
203 }
204
205 void conn_free(conn *c) {
206   if (c)
207   {
208         /*
209         MEMCACHED_CONN_DESTROY(c);
210         if (c->hdrbuf)            
211         free(c->hdrbuf);
212         if (c->msglist)
213         free(c->msglist);      
214         if (c->rbuf)      
215         free(c->rbuf);      
216         if (c->wbuf)      
217         free(c->wbuf);            
218         if (c->ilist)      
219         free(c->ilist);      
220         if (c->suffixlist)      
221         free(c->suffixlist);      
222         if (c->iov)
223         free(c->iov);   
224 */
225         free(c);   
226   }
227 }
  源码:memcached的线程模型.rar
页: [1]
查看完整版本: Memcached源码拆分:Libevent_Thread