设为首页 收藏本站
查看: 877|回复: 0

[经验分享] Redis源码解析 - 客户端工作流程及命令编码

[复制链接]

尚未签到

发表于 2015-11-12 14:26:04 | 显示全部楼层 |阅读模式
  Redis客户端工作redis-cli代码在redis-cli.c中,入口函数为main:
  

/* Start interactive mode when no command is provided */
if (argc == 0 && !config.eval) {
/* Note that in repl mode we don't abort on connection error.
* A new attempt will be performed for every command send. */
cliConnect(0);
repl();
}

在使用cliConnect函数进行网络连接之后,进行rep函数等待命令行的输入:  
  

while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
if (line[0] != '\0') {
argv = sdssplitargs(line,&argc);
printf("[%s] i have get a cmd with %d arguments, now ready go:\n", __FUNCTION__, argc);
for (int i=0; i<argc; i++) {
printf(&quot;[CMD] %s \n&quot;, argv);
}
if (history) linenoiseHistoryAdd(line);
if (historyfile) linenoiseHistorySave(historyfile);
if (argv == NULL) {
printf(&quot;Invalid argument(s)\n&quot;);
free(line);
continue;
} else if (argc > 0) {
if (strcasecmp(argv[0],&quot;quit&quot;) == 0 ||
strcasecmp(argv[0],&quot;exit&quot;) == 0)
{
exit(0);
} else if (argc == 3 && !strcasecmp(argv[0],&quot;connect&quot;)) {
sdsfree(config.hostip);
config.hostip = sdsnew(argv[1]);
config.hostport = atoi(argv[2]);
cliConnect(1);
} else if (argc == 1 && !strcasecmp(argv[0],&quot;clear&quot;)) {
linenoiseClearScreen();
} else {
long long start_time = mstime(), elapsed;
int repeat, skipargs = 0;
repeat = atoi(argv[0]);
printf(&quot;[%s] repeat=%d\n&quot;, __FUNCTION__, repeat);
if (argc > 1 && repeat) {
skipargs = 1;
} else {
repeat = 1;
}
while (1) {
config.cluster_reissue_command = 0;
if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
!= REDIS_OK)
{
cliConnect(1);
/* If we still cannot send the command print error.
* We'll try to reconnect the next time. */
if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
!= REDIS_OK)
cliPrintContextError();
}
/* Issue the command again if we got redirected in cluster mode */
if (config.cluster_mode && config.cluster_reissue_command) {
cliConnect(1);
} else {
break;
}
}
elapsed = mstime()-start_time;
if (elapsed >= 500) {
printf(&quot;(%.2fs)\n&quot;,(double)elapsed/1000);
}
}
}
/* Free the argument vector */
while(argc--) sdsfree(argv[argc]);
zfree(argv);
}

函数linenoise打印出客户端提示信息:  
  

redis 127.0.0.1:6379>

再通过sdssplitargs函数进行命令行的解析,argc存放参数个数,argv存入参数列表。例如:get a,解析完后argc=2,argv指向字符串数组{&quot;get&quot;,&quot;a&quot;}。  
  再进入cliSendCommand函数进行命令的发送。
  

