设为首页 收藏本站
查看: 875|回复: 0

[经验分享] redis源码分析(3)——请求处理

[复制链接]

尚未签到

发表于 2015-11-12 11:16:27 | 显示全部楼层 |阅读模式
  前两篇介绍了redis的初始化过程,以及事件循环。本篇来看一下客户端的连接建立与请求处理。
(1)连接建立
在初始化一篇中提到过,redis在将监听socket初始化完毕之后,会将他们添加到事件循环中:
    for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
&quot;Unrecoverable error creating server.ipfd file event.&quot;);
}
}监听socket的读事件就是有客户端连接请求过来,对应的事件处理函数是acceptTcpHandler,这个函数就是用于处理客户端连接建立。函数如下:void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
&quot;Accepting client connection: %s&quot;, server.neterr);
return;
}
redisLog(REDIS_VERBOSE,&quot;Accepted %s:%d&quot;, cip, cport);
acceptCommonHandler(cfd,0);
}
}当这个函数被调用时(对应的监听socket的读事件发生),已经有客户端完成三次握手建立连接。函数anetTcpAccept用于accept客户端的连接,其返回值是客户端对应的socket。然后,会调用acceptCommonHandler对连接以及客户端进行初始化。这部分逻辑是在一个while循环中,最多迭代MAX_ACCEPTS_PER_CALL(1000)次,也就是说每次事件循环最多可以处理1000个客户端的连接。下面,看一下accetpCommonHandler函数static void acceptCommonHandler(int fd, int flags) {
redisClient *c;
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,
&quot;Error registering fd event for the new client: %s (fd=%d)&quot;,
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
/* If maxclient directive is set and this is one client more... close the
* connection. Note that we create the client instead to check before
* for this condition, since now the socket is already set in non-blocking
* mode and we can send an error for free using the Kernel I/O */
if (listLength(server.clients) > server.maxclients) {
char *err = &quot;-ERR max number of clients reached\r\n&quot;;
/* That's a best effort error message, don't check write errors */
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
server.stat_numconnections++;
c->flags |= flags;
}这个函数主要调用createClient初始化客户端相关数据结构以及对应的socket,初始化后会判断当前连接的客户端是否超过最大值,如果超过的话,会拒绝这次连接。否则,更新客户端连接数的计数。     数据结构redisClient用于表示一个客户端的连接,包括一个客户多次请求的状态,createClient函数主要是初始化这个数据结构。在createClient函数中,首先是创建redisClient,然后是设置socket的属性,然后添加该socket的读事件。    if (fd != -1) {
anetNonBlock(NULL,fd);
// <MM>
// 关闭Nagle算法,提升响应速度
// </MM>
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}将socket设置为非阻塞的并且no delay,关闭Nagle算法,提升响应速度。最后会注册socket的读事件,事件处理函数是readQueryFromClient,这个函数便是客户端请求的起点,之后会详细介绍。     createClient函数的最后部分,就是对redisClient的属性初始化,代码不再列出。     当从acceptTcpHandler返回后,客户端的连接就建立完毕,接下来就是等待客户端的请求。(2)请求处理
可以把redis请求处理的过程分成3个步骤:          (1)读取请求buffer          (2)解析请求          (3)处理请求     先简单概括一下这个流程。redis的请求分为两种类型:          (1)inline:简单字符串格式,比如:ping命令          (2)multi bulk:字符串数组格式,比如:set,get等等大部分命令     在请求处理时,会根据不同类型,分别处理。redisClient->reqtype存储请求类型。1)readQueryFromClient函数
前文介绍到,readQueryFromClient是请求处理的起点,这个函数就是用于读取请求buffer的,下面详解一下这个函数。首先,设置当前服务的client,然后是设置这次从socket读取的数据的默认大小(REDIS_IOBUF_LEN为16KB)。    server.current_client = c;
readlen = REDIS_IOBUF_LEN;这段代码重新设置读取数据的大小,避免频繁拷贝数据。如果当前请求是一个multi bulk类型的,并且要处理的bulk的大小大于REDIS_MBULK_BIG_ARG(32KB),则将读取数据大小设置为该bulk剩余数据的大小。
    /* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= REDIS_MBULK_BIG_ARG)
{
int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining < readlen) readlen = remaining;
}读取的请求内容会存储到redisClient->querybuf中,此处代码调整querybuf大小以便容纳这次read的数据。在调用sdsMakeRoomFor函数时,如果buffer的空闲空间小于readlen,则buffer大小翻倍,并将数据拷贝到新buffer。所以会存在一次数据拷贝。
    qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
// <MM>
// 可能存在一次copy
// querybuf的初始大小为16KB
// </MM>
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);接下来调用read系统调用,读取readlen大小的数据,并存储到querybuf中。同时,校验read的返回值,检测出错。如果read返回0,则客户端关闭连接,会释放掉该客户端。    // <MM>
// 非阻塞的读取socket
// </MM>
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
// <MM>
// fd是非阻塞的,EAGAIN表示read不到数据
// </MM>
if (errno == EAGAIN) {
nread = 0;
} else {
redisLog(REDIS_VERBOSE, &quot;Reading from client: %s&quot;,strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
// <MM>
// client关闭
// </MM>
redisLog(REDIS_VERBOSE, &quot;Client closed connection&quot;);
freeClient(c);
return;
}   
if (nread) {
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & REDIS_MASTER) c->reploff += nread;
} else {
server.current_client = NULL;
return;
}
判断客户端的请求buffer是否超过配置的值server.client_max_querybuf_len(1GB),如果超过,会拒绝服务,并关闭该客户端。
    // <MM>
// 保证query buffer不大于1GB
// </MM>
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
redisLog(REDIS_WARNING,&quot;Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)&quot;, ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}最后,会调用processInputBuffer函数解析请求。
    // 解析请求buf,创建args数组: cmd arg1 arg2 ...
// </MM>
processInputBuffer(c);
server.current_client = NULL;2)processInputBuffer函数
下面,看一下processInputBuffer函数如何解析请求。函数内部是一个while循环:
     while(sdslen(c->querybuf)) {
...
}只要querybuf中还包含完整的命令就会一直处理。下面看下循环内部:     暂时还没有看到阻塞命令部分,这块先跳过。        /* Immediately abort if the client is in the middle of something. */
// <MW>
// 什么情况下,走这个分支?
// </MW>
if (c->flags & REDIS_BLOCKED) return;如果设置了REDIS_CLOSE_AFTER_REPLY标记,则立即返回        /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands). */
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;如果reqtype为0,则当前请求类型还不知道,则解析一下请求类型。即根据请求的第一个字符是不是’*’,判断是不是multi bulk请求。
        /* Determine request type when unknown. */
