事件机制
1. 使用ae的时候要初始化一个aeEventLoop,这个保存了一个进程要关注的所有事件相关信息
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd;
long long timeEventNextId;
aeFileEvent events[AE_SETSIZE]; /* Registered events */ AE_SETSIZE是10*1024 限制epoll能最多关注的事件最大个数
aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
aeTimeEvent *timeEventHead; 这个是注册的所有定时事件
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep; 一个函数指针,指定每次aeMain每次轮询要做的事情
} aeEventLoop;
2. 这个aeEventLoop两个非常重要的数据结构,一个是aeFileEvent 还有一个是aeTimeEvent。
前者关注的个数用AE_SETSIZE做了限制,是一个固定大小的数据,这个里面放了一个文件事件的所有相关信息;
后者关注的定时事件,这个的个数不好提前确定,这里的实现方式是通过一个Linked List,所以不支持随机访问。
这几个相关的定义如下:
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
3. 然后使用aeCreateFileEvent和aeCreateTimeEvent,向aeEventLoop 注册事件,可以注册AE_SETSIZE个与FD相关的事件和无数个定时事件
FD方式要指定监听的FD,定时方式要指定轮询间隔
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc)
4. 最后是调用aeMain,aeMain会调用aeProcessEvents,这个aeProcessEvents就是处理事件的核心函数
5. 从几个重要的方面来说明aeProcessEvents处理事件的方式:
a. aeProcessEvents即会处理FD类型的事件,也会处理定时类型的时间。处理顺序是先处理FD类型的,然后处理定时类型的
b. 需要处理的FD类型事件是通过static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)来获得的,这里的tvp的用意后面会提到。
需要注意的是这个aeApiPoll根据OS的不同有多个实现,在不同的文件中,具体使用哪个取决于下面的代码:
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending.
* 出现在ae.c中,自动选择使用哪种事件机制 */
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
c. 由于,FD事件是提前于定时事件处理的,所以,这里有一种可能就是,在对FD类型事件做epoll_wait的时候,有一些定时事件的期限已经到了,但是由于
epoll_wait是阻塞住的,所以没有办法得到及时处理。ae库采取的解决方式是:设置epoll_wait的超时时间,但是这个时间设置成多少合适呢,函数aeSearchNearestTimer就是要解决这个问题的,它会便利计算出aeTimeEvent这个链表中所有定时事件的触发时间,然后返回下一个要触发事件的时间。
然后aeProcessEvents会计算当前时间举例这个时间的时间差,这个时间差就是epoll_wait的超时时间,通过aeApiPoll会传给epoll_wait。
如果这个时间差是零或者是负数,那么epoll_wait将会立即返回;如果没有超时事件,超时时间就会设置成-1,epoll_wait就会一直等
d. 再说一下processTimeEvents这个处理定时事件的函数。这个函数要一个一个的扫描链表里面的时间,发现一个到了时间就处理一个,否则,就把超时间增加1ms。
如果是成功处理完一个,则要重新扫描整个链表(因为在处理的同时,有可能前面的时间有到期了);如果一个没有处理,则跳到下一个看条件是否满足
这个地方是个低效的方法,只要处理成功下次就要进行复杂度为O(n)的查找
protocol
* CR LF
$ CR LF
CR LF
...
$ CR LF
CR LF
Bulk Reply
The format used for every argument $6\r\nmydata\r\n is called a Bulk Reply
*3
$3
SET
$5
mykey
$7
myvalue
"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"
Replies
Redis will reply to commands with different kinds of replies. It is possible to check the kind of reply from the first byte sent by the server:
With a single line reply the first byte of the reply will be "+"
With an error message the first byte of the reply will be "-"
With an integer number the first byte of the reply will be ":"
With bulk reply the first byte of the reply will be "$"
With multi-bulk reply the first byte of the reply will be "*"
Multi-bulk replies
C: LRANGE mylist 0 3
s: *4
s: $3
s: foo
s: $3
s: bar
s: $5
s: Hello
s: $5
s: World
sds.h
struct sdshdr {
int len;
int free;
char buf[];
};
adlist.h
typedef struct listNode {
struct listNode *prev;
struct listNode *next;
void *value;
} listNode;
typedef struct listIter {
listNode *next;
int direction;
} listIter;
typedef struct list {
listNode *head;
listNode *tail;
void *(*dup)(void *ptr);
void (*free)(void *ptr);
int (*match)(void *ptr, void *key);
unsigned int len;
} list;
dict.h
typedef struct dictEntry {
void *key;
void *val;
struct dictEntry *next;
} dictEntry;
typedef struct dictType {
unsigned int (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
int rehashidx; /* rehashing not in progress if rehashidx == -1 */
int iterators; /* number of iterators currently running */
} dict;
redis.h
typedef struct redisObject {
unsigned type:4;
unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */
unsigned encoding:4;
unsigned lru:22; /* lru time (relative to server.lruclock) */
int refcount;
void *ptr;
/* VM fields are only allocated if VM is active, otherwise the
* object allocation function will just allocate
* sizeof(redisObjct) minus sizeof(redisObjectVM), so using
* Redis without VM active will not have any overhead. */
} robj;
关于存储的结构分析,公司五竹写的很好,网上可以查到http://www.searchtb.com/,在这个博客里面 |