苏泽湛 发表于 2015-11-12 14:26:04

Redis源码解析 - 客户端工作流程及命令编码

  Redis客户端工具redis-cli的代码在redis-cli.c中,入口函数为main:
  

/* No command was given on the command line and no script is to be
 * evaluated: enter the interactive prompt. */
if (!config.eval && argc == 0) {
    /* A failed connection is not fatal in interactive mode; a new
     * attempt is made for every command that gets sent. */
    cliConnect(0);
    repl();
}

在使用cliConnect函数进行网络连接之后,进入repl函数等待命令行的输入:  
  

/* Interactive prompt loop: read one line per iteration, split it into
 * arguments and dispatch it. The prompt shows "not connected> " while
 * there is no connection context. */
while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
    /* Only non-empty lines are processed */
    if (line[0] != '\0') {
        argv = sdssplitargs(line,&argc);
        /* Debug trace: dump the parsed arguments */
        printf("[%s] i have get a cmd with %d arguments, now ready go:\n", __FUNCTION__, argc);
        for (int i=0; i<argc; i++) {
            printf(" %s \n", argv[i]);
        }
        if (history) linenoiseHistoryAdd(line);
        if (historyfile) linenoiseHistorySave(historyfile);
        if (argv == NULL) {
            printf("Invalid argument(s)\n");
            free(line);
            continue;
        } else if (argc > 0) {
            if (strcasecmp(argv[0],"quit") == 0 ||
                strcasecmp(argv[0],"exit") == 0)
            {
                exit(0);
            } else if (argc == 3 && !strcasecmp(argv[0],"connect")) {
                /* "connect <host> <port>": switch server and reconnect */
                sdsfree(config.hostip);
                config.hostip = sdsnew(argv[1]);
                config.hostport = atoi(argv[2]);
                cliConnect(1);
            } else if (argc == 1 && !strcasecmp(argv[0],"clear")) {
                linenoiseClearScreen();
            } else {
                long long start_time = mstime(), elapsed;
                int repeat, skipargs = 0;
                /* A leading non-zero number followed by more arguments is
                 * taken as a repeat count and skipped when sending. */
                repeat = atoi(argv[0]);
                printf("[%s] repeat=%d\n", __FUNCTION__, repeat);
                if (argc > 1 && repeat) {
                    skipargs = 1;
                } else {
                    repeat = 1;
                }
                while (1) {
                    config.cluster_reissue_command = 0;
                    if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
                        != REDIS_OK)
                    {
                        cliConnect(1);
                        /* If we still cannot send the command print error.
                         * We'll try to reconnect the next time. */
                        if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
                            != REDIS_OK)
                            cliPrintContextError();
                    }
                    /* Issue the command again if we got redirected in cluster mode */
                    if (config.cluster_mode && config.cluster_reissue_command) {
                        cliConnect(1);
                    } else {
                        break;
                    }
                }
                /* Report the elapsed time only for slow commands (>= 500 ms) */
                elapsed = mstime()-start_time;
                if (elapsed >= 500) {
                    printf("(%.2fs)\n",(double)elapsed/1000);
                }
            }
        }
        /* Free the argument vector */
        while(argc--) sdsfree(argv[argc]);
        zfree(argv);
    }
    /* NOTE(review): the excerpt stops here, inside the while loop. Upstream
     * redis-cli.c presumably frees `line` and closes the loop after this
     * point -- confirm against the Redis source. */

函数linenoise打印出客户端提示信息:  
  

redis 127.0.0.1:6379>

再通过sdssplitargs函数进行命令行的解析,argc存放参数个数,argv存入参数列表。例如:get a,解析完后argc=2,argv指向字符串数组{"get","a"}。  
  再进入cliSendCommand函数进行命令的发送。
  

/* Send one command to the server `repeat` times and print each reply.
 *
 * argc/argv hold the command name followed by its arguments; argv[0] is
 * the command name. "help" and "?" are answered locally through
 * cliOutputHelp() without touching the connection.
 *
 * Returns REDIS_OK on success, REDIS_ERR when there is no connection
 * context or when reading a reply fails. In monitor and pubsub mode the
 * function keeps reading replies and only leaves via exit(1) on a read
 * failure. */
static int cliSendCommand(int argc, char **argv, int repeat) {
    char *command = argv[0];
    size_t *argvlen;
    int j, output_raw;

    if (!strcasecmp(command,"help") || !strcasecmp(command,"?")) {
        cliOutputHelp(--argc, ++argv);
        return REDIS_OK;
    }

    if (context == NULL) return REDIS_ERR;

    /* INFO, CLUSTER NODES, CLUSTER INFO and CLIENT LIST replies are
     * requested in raw output form. */
    output_raw = 0;
    if (!strcasecmp(command,"info") ||
        (argc == 2 && !strcasecmp(command,"cluster") &&
                      (!strcasecmp(argv[1],"nodes") ||
                       !strcasecmp(argv[1],"info"))) ||
        (argc == 2 && !strcasecmp(command,"client") &&
                       !strcasecmp(argv[1],"list")))
    {
        output_raw = 1;
    }

    /* Remember commands that change how replies are consumed below */
    if (!strcasecmp(command,"shutdown")) config.shutdown = 1;
    if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
    if (!strcasecmp(command,"subscribe") ||
        !strcasecmp(command,"psubscribe")) config.pubsub_mode = 1;

    /* Setup argument length */
    argvlen = malloc(argc*sizeof(size_t));
    for (j = 0; j < argc; j++)
        argvlen[j] = sdslen(argv[j]);

    while(repeat--) {
        /* Queue the encoded command in the context's output buffer */
        redisAppendCommandArgv(context,argc,(const char**)argv,argvlen);
        while (config.monitor_mode) {
            if (cliReadReply(output_raw) != REDIS_OK) exit(1);
            fflush(stdout);
        }

        if (config.pubsub_mode) {
            if (config.output != OUTPUT_RAW)
                printf("Reading messages... (press Ctrl-C to quit)\n");
            while (1) {
                if (cliReadReply(output_raw) != REDIS_OK) exit(1);
            }
        }

        if (cliReadReply(output_raw) != REDIS_OK) {
            free(argvlen);
            return REDIS_ERR;
        } else {
            /* Store database number when SELECT was successfully executed. */
            if (!strcasecmp(command,"select") && argc == 2) {
                config.dbnum = atoi(argv[1]);
                cliRefreshPrompt();
            }
        }
        /* Optional pause between repetitions */
        if (config.interval) usleep(config.interval);
        fflush(stdout); /* Make it grep friendly */
    }

    free(argvlen);
    return REDIS_OK;
}
  
  利用redisAppendCommandArgv函数将命令行组装成Redis的编码格式,例:set guixl niubility将会被组装成:
  

