非法入侵 发表于 2015-7-20 11:35:23

redis源码笔记-ae.c

  ae.c是redis事件框架的具体实现,这篇blog对这份源码进行简单说明。其中谈到了作者已经标记的一些未来可能做的改进。
  ae.c



#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#include "ae.h"
#include "zmalloc.h"
#include "config.h"

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending.
 *
 * Every backend provides the same interface (aeApiCreate, aeApiFree,
 * aeApiAddEvent, aeApiDelEvent, aeApiPoll, aeApiName and struct aeApiState).
 * Which implementation gets compiled in is decided only by which file is
 * #included below. Preference order: epoll > kqueue > select. */
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
    #ifdef HAVE_KQUEUE
    #include "ae_kqueue.c"
    #else
    #include "ae_select.c"
    #endif
#endif
22
    //初始化函数,创建事件循环,函数内部alloc一个结构,用于表示事件状态,供后续其他函数作为参数使用
23 aeEventLoop *aeCreateEventLoop(void) {
24   aeEventLoop *eventLoop;
25   int i;
26
27   eventLoop = zmalloc(sizeof(*eventLoop));
28   if (!eventLoop) return NULL;
      //时间event用链表存储
29   eventLoop->timeEventHead = NULL;
30   eventLoop->timeEventNextId = 0;
      //表示是否停止事件循环
31   eventLoop->stop = 0;
      //maxfd只由ae_select.c使用,后续有些相关的处理,如果使用epoll的话,其实可以进行简化
32   eventLoop->maxfd = -1;
      //每次调用epoll\select前调用的函数,由框架使用者注册
33   eventLoop->beforesleep = NULL;
34   if (aeApiCreate(eventLoop) == -1) {
35         zfree(eventLoop);
36         return NULL;
37   }
38   /* Events with mask == AE_NONE are not set. So let's initialize the
39      * vector with it. */
      //将所有的文件描述符的mask设置为无效值,作为初始化
40   for (i = 0; i < AE_SETSIZE; i++)
41         eventLoop->events.mask = AE_NONE;
42   return eventLoop;
43 }
44
    //底层实现执行释放操作后,释放state的内存
45 void aeDeleteEventLoop(aeEventLoop *eventLoop) {
46   aeApiFree(eventLoop);
47   zfree(eventLoop);
48 }
49
    //停止事件循环,redis作为一个无限循环的server,在redis.c中并没有任何一处调用此函数
50 void aeStop(aeEventLoop *eventLoop) {
51   eventLoop->stop = 1;
52 }
53
    //创建文件fd事件,加入事件循环监控列表,使得后续epoll\select时将会测试这个文件描述符的可读性(可写性)
54 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
55         aeFileProc *proc, void *clientData)
56 {
57   if (fd >= AE_SETSIZE) return AE_ERR;
58   aeFileEvent *fe = &eventLoop->events;
59
60   if (aeApiAddEvent(eventLoop, fd, mask) == -1)
61         return AE_ERR;
62   fe->mask |= mask;
      //注册函数
63   if (mask & AE_READABLE) fe->rfileProc = proc;
64   if (mask & AE_WRITABLE) fe->wfileProc = proc;
65   fe->clientData = clientData;
      //更新maxfd
66   if (fd > eventLoop->maxfd)
67         eventLoop->maxfd = fd;
68   return AE_OK;
69 }
70
71 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
72 {
73   if (fd >= AE_SETSIZE) return;
74   aeFileEvent *fe = &eventLoop->events;
75
76   if (fe->mask == AE_NONE) return;
77   fe->mask = fe->mask & (~mask);
78   if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
79         /* Update the max fd */
80         int j;
81
82         for (j = eventLoop->maxfd-1; j >= 0; j--)
83             if (eventLoop->events.mask != AE_NONE) break;
84         eventLoop->maxfd = j;
85   }
86   aeApiDelEvent(eventLoop, fd, mask);
87 }
88
    //返回值为该文件描述符关注的事件类型(可读、可写)
89 int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
90   if (fd >= AE_SETSIZE) return 0;
91   aeFileEvent *fe = &eventLoop->events;
92
93   return fe->mask;
94 }
95
/* Read the current wall-clock time via gettimeofday() and split it into
 * whole seconds (*seconds) and the millisecond part of the current second
 * (*milliseconds, 0..999). */