if (!c->reqtype) {
// <MM>
// 设置请求类型,redis中很多命令是通过redis protocol中得
// array定义的,即multibulk,第一个字符是*
// </MM>
if (c->querybuf[0] == '*') {
c->reqtype = REDIS_REQ_MULTIBULK;
} else {
c->reqtype = REDIS_REQ_INLINE;
}
}根据请求类型,分别调用processInlineBuffer和processMultibulkBuffer解析请求,存储于client->argv数组,client->argc。只有buffer中包含一个完整请求时,这两个函数才会解析成功返回REDIS_OK,接下来会处理命令。否则,会跳出外部的while循环,等待下一次事件循环再从socket读取剩余的数据,再进行解析。
        // <MM>
// 将raw buffer解析成请求参数数组,存于client->argv,client->argc
// 从网络读取client请求,不能保证当前buffer中的数据包含一个完整的
// 命令,需要多次read
// </MM>
if (c->reqtype == REDIS_REQ_INLINE) {
if (processInlineBuffer(c) != REDIS_OK) break;
} else if (c->reqtype == REDIS_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != REDIS_OK) break;
} else {
redisPanic(&quot;Unknown request type&quot;);
}当上述两个函数返回REDIS_OK,即表示已经解析出一个命令,接下来调用processCommand函数进行命令处理。
        // <MM>
