深入剖析Redis RDB持久化机制
本文分析源码基于 Redis 2.4.7 stable 版本。下面是其文章原文:rdb是 redis保存内存数据到磁盘数据的其中一种方式(另一种是AOF)。Rdb的主要原理就是在某个时间点把内存中的所有数据的快照保存一份到磁盘上。在条 件达到时通过fork一个子进程把内存中的数据写到一个临时文件中来实现保存数据快照。在所有数据写完后再把这个临时文件用原子函数rename(2)重 命名为目标rdb文件。这种实现方式充分利用fork的copy on write。
另外一种是通过save命令主动触发保存数据快照,这种是阻塞式的,即不会通过生成子进程来进行数据集快照的保存。
相关配置
save 经过多少秒且多少个key有改变就进行,可以配置多个,只要有一个满足就进行保存数据快照到磁盘
rdbcompression yes 保存数据到rdb文件时是否进行压缩,如果不想可以配置成’no’,默认是’yes’,因为压缩可以减少I/O,当然,压缩需要消耗一些cpu资源。
dbfilename dump.rdb 快照文件名
dir ./ 快照文件所在的目录,同时也是AOF文件所在的目录
Rdb文件格式
[注:本节所说的类型,值在没有特别标明的情况下都是针对rdb文件来说的]
Rdb文件的整体格式
文件签名 | 版本号 | 类型 | 值 | 类型 | 值 | … | 类型 | 值
[注:竖线和空格是为了便于阅读而加入的,rdb文件中是没有竖线和空格分隔的]
[*]文件签名是字符串:REDIS
[*]版本号是字符串:0002
[*]类型是指值的类型,redis值的类型有很多种,下边一一介绍
[*]值是对应的类型下的值,不同类型的值格式不一样。这里的值包含了redis中的key与val。而不是单指redis中val。
REDIS_SELECTDB类型与REDIS_EOF类型
[*]REDIS_SELECTDB类型:对应的值是redis db的编号,从0开始到比db数小1的数值。redis中可以配置db数,每个key只属于一个db。
[*]存储redis db的编号时使用的是存储长度时使用的格式,为了尽量压缩rdb文件,存储长度使用的字节数是不一样的,具体见下边rdb中长度的存储
[*]REDIS_EOF类型:没有对应的值。rdb文件的结束符。
把这REDIS_SELECTDB类型和REDIS_EOF类型代入到上边的rdb文件的格式中,那么rdb文件的整体格式变成为:
文件签名 | 版本号 | REDIS_SELECTDB类型 | db编号 | 类型 | 值 | … | REDIS_SELECTD 类型 | db编号 | 类型 | 值 | … | REDIS_EOF类型
[*]每个db编号后边到下一个REDIS_SELECTDB类型出现之前的数据都是该db下边的key和value的数据
相关代码
Rdb.c:394
int rdbSave(char *filename) { … fp = fopen(tmpfile,"w"); if (!fp) { redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno)); return REDIS_ERR; } if (fwrite("REDIS0002",9,1,fp) == 0) goto werr; for (j = 0; j < server.dbnum; j++) { … /* Write the SELECT DB opcode */ if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr; if (rdbSaveLen(fp,j) == -1) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { … initStaticStringObject(key,keystr); expiretime = getExpire(db,&key); /* Save the expire time */ if (expiretime != -1) { /* If this key is already expired skip it */ if (expiretime < now) continue; if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr; if (rdbSaveTime(fp,expiretime) == -1) goto werr; } /* Save the key and associated value. This requires special * handling if the value is swapped out. */ if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || o->storage == REDIS_VM_SWAPPING) { int otype = getObjectSaveType(o); /* Save type, key, value */ if (rdbSaveType(fp,otype) == -1) goto werr; if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,o) == -1) goto werr; } else { /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ robj *po; /* Get a preview of the object in memory */ po = vmPreviewObject(o); /* Save type, key, value */ if (rdbSaveType(fp,getObjectSaveType(po)) == -1) goto werr; if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,po) == -1) goto werr; /* Remove the loaded object from memory */ decrRefCount(po); } } dictReleaseIterator(di); } /* EOF opcode */ if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr; … } Rdb中长度的存储
Redis为了尽量压缩rdb文件真是费尽心思,先来看看redis为了压缩使用的长度存储。长度主要用在字符串长度,链表长度,hash表的大小存储上。
Redis把长度的存储分为四种,最左边字节的从左到右的前两位用于区分长度的存储类型。
类型位表示 类型整型表示 占用字节数 类型解析 00 0 1 当长度能用6位表示使用此类型 01 1 2 当长度不能用6位表示且能用14位表示使用此类型 10 2 5 当长度不能用14位表示且能用32位表示使用此类型 相关代码
Rdb.c:31
int rdbSaveLen(FILE *fp, uint32_t len) { unsigned char buf; int nwritten; if (len < (1 0) close(server.sofd); if (rdbSave(filename) == REDIS_OK) { _exit(0); } else { _exit(1); } } else { /* Parent */ server.stat_fork_time = ustime()-start; if (childpid == -1) { redisLog(REDIS_WARNING,"Can't save in background: fork: %s", strerror(errno)); return REDIS_ERR; } redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid); server.bgsavechildpid = childpid; updateDictResizePolicy(); return REDIS_OK; } return REDIS_OK; /* unreached */ }
[*]对是否已经有写rdb的子进程进行了判断,如果已经有保存快照的子进程,则返回错误。
[*]如果启动了虚拟内存,则等待所有处理换出换入的任务线程退出,如果还有vm任务在处理就会一直循环等待。一直到所有换入换出任务都完成且所有vm线程退出。
[*]保存当前的脏数据计数,当快照保存完后用于更新当前的脏数据计数(见函数backgroundSaveDoneHandler,rdb.c:1062)
[*]记下当前时间,用于统计fork一个进程需要的时间
[*]Fork一个字进程,子进程调用rdbSave进行快照保存
[*]父进程统计fork一个子进程消耗的时间: server.stat_fork_time = ustime()-start,这个统计可以通过info命令获得。
[*]保存子进程ID和更新增量重哈希的策略,即此时不应该再进行增量重哈希,不然大量key的改变可能导致fork的copy-on-write进行大量的写。
到了这里我们知道,rdb的快照保存是通过函数rdbSave函数(rdb.c:394)来实现的。其实save命令也是通过调用这个函数来实现的。我们来简单看看
Db.c:323
void saveCommand(redisClient *c) { if (server.bgsavechildpid != -1) { addReplyError(c,"Background save already in progress"); return; } if (rdbSave(server.dbfilename) == REDIS_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); } 最后我们进rdbSave函数看看
rdb.c:394
int rdbSave(char *filename) { ... /* Wait for I/O therads to terminate, just in case this is a * foreground-saving, to avoid seeking the swap file descriptor at the * same time. */ if (server.vm_enabled) waitEmptyIOJobsQueue(); snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno)); return REDIS_ERR; } if (fwrite("REDIS0002",9,1,fp) == 0) goto werr; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; di = dictGetSafeIterator(d); if (!di) { fclose(fp); return REDIS_ERR; } /* Write the SELECT DB opcode */ if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr; if (rdbSaveLen(fp,j) == -1) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { sds keystr = dictGetEntryKey(de); robj key, *o = dictGetEntryVal(de); time_t expiretime; initStaticStringObject(key,keystr); expiretime = getExpire(db,&key); /* Save the expire time */ if (expiretime != -1) { /* If this key is already expired skip it */ if (expiretime < now) continue; if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr; if (rdbSaveTime(fp,expiretime) == -1) goto werr; } /* Save the key and associated value. This requires special * handling if the value is swapped out. */ if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || o->storage == REDIS_VM_SWAPPING) { int otype = getObjectSaveType(o); /* Save type, key, value */ if (rdbSaveType(fp,otype) == -1) goto werr; if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,o) == -1) goto werr; } else { /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ robj *po; /* Get a preview of the object in memory */ po = vmPreviewObject(o); /* Save type, key, value */ if (rdbSaveType(fp,getObjectSaveType(po)) == -1) goto werr; if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,po) == -1) goto werr; /* Remove the loaded object from memory */ decrRefCount(po); } } dictReleaseIterator(di); } /* EOF opcode */ if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr; /* Make sure data will not remain on the OS's output buffers */ fflush(fp); fsync(fileno(fp)); fclose(fp); /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno)); unlink(tmpfile); return REDIS_ERR; } redisLog(REDIS_NOTICE,"DB saved on disk"); server.dirty = 0; server.lastsave = time(NULL); return REDIS_OK;werr: fclose(fp); unlink(tmpfile); redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno)); if (di) dictReleaseIterator(di); return REDIS_ERR; }
[*]对是否有vm线程进行再次判断,因为如果是通过save命令过来的是没有判断过vm线程的。
[*]创建并打开临时文件
[*]写入文件签名“REDIS”和版本号“0002”
[*]遍历所有db中的所有key
[*]对每个key,先判断是否设置了expireTime,如果设置了,则保存expireTime到rdb文件中。然后判断该key对应的value是否则内存中,如果是在内存中,则取出来写入到rdb文件中保 存,如果被换出到虚拟内存了,则从虚拟内存读取然后写入到rdb文件中。
[*]不同类型有有不同的存储格式,详细见rdb文件格式
[*]最后写入rdb文件的结束符
[*]关闭文件并重命名临时文件名到正式文件名
[*]更新脏数据计数server.dirty为0和最近写rdb文件的时间server.lastsave为当前时间,这个只是在通过save命令触发的情况下有用。因为如果是通过fork一个子进程来写rdb文件的,更新无效,因为更新的是子进程的数据。
如果是通过fork一个子进程来写rdb文件(即不是通过save命令触发的),在写rdb文件的过程中,可能又有一些数据被更改了,那此时的脏数据计数server.dirty怎么更新呢? redis是怎样处理的呢?
我们来看看写rdb的子进程推出时得处理
Redis.c:605
if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) { int statloc; pid_t pid; if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { if (pid == server.bgsavechildpid) { backgroundSaveDoneHandler(statloc); } else { backgroundRewriteDoneHandler(statloc); } updateDictResizePolicy(); } }
[*]如果捕捉到写rdb文件的子进程退出,则调用backgroundSaveDoneHandler进行处理
接着看看backgroundSaveDoneHandler函数
Rdb.c:1062
void backgroundSaveDoneHandler(int statloc) { int exitcode = WEXITSTATUS(statloc); int bysignal = WIFSIGNALED(statloc); if (!bysignal && exitcode == 0) { redisLog(REDIS_NOTICE, "Background saving terminated with success"); server.dirty = server.dirty - server.dirty_before_bgsave; server.lastsave = time(NULL); } else if (!bysignal && exitcode != 0) { redisLog(REDIS_WARNING, "Background saving error"); } else { redisLog(REDIS_WARNING, "Background saving terminated by signal %d", WTERMSIG(statloc)); rdbRemoveTempFile(server.bgsavechildpid); } server.bgsavechildpid = -1; /* Possibly there are slaves waiting for a BGSAVE in order to be served * (the first stage of SYNC is a bulk transfer of dump.rdb) */ updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR); }
[*]更新脏数据计数server.dirty为0和最近写rdb文件的时间server.lastsave为当前时间
[*]唤醒因为正在保存快照而等待的slave,关于slave的具体内容,见replication
快照导入
当redis因为停电或者某些原因挂掉了,此时重启redis时,我们就需要从rdb文件中读取快照文件,把保存到rdb文件中的数据重新导入到内存中。
先来看看启动时对快照导入的处理
Redis.c:1717
if (server.appendonly) { if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK) redisLog(REDIS_NOTICE,"DB loaded from append only file: %ld seconds",time(NULL)-start); } else { if (rdbLoad(server.dbfilename) == REDIS_OK) { redisLog(REDIS_NOTICE,"DB loaded from disk: %ld seconds", time(NULL)-start); } else if (errno != ENOENT) { redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting."); exit(1); } }
[*]如果保存了AOF文件,则使用AOF文件来恢复数据,AOF的具体内容见AOF
[*]如果没有AOF,则使用rdb文件恢复数据,调用rdbLoad函数
接着看看rdbLoad函数
Rdb.c:929
int rdbLoad(char *filename) { ... fp = fopen(filename,"r"); if (!fp) { errno = ENOENT; return REDIS_ERR; } if (fread(buf,9,1,fp) == 0) goto eoferr; buf = '\0'; if (memcmp(buf,"REDIS",5) != 0) { fclose(fp); redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file"); errno = EINVAL; return REDIS_ERR; } rdbver = atoi(buf+5); if (rdbver < 1 || rdbver > 2) { fclose(fp); redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver); errno = EINVAL; return REDIS_ERR; } startLoading(fp); while(1) { robj *key, *val; int force_swapout; expiretime = -1; /* Serve the clients from time to time */ if (!(loops++ % 1000)) { loadingProgress(ftello(fp)); aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); } /* Read type. */ if ((type = rdbLoadType(fp)) == -1) goto eoferr; if (type == REDIS_EXPIRETIME) { if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr; /* We read the time so we need to read the object type again */ if ((type = rdbLoadType(fp)) == -1) goto eoferr; } if (type == REDIS_EOF) break; /* Handle SELECT DB opcode as a special case */ if (type == REDIS_SELECTDB) { if ((dbid = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) goto eoferr; if (dbid >= (unsigned)server.dbnum) { redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum); exit(1); } db = server.db+dbid; continue; } /* Read key */ if ((key = rdbLoadStringObject(fp)) == NULL) goto eoferr; /* Read value */ if ((val = rdbLoadObject(type,fp)) == NULL) goto eoferr; /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the slave. */ if (server.masterhost == NULL && expiretime != -1 && expiretime < now) { decrRefCount(key); decrRefCount(val); continue; } /* Add the new object in the hash table */ dbAdd(db,key,val); /* Set the expire time if needed */ if (expiretime != -1) setExpire(db,key,expiretime); /* Handle swapping while loading big datasets when VM is on */ /* If we detecter we are hopeless about fitting something in memory * we just swap every new key on disk. Directly… * Note that's important to check for this condition before resorting * to random sampling, otherwise we may try to swap already * swapped keys. */ if (swap_all_values) { dictEntry *de = dictFind(db->dict,key->ptr); /* de may be NULL since the key already expired */ if (de) { vmpointer *vp; val = dictGetEntryVal(de); if (val->refcount == 1 && (vp = vmSwapObjectBlocking(val)) != NULL) dictGetEntryVal(de) = vp; } decrRefCount(key); continue; } decrRefCount(key); /* Flush data on disk once 32 MB of additional RAM are used… */ force_swapout = 0; if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) force_swapout = 1; /* If we have still some hope of having some value fitting memory * then we try random sampling. */ if (!swap_all_values && server.vm_enabled && force_swapout) { while (zmalloc_used_memory() > server.vm_max_memory) { if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } if (zmalloc_used_memory() > server.vm_max_memory) swap_all_values = 1; /* We are already using too much mem */ } } fclose(fp); stopLoading(); return REDIS_OK;eoferr: /* unexpected end of file is handled here with a fatal exit */ redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now."); exit(1); return REDIS_ERR; /* Just to avoid warning */ }
[*]打开rdb文件
[*]读取rdb文件的签名和版本号
[*]开始进入 类型 | 值 | 类型 | 值 的循环读取,可参考rdb文件格式
[*]作者还做了导入的进度条,是有人反馈说rdb文件很大时导入时要很久,但又不知道进度,所以作者就加了导入的进度条,改善用户体验
[*]读取类型
[*]如果类型是过期时间类型REDIS_EXPIRETIME,则读取过期时间
[*]如果类型是文件结束类型REDIS_EOF,则跳出 类型 | 值 | 类型 | 值 的循环读取
[*]如果类型是选择db类型REDIS_SELECTDB,则读取db索引并把当前db转成该db,然后继续 类型 | 值 | 类型 | 值 的循环读取。
[*]如果不是以上类型,则表明该类型是数据类型,读取作为key的字符串,即读取字符串类型的值,然后接着读取作为value的字符串。不同类型的编码不一样,根据写入时得规则解释读取到的值即可
[*]读取到key和value后,判断下该key是否过期,如果过期则丢弃,不再导入,然后继续 类型 | 值 | 类型 | 值 的循环读取。
[*]如果读取成功,则导入到内存,如果有过期时间则设置过期时间
[*]如果配置了虚拟内存并且内存的使用比虚拟内存配置的大32M时,开始随机的取一些数据换出到虚拟内存中。
[*]从上边我们也可以看到,如果没有配置虚拟内存,rdb文件导入时会尽可能地占用操作系统的内存,甚至可能全部用完。
总结
落地存储是数据设计的一大重点也是难点。原理很简单,定义某种协议,然后按照某种协议写入读出。Redis为了节省空间和读写时的I/O操作,做了 很多很细致的工作来压缩数据。另外redis的丰富的数据类型也加大了落地的实现难度。作者也曾经在他的博客说过,redis的丰富的数据类型导致了很多 经典的优化办法无法在redis上实现。
页:
[1]