static void aeGetTime(long *seconds, long *milliseconds)
{
    struct timeval now;

    gettimeofday(&now, NULL);
    *milliseconds = now.tv_usec / 1000;
    *seconds = now.tv_sec;
}
104
/* Compute the absolute time "now + milliseconds". Store whole seconds in
 * *sec and the millisecond remainder, normalised to 0..999, in *ms. */
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long cur_sec, cur_ms, when_sec, when_ms;

    aeGetTime(&cur_sec, &cur_ms);
    when_sec = cur_sec + milliseconds/1000;
    when_ms = cur_ms + milliseconds%1000;
    /* Normalise into 0..999. C's % truncates toward zero, so a negative delay
     * gives a negative remainder. That used to leave when_ms negative and
     * broke the lexicographic (sec, ms) comparisons callers rely on. */
    if (when_ms >= 1000) {
        when_sec++;
        when_ms -= 1000;
    } else if (when_ms < 0) {
        when_sec--;
        when_ms += 1000;
    }
    *sec = when_sec;
    *ms = when_ms;
}
118
119 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
120         aeTimeProc *proc, void *clientData,
121         aeEventFinalizerProc *finalizerProc)
122 {
123   long long id = eventLoop->timeEventNextId++;
124   aeTimeEvent *te;
125
126   te = zmalloc(sizeof(*te));
127   if (te == NULL) return AE_ERR;
128   te->id = id;
      //time event执行时间为绝对时间点
129   aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
130   te->timeProc = proc;
131   te->finalizerProc = finalizerProc;
132   te->clientData = clientData;
      //time event链表为无序表,直接插入到链表头
133   te->next = eventLoop->timeEventHead;
134   eventLoop->timeEventHead = te;
135   return id;
136 }
137
    //从链表中删除,是以id作为key查找的(顺序遍历)
138 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
139 {
140   aeTimeEvent *te, *prev = NULL;
141
142   te = eventLoop->timeEventHead;
143   while(te) {
144         if (te->id == id) {
145             if (prev == NULL)
146               eventLoop->timeEventHead = te->next;
147             else
148               prev->next = te->next;
149             if (te->finalizerProc)
150               te->finalizerProc(eventLoop, te->clientData);
151             zfree(te);
152             return AE_OK;
153         }
154         prev = te;
155         te = te->next;
156   }
157   return AE_ERR; /* NO event with the specified ID found */
158 }
159
160 /* Search the first timer to fire.
161* This operation is useful to know how many time the select can be
162* put in sleep without to delay any event. (没大看懂),也许是给一个阻塞时间的上限值
163* If there are no timers NULL is returned.
164*
165* Note that's O(N) since time events are unsorted.
166* Possible optimizations (not needed by Redis so far, but...):
167* 1) Insert the event in order, so that the nearest is just the head.
168*    Much better but still insertion or deletion of timers is O(N).
169* 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
170*/
171 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
172 {
173   aeTimeEvent *te = eventLoop->timeEventHead;
174   aeTimeEvent *nearest = NULL;
175
176   while(te) {
177         if (!nearest || te->when_sec < nearest->when_sec ||
178               (te->when_sec == nearest->when_sec &&
179                  te->when_ms < nearest->when_ms))
180             nearest = te;
181         te = te->next;
182   }
183   return nearest;
184 }
185
    //对time event进行处理