// 执行到这,意味着已经读取到一个完整的命令
// </MM>
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
if (processCommand(c) == REDIS_OK)
resetClient(c);
}3)processInlineBuffer函数
这个函数是解析inline请求。格式上,inline请求就是诸如”+PING\r\n”。在解析时,只需要查找\n即可。     首先,就是查找\n。如果没有找到,说明buffer中的内容不是完整的请求,然后判断一下querybuf的大小是否超过REDIS_INLINE_MAX_SIZE(64KB),超过则向客户端发送错误响应。否则,返回REDIS_ERR,继续read数据。    /* Search for end of line */
newline = strchr(c->querybuf,'\n');
/* Nothing to do without a \r\n */
// <MM>
// 没有\n,需要下一次epoll迭代,继续read
// 只在当前querybuf只包含一个请求,且该请求的长度大于16KB时才会出现
// </MM>
if (newline == NULL) {
if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
addReplyError(c,&quot;Protocol error: too big inline request&quot;);
setProtocolError(c,0);
}
return REDIS_ERR;
}
找到\n,则读到一个请求,接下来会将该请求数据拷贝到一个新的buffer中,等待解析。
    /* Handle the \r\n case. */
if (newline && newline != c->querybuf && *(newline-1) == '\r')
newline--;
/* Split the input buffer up to the \r\n */
querylen = newline-(c->querybuf);
// <MW>
// 存在一次内存拷贝,即所有请求都需要拷贝一次
// </MW>
aux = sdsnewlen(c->querybuf,querylen);接下来,根据空格将请求分割成多个部分,存储到argv数组,大小存于argc中。解析过程中,需要将一个请求拷贝到一个新的buffer中,然后解析。不知道为什么不能改成在querybuf中,原地解析,避免一次拷贝。此处可能是redia的一个优化点。
    // <MM>
