|
ae.c是redis事件框架的具体实现,这篇blog对这份源码进行简单说明。其中谈到了作者已经标记的一些未来可能做的改进。
ae.c
1 #include
2 #include
3 #include
4 #include
5 #include
6
7 #include "ae.h"
8 #include "zmalloc.h"
9 #include "config.h"
10
11 /* Include the best multiplexing layer supported by this system.
12 * The following should be ordered by performances, descending. */
//为了支持不同的平台,redis用相同的接口封装了系统提供的多路复用层代码。接口共提供如下函数:aeApiCreate\aeApiFree\aeApiAddEvent\aeApiDelEvent\aeApiPoll\aeApiName函数,以及一个struct aeApiState
//通过包含不同的头文件,选择不同的底层实现
//按如下顺序,效率递减: epoll > kqueue > select
13 #ifdef HAVE_EPOLL
14 #include "ae_epoll.c"
15 #else
16 #ifdef HAVE_KQUEUE
17 #include "ae_kqueue.c"
18 #else
19 #include "ae_select.c"
20 #endif
21 #endif
22
//初始化函数,创建事件循环,函数内部alloc一个结构,用于表示事件状态,供后续其他函数作为参数使用
23 aeEventLoop *aeCreateEventLoop(void) {
24 aeEventLoop *eventLoop;
25 int i;
26
27 eventLoop = zmalloc(sizeof(*eventLoop));
28 if (!eventLoop) return NULL;
//时间event用链表存储
29 eventLoop->timeEventHead = NULL;
30 eventLoop->timeEventNextId = 0;
//表示是否停止事件循环
31 eventLoop->stop = 0;
//maxfd只由ae_select.c使用,后续有些相关的处理,如果使用epoll的话,其实可以进行简化
32 eventLoop->maxfd = -1;
//每次调用epoll\select前调用的函数,由框架使用者注册
33 eventLoop->beforesleep = NULL;
34 if (aeApiCreate(eventLoop) == -1) {
35 zfree(eventLoop);
36 return NULL;
37 }
38 /* Events with mask == AE_NONE are not set. So let's initialize the
39 * vector with it. */
//将所有的文件描述符的mask设置为无效值,作为初始化
40 for (i = 0; i < AE_SETSIZE; i++)
41 eventLoop->events.mask = AE_NONE;
42 return eventLoop;
43 }
44
//底层实现执行释放操作后,释放state的内存
45 void aeDeleteEventLoop(aeEventLoop *eventLoop) {
46 aeApiFree(eventLoop);
47 zfree(eventLoop);
48 }
49
//停止事件循环,redis作为一个无限循环的server,在redis.c中并没有任何一处调用此函数
50 void aeStop(aeEventLoop *eventLoop) {
51 eventLoop->stop = 1;
52 }
53
//创建文件fd事件,加入事件循环监控列表,使得后续epoll\select时将会测试这个文件描述符的可读性(可写性)
54 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
55 aeFileProc *proc, void *clientData)
56 {
57 if (fd >= AE_SETSIZE) return AE_ERR;
58 aeFileEvent *fe = &eventLoop->events[fd];
59
60 if (aeApiAddEvent(eventLoop, fd, mask) == -1)
61 return AE_ERR;
62 fe->mask |= mask;
//注册函数
63 if (mask & AE_READABLE) fe->rfileProc = proc;
64 if (mask & AE_WRITABLE) fe->wfileProc = proc;
65 fe->clientData = clientData;
//更新maxfd
66 if (fd > eventLoop->maxfd)
67 eventLoop->maxfd = fd;
68 return AE_OK;
69 }
70
71 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
72 {
73 if (fd >= AE_SETSIZE) return;
74 aeFileEvent *fe = &eventLoop->events[fd];
75
76 if (fe->mask == AE_NONE) return;
77 fe->mask = fe->mask & (~mask);
78 if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
79 /* Update the max fd */
80 int j;
81
82 for (j = eventLoop->maxfd-1; j >= 0; j--)
83 if (eventLoop->events[j].mask != AE_NONE) break;
84 eventLoop->maxfd = j;
85 }
86 aeApiDelEvent(eventLoop, fd, mask);
87 }
88
//返回值为该文件描述符关注的事件类型(可读、可写)
89 int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
90 if (fd >= AE_SETSIZE) return 0;
91 aeFileEvent *fe = &eventLoop->events[fd];
92
93 return fe->mask;
94 }
95
96 static void aeGetTime(long *seconds, long *milliseconds)
97 {
98 struct timeval tv;
99
100 gettimeofday(&tv, NULL);
101 *seconds = tv.tv_sec;
102 *milliseconds = tv.tv_usec/1000;
103 }
104
105 static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
106 long cur_sec, cur_ms, when_sec, when_ms;
107
108 aeGetTime(&cur_sec, &cur_ms);
109 when_sec = cur_sec + milliseconds/1000;
110 when_ms = cur_ms + milliseconds%1000;
111 if (when_ms >= 1000) {
112 when_sec ++;
113 when_ms -= 1000;
114 }
115 *sec = when_sec;
116 *ms = when_ms;
117 }
118
119 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
120 aeTimeProc *proc, void *clientData,
121 aeEventFinalizerProc *finalizerProc)
122 {
123 long long id = eventLoop->timeEventNextId++;
124 aeTimeEvent *te;
125
126 te = zmalloc(sizeof(*te));
127 if (te == NULL) return AE_ERR;
128 te->id = id;
//time event执行时间为绝对时间点
129 aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
130 te->timeProc = proc;
131 te->finalizerProc = finalizerProc;
132 te->clientData = clientData;
//time event链表为无序表,直接插入到链表头
133 te->next = eventLoop->timeEventHead;
134 eventLoop->timeEventHead = te;
135 return id;
136 }
137
//从链表中删除,是以id作为key查找的(顺序遍历)
138 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
139 {
140 aeTimeEvent *te, *prev = NULL;
141
142 te = eventLoop->timeEventHead;
143 while(te) {
144 if (te->id == id) {
145 if (prev == NULL)
146 eventLoop->timeEventHead = te->next;
147 else
148 prev->next = te->next;
149 if (te->finalizerProc)
150 te->finalizerProc(eventLoop, te->clientData);
151 zfree(te);
152 return AE_OK;
153 }
154 prev = te;
155 te = te->next;
156 }
157 return AE_ERR; /* NO event with the specified ID found */
158 }
159
160 /* Search the first timer to fire.
161 * This operation is useful to know how many time the select can be
162 * put in sleep without to delay any event. (没大看懂),也许是给一个阻塞时间的上限值
163 * If there are no timers NULL is returned.
164 *
165 * Note that's O(N) since time events are unsorted.
166 * Possible optimizations (not needed by Redis so far, but...):
167 * 1) Insert the event in order, so that the nearest is just the head.
168 * Much better but still insertion or deletion of timers is O(N).
169 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
170 */
171 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
172 {
173 aeTimeEvent *te = eventLoop->timeEventHead;
174 aeTimeEvent *nearest = NULL;
175
176 while(te) {
177 if (!nearest || te->when_sec < nearest->when_sec ||
178 (te->when_sec == nearest->when_sec &&
179 te->when_ms < nearest->when_ms))
180 nearest = te;
181 te = te->next;
182 }
183 return nearest;
184 }
185
//对time event进行处理
186 /* Process time events */
187 static int processTimeEvents(aeEventLoop *eventLoop) {
//处理的事件数
188 int processed = 0;
189 aeTimeEvent *te;
190 long long maxId;
191
192 te = eventLoop->timeEventHead;
193 maxId = eventLoop->timeEventNextId-1;
194 while(te) {
195 long now_sec, now_ms;
196 long long id;
197
198 if (te->id > maxId) {
199 te = te->next;
200 continue;
201 }
202 aeGetTime(&now_sec, &now_ms);
203 if (now_sec > te->when_sec ||
204 (now_sec == te->when_sec && now_ms >= te->when_ms))
205 {
206 int retval;
207
208 id = te->id;
//如果执行时间到或已超出,则执行对应的时间处理函数
209 retval = te->timeProc(eventLoop, id, te->clientData);
210 processed++;
211 /* After an event is processed our time event list may
212 * no longer be the same, so we restart from head. //因为时间处理函数timeProc可能改变此链表
213 * Still we make sure to don't process events registered
214 * by event handlers itself in order to don't loop forever.?
215 * To do so we saved the max ID we want to handle.
216 *
217 * FUTURE OPTIMIZATIONS:
218 * Note that this is NOT great algorithmically. Redis uses
219 * a single time event so it's not a problem but the right
220 * way to do this is to add the new elements on head, and
221 * to flag deleted elements in a special way for later
222 * deletion (putting references to the nodes to delete into
223 * another linked list). */
//如果这个时间处理函数不再继续执行,则从time event的链表中删除事件;否则,retval为继续执行的时间间隔(单位为ms),在当前的timeevent struct的时间值上进行增加
224 if (retval != AE_NOMORE) {
225 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
226 } else {
227 aeDeleteTimeEvent(eventLoop, id);
228 }
229 te = eventLoop->timeEventHead;
230 } else {
231 te = te->next;
232 }
233 }
234 return processed;
235 }
236
237 /* Process every pending(悬而未决) time event, then every pending file event
238 * (that may be registered by time event callbacks just processed).
239 * Without special flags the function sleeps until some file event
240 * fires, or when the next time event occurrs (if any).
241 *
242 * If flags is 0, the function does nothing and returns.
243 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
244 * if flags has AE_FILE_EVENTS set, file events are processed.
245 * if flags has AE_TIME_EVENTS set, time events are processed.
246 * if flags has AE_DONT_WAIT set the function returns ASAP until all
247 * the events that's possible to process without to wait are processed.
248 *
249 * The function returns the number of events processed. */
250 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
251 {
252 int processed = 0, numevents;
253
254 /* Nothing to do? return ASAP */
255 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
256
257 /* Note that we want call select() even if there are no
258 * file events to process as long as we want to process time
259 * events, in order to sleep until the next time event is ready
260 * to fire. */
261 if (eventLoop->maxfd != -1 ||
262 ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
263 int j;
264 aeTimeEvent *shortest = NULL;
265 struct timeval tv, *tvp;
266
267 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
268 shortest = aeSearchNearestTimer(eventLoop);
269 if (shortest) {
270 long now_sec, now_ms;
271
272 /* Calculate the time missing for the nearest
273 * timer to fire. */
274 aeGetTime(&now_sec, &now_ms);
275 tvp = &tv;
276 tvp->tv_sec = shortest->when_sec - now_sec;
277 if (shortest->when_ms < now_ms) {
278 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
279 tvp->tv_sec --;
280 } else {
281 tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
282 }
283 if (tvp->tv_sec < 0) tvp->tv_sec = 0;
284 if (tvp->tv_usec < 0) tvp->tv_usec = 0; 疑似有bug,比如当前时间为 1s +2ms ,shortest时间为0s+1ms的case
285 } else {
286 /* If we have to check for events but need to return
287 * ASAP because of AE_DONT_WAIT we need to se the timeout
288 * to zero */
289 if (flags & AE_DONT_WAIT) {
290 tv.tv_sec = tv.tv_usec = 0;
291 tvp = &tv;
292 } else {
293 /* Otherwise we can block */
294 tvp = NULL; /* wait forever */
295 }
296 }
297
//指定这个tvp是说这个tvp到期时,至少由time event可以执行
298 numevents = aeApiPoll(eventLoop, tvp);
299 for (j = 0; j < numevents; j++) {
300 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
301 int mask = eventLoop->fired[j].mask;
302 int fd = eventLoop->fired[j].fd;
303 int rfired = 0;
304
305 /* note the fe->mask & mask & ... code: maybe an already processed
306 * event removed an element that fired and we still didn't
307 * processed, so we check if the event is still valid. */
308 if (fe->mask & mask & AE_READABLE) {
309 rfired = 1;
310 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
311 }
//避免同一个注册函数被调用两次
312 if (fe->mask & mask & AE_WRITABLE) {
313 if (!rfired || fe->wfileProc != fe->rfileProc)
314 fe->wfileProc(eventLoop,fd,fe->clientData,mask);
315 }
316 processed++;
317 }
318 }
319 /* Check time events */
320 if (flags & AE_TIME_EVENTS)
321 processed += processTimeEvents(eventLoop);
322
323 return processed; /* return the number of processed file/time events */
324 }
325
//对select的一个简单封装,没有太大意义
326 /* Wait for millseconds until the given file descriptor becomes
327 * writable/readable/exception */
328 int aeWait(int fd, int mask, long long milliseconds) {
329 struct timeval tv;
330 fd_set rfds, wfds, efds;
331 int retmask = 0, retval;
332
333 tv.tv_sec = milliseconds/1000;
334 tv.tv_usec = (milliseconds%1000)*1000;
335 FD_ZERO(&rfds);
336 FD_ZERO(&wfds);
337 FD_ZERO(&efds);
338
339 if (mask & AE_READABLE) FD_SET(fd,&rfds);
340 if (mask & AE_WRITABLE) FD_SET(fd,&wfds);
341 if ((retval = select(fd+1, &rfds, &wfds, &efds, &tv)) > 0) {
342 if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE;
343 if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE;
344 return retmask;
345 } else {
346 return retval;
347 }
348 }
349
350 void aeMain(aeEventLoop *eventLoop) {
351 eventLoop->stop = 0;
352 while (!eventLoop->stop) {
353 if (eventLoop->beforesleep != NULL)
354 eventLoop->beforesleep(eventLoop);
355 aeProcessEvents(eventLoop, AE_ALL_EVENTS);
356 }
357 }
358
359 char *aeGetApiName(void) {
360 return aeApiName();
361 }
362
363 void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
364 eventLoop->beforesleep = beforesleep;
365 } |
|