186 /* Process time events */
187 static int processTimeEvents(aeEventLoop *eventLoop) {
      //处理的事件数
188   int processed = 0;
189   aeTimeEvent *te;
190   long long maxId;
191
192   te = eventLoop->timeEventHead;
193   maxId = eventLoop->timeEventNextId-1;
194   while(te) {
195         long now_sec, now_ms;
196         long long id;
197
198         if (te->id > maxId) {
199             te = te->next;
200             continue;
201         }
202         aeGetTime(&now_sec, &now_ms);
203         if (now_sec > te->when_sec ||
204             (now_sec == te->when_sec && now_ms >= te->when_ms))
205         {
206             int retval;
207
208             id = te->id;
                //如果执行时间到或已超出,则执行对应的时间处理函数
209             retval = te->timeProc(eventLoop, id, te->clientData);
210             processed++;
211             /* After an event is processed our time event list may
212            * no longer be the same, so we restart from head. //因为时间处理函数timeProc可能改变此链表
213            * Still we make sure to don't process events registered
214            * by event handlers itself in order to don't loop forever.?
215            * To do so we saved the max ID we want to handle.
216            *
217            * FUTURE OPTIMIZATIONS:
218            * Note that this is NOT great algorithmically. Redis uses
219            * a single time event so it's not a problem but the right
220            * way to do this is to add the new elements on head, and
221            * to flag deleted elements in a special way for later
222            * deletion (putting references to the nodes to delete into
223            * another linked list). */
                //如果这个时间处理函数不再继续执行,则从time event的链表中删除事件;否则,retval为继续执行的时间间隔(单位为ms),在当前的timeevent struct的时间值上进行增加
224             if (retval != AE_NOMORE) {
225               aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
226             } else {
227               aeDeleteTimeEvent(eventLoop, id);
228             }
229             te = eventLoop->timeEventHead;
230         } else {
231             te = te->next;
232         }
233   }
234   return processed;
235 }
236
237 /* Process every pending(悬而未决) time event, then every pending file event
238* (that may be registered by time event callbacks just processed).
239* Without special flags the function sleeps until some file event
240* fires, or when the next time event occurrs (if any).
241*
242* If flags is 0, the function does nothing and returns.
243* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
244* if flags has AE_FILE_EVENTS set, file events are processed.
245* if flags has AE_TIME_EVENTS set, time events are processed.
246* if flags has AE_DONT_WAIT set the function returns ASAP until all
247* the events that's possible to process without to wait are processed.
248*
249* The function returns the number of events processed. */
250 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
251 {
252   int processed = 0, numevents;
253
254   /* Nothing to do? return ASAP */
255   if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
256
257   /* Note that we want call select() even if there are no
258      * file events to process as long as we want to process time
259      * events, in order to sleep until the next time event is ready
260      * to fire. */
261   if (eventLoop->maxfd != -1 ||
262         ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
263         int j;
264         aeTimeEvent *shortest = NULL;
265         struct timeval tv, *tvp;
266
267         if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
268             shortest = aeSearchNearestTimer(eventLoop);
269         if (shortest) {
270             long now_sec, now_ms;
271
272             /* Calculate the time missing for the nearest
273            * timer to fire. */
274             aeGetTime(&now_sec, &now_ms);
275             tvp = &tv;
276             tvp->tv_sec = shortest->when_sec - now_sec;
277             if (shortest->when_ms < now_ms) {
278               tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
279               tvp->tv_sec --;
280             } else {
281               tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
282             }
283             if (tvp->tv_sec < 0) tvp->tv_sec = 0;
284             if (tvp->tv_usec < 0) tvp->tv_usec = 0;疑似有bug,比如当前时间为 1s +2ms,shortest时间为0s+1ms的case
285         } else {
286             /* If we have to check for events but need to return
287            * ASAP because of AE_DONT_WAIT we need to se the timeout
288            * to zero */
289             if (flags & AE_DONT_WAIT) {
290               tv.tv_sec = tv.tv_usec = 0;
291               tvp = &tv;
292             } else {
293               /* Otherwise we can block */
294               tvp = NULL; /* wait forever */
295             }
296         }
297
            //指定这个tvp是说这个tvp到期时,至少由time event可以执行
298         numevents = aeApiPoll(eventLoop, tvp);
299         for (j = 0; j < numevents; j++) {
300             aeFileEvent *fe = &eventLoop->events.fd];
301             int mask = eventLoop->fired.mask;
302             int fd = eventLoop->fired.fd;
303             int rfired = 0;
304
305         /* note the fe->mask & mask & ... code: maybe an already processed
306            * event removed an element that fired and we still didn't
307            * processed, so we check if the event is still valid. */
308             if (fe->mask & mask & AE_READABLE) {
309               rfired = 1;
310               fe->rfileProc(eventLoop,fd,fe->clientData,mask);
311             }
                //避免同一个注册函数被调用两次
312             if (fe->mask & mask & AE_WRITABLE) {
313               if (!rfired || fe->wfileProc != fe->rfileProc)
314                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);
315             }
316             processed++;
317         }
318   }
319   /* Check time events */
320   if (flags & AE_TIME_EVENTS)
321         processed += processTimeEvents(eventLoop);
322
323   return processed; /* return the number of processed file/time events */
324 }
325
    //对select的一个简单封装,没有太大意义
326 /* Wait for millseconds until the given file descriptor becomes
327* writable/readable/exception */
328 int aeWait(int fd, int mask, long long milliseconds) {
329   struct timeval tv;
330   fd_set rfds, wfds, efds;
331   int retmask = 0, retval;
332
333   tv.tv_sec = milliseconds/1000;
334   tv.tv_usec = (milliseconds%1000)*1000;
335   FD_ZERO(&rfds);
336   FD_ZERO(&wfds);
337   FD_ZERO(&efds);
338
339   if (mask & AE_READABLE) FD_SET(fd,&rfds);
340   if (mask & AE_WRITABLE) FD_SET(fd,&wfds);
341   if ((retval = select(fd+1, &rfds, &wfds, &efds, &tv)) > 0) {
342         if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE;
343         if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE;
344         return retmask;
345   } else {
346         return retval;
347   }
348 }
349
350 void aeMain(aeEventLoop *eventLoop) {
351   eventLoop->stop = 0;
352   while (!eventLoop->stop) {
353         if (eventLoop->beforesleep != NULL)
354             eventLoop->beforesleep(eventLoop);
355         aeProcessEvents(eventLoop, AE_ALL_EVENTS);
356   }
357 }
358
/* Name of the multiplexing backend compiled into this file, as reported by
 * that backend's aeApiName(). */
char *aeGetApiName(void)
{
    char *name = aeApiName();

    return name;
}
362
363 void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
364   eventLoop->beforesleep = beforesleep;
365 }
页: [1]
查看完整版本: redis源码笔记-ae.c