// 将请求的raw buffer,根据空格分割成多个部分
// </MM>
argv = sdssplitargs(aux,&argc);
sdsfree(aux);
if (argv == NULL) {
addReplyError(c,&quot;Protocol error: unbalanced quotes in request&quot;);
setProtocolError(c,0);
return REDIS_ERR;
}更新一下主从同步时间。同时,因为已经将请求的内存存储到argv数组中,接下来,把querybuf中未处理的内容拷贝到querybuf的开始部分。也就是说,querybuf中总是存当前尚未处理的请求。
    /* Newline from slaves can be used to refresh the last ACK time.
* This is useful for a slave to ping back while loading a big
* RDB file. */
if (querylen == 0 && c->flags & REDIS_SLAVE)
c->repl_ack_time = server.unixtime;
// <MW>
// 处理完一行后,还需要将后续的行移动(拷贝)至querybuf开始的位置
// </MW>
/* Leave data after the first line of the query in the buffer */
sdsrange(c->querybuf,querylen+2,-1);根据解析的内容,创建redisClient->argv数组。
    /* Setup argv array on client structure */
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*argc);
/* Create redis objects for all arguments. */
for (c->argc = 0, j = 0; j < argc; j++) {
if (sdslen(argv[j])) {
// <MM>
// 默认把所有的argv当做是,string类型的robj
// </MM>
c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
c->argc++;
} else {
sdsfree(argv[j]);
}
}
zfree(argv);
return REDIS_OK;4)processMultibulkBuffer函数
这个函数解析multi bulk的请求。会以一个bulk为单位解析整个命令,在redisClient->multibulklen存储bulk的数量,然后循环一次解析每个bulk,如果multibulklen=0,则首先需要解析出multibulklen。redisClient->bulklen存储当前要解析的bulk的大小,如果bulk=-1,则首先要解析出bulk。     下面代码,如果multibulklen=0,说明是一个新的请求,首先需要解析出multibulklen。通过pos,指示当前querybuf中为解析的部分。如果后续在解析一个请求过程中,是不会走这个分支的。    if (c->multibulklen == 0) {
/* The client should have been reset */
redisAssertWithInfo(c,NULL,c->argc == 0);
/* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf,'\r');
if (newline == NULL) {
if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
addReplyError(c,&quot;Protocol error: too big mbulk count string&quot;);
setProtocolError(c,0);
}
return REDIS_ERR;
}
/* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
return REDIS_ERR;
/* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */
redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
// <MM>
// 解析出bulk大小,即数组的长度
// </MM>
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
if (!ok || ll > 1024*1024) {
addReplyError(c,&quot;Protocol error: invalid multibulk length&quot;);
setProtocolError(c,pos);
return REDIS_ERR;
}
pos = (newline-c->querybuf)+2;
if (ll <= 0) {
sdsrange(c->querybuf,pos,-1);
return REDIS_OK;
}
c->multibulklen = ll;
/* Setup argv array on client structure */
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
}接下来,是一个while循环:
while(c->multibulklen) {
...
}    循环multibulklen次,解析出对应个数的bulk。下面看一下这个循环内部:首先,也是如果bulklen=-1,说明要解析的是一个新的bulk,需要解析bulklen。
        if (c->bulklen == -1) {
// <MM>
// 有性能缺陷,如果没有读到\r,则此次的querybuf的搜索
// 相当于浪费掉了,下一轮处理时,还需从头读
// !!这个应该还好,bulk len的长度可控
// </MM>
newline = strchr(c->querybuf+pos,'\r');
if (newline == NULL) {
if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
addReplyError(c,
&quot;Protocol error: too big bulk count string&quot;);
setProtocolError(c,0);
return REDIS_ERR;
}
break;
}
/* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
break;
// <MM>
// client发送的命令都是redis protocol的字符串数组
// 要确定每个bulk元素是字符串
// </MM>
if (c->querybuf[pos] != '$') {
addReplyErrorFormat(c,
&quot;Protocol error: expected '$', got '%c'&quot;,
c->querybuf[pos]);
setProtocolError(c,pos);
return REDIS_ERR;
}
// <MM>
// 获取字符串的长度,即bulk的大小
// </MM>
ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
if (!ok || ll < 0 || ll > 512*1024*1024) {
addReplyError(c,&quot;Protocol error: invalid bulk length&quot;);
setProtocolError(c,pos);
return REDIS_ERR;
}
pos += newline-(c->querybuf+pos)+2;
if (ll >= REDIS_MBULK_BIG_ARG) {
// <MM>
// 当一个元素的大小超过一定值时,进行优化避免频繁拷贝
// 正常在read客户端数据时,以16kb大小块读取,同时调整query buf
// 在调整过程中buf不足时,需要拷贝
// </MM>
size_t qblen;
/* If we are going to read a large object from network
* try to make it likely that it will start at c->querybuf
* boundary so that we can optimize object creation
* avoiding a large copy of data. */
sdsrange(c->querybuf,pos,-1);
pos = 0;
qblen = sdslen(c->querybuf);
/* Hint the sds library about the amount of bytes this string is
* going to contain. */
if (qblen < (size_t)ll+2)
c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);
}
c->bulklen = ll;
}
上述代码有一处优化,就是当bulklen > REDIS_MBULK_BIG_ARG(64KB)时,为了避免频繁拷贝。这段代码需要和后面解析bulk时,一起考量。这里做的操作是,将querybuf只包含该bulk的内容(将未处理的buffer,拷贝的querybuf开始部分),并且将querybuf的大小扩充至bulklen + 2(包括\r\n)。后续在解析出bulk时,直接使用querybuf作为底层存储,避免拷贝大对象。
            if (ll >= REDIS_MBULK_BIG_ARG) {
// <MM>
// 当一个元素的大小超过一定值时,进行优化避免频繁拷贝
// 正常在read客户端数据时,以16kb大小块读取,同时调整query buf
// 在调整过程中buf不足时,需要拷贝
// </MM>
size_t qblen;
/* If we are going to read a large object from network
* try to make it likely that it will start at c->querybuf
* boundary so that we can optimize object creation
* avoiding a large copy of data. */
sdsrange(c->querybuf,pos,-1);
pos = 0;
qblen = sdslen(c->querybuf);
/* Hint the sds library about the amount of bytes this string is
* going to contain. */
if (qblen < (size_t)ll+2)
c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);
}接下来,是解析bulk,首先会判断querybuf是否包含足够的内容。不够则返回,否则解析该bulk。     在解析bulk,如果bulk不是大对象(不超过64KB),则会调用createStringObject创建argv,这个函数内部会调用strnewlen函数,拷贝传入的buffer。如果是大对象,并且querybuf中只包含该bulk的内容,则调用createObject函数,直接以querybuf为底层存储,创建argv,避免了大对象的拷贝。最后,在解析出bulk后,需要将bulklen设置为-1,并将multibulklen减1。        /* Read bulk argument */
if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
// <MM>
// 如果当前query buf中的数据不足,少于下一个bulk的长度
// 则,不进行操作
// </MM>
/* Not enough data (+2 == trailing \r\n) */
break;
} else {
/* Optimization: if the buffer contains JUST our bulk element
* instead of creating a new object by *copying* the sds we
* just use the current sds string. */
if (pos == 0 &&
c->bulklen >= REDIS_MBULK_BIG_ARG &&
(signed) sdslen(c->querybuf) == c->bulklen+2)
{
c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
sdsIncrLen(c->querybuf,-2); /* remove CRLF */
c->querybuf = sdsempty();
/* Assume that if we saw a fat argument we'll see another one
* likely... */
c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);
pos = 0;
} else {
c->argv[c->argc++] =
createStringObject(c->querybuf+pos,c->bulklen);
pos += c->bulklen+2;
}
// <MM>
// 读取完一个bulk后,将bulklen重置
// </MM>
c->bulklen = -1;
c->multibulklen--;
}跳出循环,首先需要调整querybuf,将未处理的内容拷贝的querybuf的开始部分。如果multibulklen=0,说明已经解析出命令所有的bulk,即命令解析成功,则返回REDIS_OK。
    // <MM>