*3\r\n$3\r\nset\r\n$5\r\nguixl\r\n$9\r\nniubility\r\n
  然后进入cliReadReply->redisGetReply函数发送数据包,并等待回复:
  

int redisGetReply(redisContext *c, void **reply) {
    void *pending = NULL;
    int flushed = 0;

    /* First see whether the reader already holds a reply. */
    if (redisGetReplyFromReader(c,&pending) == REDIS_ERR)
        return REDIS_ERR;

    /* Nothing available and the context is blocking: write out the whole
     * output buffer, then keep reading until one reply has been parsed. */
    if (pending == NULL && (c->flags & REDIS_BLOCK)) {
        while (!flushed) {
            if (redisBufferWrite(c,&flushed) == REDIS_ERR)
                return REDIS_ERR;
        }

        while (pending == NULL) {
            if (redisBufferRead(c) == REDIS_ERR)
                return REDIS_ERR;
            if (redisGetReplyFromReader(c,&pending) == REDIS_ERR)
                return REDIS_ERR;
        }
    }

    /* Hand the reply over only when the caller supplied somewhere to put it. */
    if (reply != NULL)
        *reply = pending;
    return REDIS_OK;
}
  数据包发送函数:
  /* Write the output buffer to the socket.
*
* Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was
* successfully written to the socket. When the buffer is empty after the
* write operation, "done" is set to 1 (if given).
*
* Returns REDIS_ERR if an error occurred trying to write and sets
* c->errstr to hold the appropriate error string.
*/
int redisBufferWrite(redisContext *c, int *done) {
    /* A context that already recorded an error is not written to. */
    if (c->err)
        return REDIS_ERR;

    if (sdslen(c->obuf) > 0) {
        int sent = write(c->fd,c->obuf,sdslen(c->obuf));

        if (sent == -1) {
            /* EAGAIN on a non-blocking context just means "try again
             * later"; every other failure is reported as an I/O error. */
            int retry_later = (errno == EAGAIN && !(c->flags & REDIS_BLOCK));
            if (!retry_later) {
                __redisSetError(c,REDIS_ERR_IO,NULL);
                return REDIS_ERR;
            }
        } else if (sent > 0) {
            if (sent != (signed)sdslen(c->obuf)) {
                /* Partial write: keep only the bytes not yet sent. */
                c->obuf = sdsrange(c->obuf,sent,-1);
            } else {
                /* Everything went out: start over with an empty buffer. */
                sdsfree(c->obuf);
                c->obuf = sdsempty();
            }
        }
    }

    /* Tell the caller whether the output buffer is now empty. */
    if (done != NULL)
        *done = (sdslen(c->obuf) == 0);
    return REDIS_OK;
}

Socket数据包读取函数:
  /* Use this function to handle a read event on the descriptor. It will try
* and read some bytes from the socket and feed them to the reply parser.
*
* After this function is called, you may use redisContextReadReply to
* see if there is a reply available. */
int redisBufferRead(redisContext *c) {
    /* Scratch buffer for one read() call; its size caps how many bytes a
     * single call can pull from the socket. */
    char buf[1024*16];
    int nread;

    /* Return early when the context has seen an error. */
    if (c->err)
        return REDIS_ERR;

    nread = read(c->fd,buf,sizeof(buf));
    if (nread == -1) {
        if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) {
            /* Try again later */
        } else {
            __redisSetError(c,REDIS_ERR_IO,NULL);
            return REDIS_ERR;
        }
    } else if (nread == 0) {
        /* read() returning 0 is reported as the server closing the link */
        __redisSetError(c,REDIS_ERR_EOF,"Server closed the connection");
        return REDIS_ERR;
    } else {
        /* Pass the received bytes to the reply reader; a reader failure is
         * copied onto the context. */
        if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) {
            __redisSetError(c,c->reader->err,c->reader->errstr);
            return REDIS_ERR;
        }
    }
    return REDIS_OK;
}

读取buffer内容并存放至c->reader。看此处代码,单次read最多读取16K数据;回复超过16K时,redisGetReply中的循环会多次调用redisBufferRead,直到解析出完整的回复。
  
  

版权声明:本文为博主原创文章,未经博主允许不得转载。
页: [1]
查看完整版本: Redis源码解析 - 客户端工作流程及命令编码