tatic int cliSendCommand(int argc, char **argv, int repeat) {
char *command = argv[0];
size_t *argvlen;
int j, output_raw;
if (!strcasecmp(command,&quot;help&quot;) || !strcasecmp(command,&quot;?&quot;)) {
cliOutputHelp(--argc, ++argv);
return REDIS_OK;
}
if (context == NULL) return REDIS_ERR;
output_raw = 0;
if (!strcasecmp(command,&quot;info&quot;) ||
(argc == 2 && !strcasecmp(command,&quot;cluster&quot;) &&
(!strcasecmp(argv[1],&quot;nodes&quot;) ||
!strcasecmp(argv[1],&quot;info&quot;))) ||
(argc == 2 && !strcasecmp(command,&quot;client&quot;) &&
!strcasecmp(argv[1],&quot;list&quot;)))
{
output_raw = 1;
}
if (!strcasecmp(command,&quot;shutdown&quot;)) config.shutdown = 1;
if (!strcasecmp(command,&quot;monitor&quot;)) config.monitor_mode = 1;
if (!strcasecmp(command,&quot;subscribe&quot;) ||
!strcasecmp(command,&quot;psubscribe&quot;)) config.pubsub_mode = 1;
/* Setup argument length */
argvlen = malloc(argc*sizeof(size_t));
for (j = 0; j < argc; j++)
argvlen[j] = sdslen(argv[j]);
while(repeat--) {
redisAppendCommandArgv(context,argc,(const char**)argv,argvlen);
while (config.monitor_mode) {
if (cliReadReply(output_raw) != REDIS_OK) exit(1);
fflush(stdout);
}
if (config.pubsub_mode) {
if (config.output != OUTPUT_RAW)
printf(&quot;Reading messages... (press Ctrl-C to quit)\n&quot;);
while (1) {
if (cliReadReply(output_raw) != REDIS_OK) exit(1);
}
}
if (cliReadReply(output_raw) != REDIS_OK) {
free(argvlen);
return REDIS_ERR;
} else {
/* Store database number when SELECT was successfully executed. */
if (!strcasecmp(command,&quot;select&quot;) && argc == 2) {
config.dbnum = atoi(argv[1]);
cliRefreshPrompt();
}
}
if (config.interval) usleep(config.interval);
fflush(stdout); /* Make it grep friendly */
}
free(argvlen);
return REDIS_OK;
}
  
  利用redisAppendCommandArgv函数将命令行组装成Redis的编码&#26684;式,例:set guixl niubility将会被组装成:
  

*3\r\n$3\r\nset\r\n$5\r\nguixl\r\n$9\r\nniubility
  然后进入cliReadReply->redisGetReply函数发送数据包,并等待回复:
  

int redisGetReply(redisContext *c, void **reply) {
int wdone = 0;
void *aux = NULL;
/* Try to read pending replies */
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
/* For the blocking context, flush output buffer and read reply */
if (aux == NULL && c->flags & REDIS_BLOCK) {
/* Write until done */
do {
if (redisBufferWrite(c,&wdone) == REDIS_ERR)
return REDIS_ERR;
} while (!wdone);
/* Read until there is a reply */
do {
if (redisBufferRead(c) == REDIS_ERR)
return REDIS_ERR;
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
} while (aux == NULL);
}
/* Set reply object */
if (reply != NULL) *reply = aux;
return REDIS_OK;
}
  数据包发送函数:
  /* Write the output buffer to the socket.
*
* Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was
* succesfully written to the socket. When the buffer is empty after the
* write operation, &quot;done&quot; is set to 1 (if given).
*
* Returns REDIS_ERR if an error occured trying to write and sets
* c->errstr to hold the appropriate error string.
*/
int redisBufferWrite(redisContext *c, int *done) {
int nwritten;
/* Return early when the context has seen an error. */
if (c->err)
return REDIS_ERR;
if (sdslen(c->obuf) > 0) {
nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
if (nwritten == -1) {
if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) {
/* Try again later */
} else {
__redisSetError(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
} else if (nwritten > 0) {
if (nwritten == (signed)sdslen(c->obuf)) {
sdsfree(c->obuf);
c->obuf = sdsempty();
} else {
c->obuf = sdsrange(c->obuf,nwritten,-1);
}
}
}
if (done != NULL) *done = (sdslen(c->obuf) == 0);
return REDIS_OK;
}

Socket数据包读取函数:
  /* Use this function to handle a read event on the descriptor. It will try
* and read some bytes from the socket and feed them to the reply parser.
*
* After this function is called, you may use redisContextReadReply to
* see if there is a reply available. */
int redisBufferRead(redisContext *c) {
char buf[1024*16];
int nread;
/* Return early when the context has seen an error. */
if (c->err)
return REDIS_ERR;
nread = read(c->fd,buf,sizeof(buf));
if (nread == -1) {
if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) {
/* Try again later */
} else {
__redisSetError(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
} else if (nread == 0) {
__redisSetError(c,REDIS_ERR_EOF,&quot;Server closed the connection&quot;);
return REDIS_ERR;
} else {
if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) {
__redisSetError(c,c->reader->err,c->reader->errstr);
return REDIS_ERR;
}
}
return REDIS_OK;
}

读取buffer内容并存放至c->reader,看此外代码,貌&#20284;返回的数据包大小不会超过16K.
  
  

版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-138390-1-1.html 上篇帖子: SSDB:一个高性能的支持丰富数据结构的 NoSQL 数据库, 用于替代 Redis 下篇帖子: Redis 的 C++开发包 使用例子
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表