// 每调用一次,都会存在一次拷贝
// 只有最后一个bulk内容不全时,才会调整querybuf
// </MM>
if (pos) sdsrange(c->querybuf,pos,-1);
/* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) return REDIS_OK;
/* Still not read to process the command */
return REDIS_ERR;5)processCommand函数
这个函数有171行,大部分是执行一些检查、校验。这里只梳理一遍主要逻辑。     首先,根据argv[0],查找command table,找到对应的命令。并检查命令的操作数个数是否正确。如果命令找不到,或者操作数个数不对,则返回给客户端错误响应。    /* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
// <MM>
// hash查找command
// </MM>
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
flagTransaction(c);
addReplyErrorFormat(c,&quot;unknown command '%s'&quot;,
(char*)c->argv[0]->ptr);
return REDIS_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
// <MM>
// 校验命令参数
// </MM>
flagTransaction(c);
addReplyErrorFormat(c,&quot;wrong number of arguments for '%s' command&quot;,
c->cmd->name);
return REDIS_OK;
}跳过接下来很大一部分校验代码,最后,会调用call函数,回调该命令的处理函数。
    /* Exec the command */
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// <MM>
// 如果处于multi模式,则将cmd排队
// </MM>
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// <MM>
// 回调命令的处理函数
// </MM>
call(c,REDIS_CALL_FULL);
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}6)call函数
如果当前有monitor客户端,则会把命令传输给该client。
    /* Sent the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & REDIS_CMD_SKIP_MONITOR))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
} 下面是回调命令的处理函数,会统计耗时,dirty用于记录更新操作的次数,用于完成save配置。
    /* Call the command. */
c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
redisOpArrayInit(&server.also_propagate);
dirty = server.dirty;
start = ustime();
c->cmd->proc(c);
duration = ustime()-start;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;后面几行代码,与lua有关,暂时跳过。接下来,是添加到slow log,并更新命令的统计计数。
    /* Log the command into the Slow log if needed, and populate the
* per-command statistics that we show in INFO commandstats. */
if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand) {
char *latency_event = (c->cmd->flags & REDIS_CMD_FAST) ?
&quot;fast-command&quot; : &quot;command&quot;;
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
}
if (flags & REDIS_CALL_STATS) {
c->cmd->microseconds += duration;
c->cmd->calls++;
}下面的代码,用于完成AOF和replication相关。之后,介绍这两个主题时,再涉及。
    /* Propagate the command into the AOF and replication link */
if (flags & REDIS_CALL_PROPAGATE) {
int flags = REDIS_PROPAGATE_NONE;
if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
if (dirty)
flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
if (flags != REDIS_PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
}接下来的代码,暂时不知道什么地方再用,先略过。最后更新命令执行的计数。
    /* Restore the old FORCE_AOF/REPL flags, since call can be executed
* recursively. */
c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL);
/* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. */
if (server.also_propagate.numops) {
int j;
redisOp *rop;
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
}
redisOpArrayFree(&server.also_propagate);
}
server.stat_numcommands++;当从call返回后,就完成命令的处理,此时响应内容已经放到client的影响buffer中。具体的响应的过程会放到下一篇继续讲解。
         
版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-138267-1-1.html 上篇帖子: 理想的redis集群 下篇帖子: 使用jedis访问redis的sentinel
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表