|
/* Start a background child process that performs the AOF rewrite.
 * According to the original notes this is invoked from
 * bgrewriteaofCommand(), startAppendOnly() and serverCron().
 *
 * This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF.
 * 2) Redis calls this function, which fork()s:
 *    2a) the child rewrites the append only file into a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child has finished '2a' it exits.
 * 4) The parent traps the exit code; if it's OK, it appends the data
 *    accumulated in server.aof_rewrite_buf to the temp file, and finally
 *    rename(2)s the temp file to the actual file name.
 *    Then the new file is reopened as the new append only file.
 *
 * Returns REDIS_ERR when a rewrite child is already registered in
 * server.aof_child_pid or when fork() fails; returns REDIS_OK in the parent
 * once the child has been started. The child leaves through exitFromChild().
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    /* A rewrite child is already running: refuse to start another one. */
    if (server.aof_child_pid != -1) return REDIS_ERR;

    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        /* Child */
        /* Stop listening: the child only has to write the file. */
        closeListeningSockets(0);
        /* Set the process title so the child is recognizable. */
        redisSetProcTitle("redis-aof-rewrite");
        /* Name of the temp file produced by this child. */
        snprintf(tmpfile, sizeof(tmpfile), "temp-rewriteaof-bg-%d.aof",
                 (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            /* Report the private dirty memory of the child, logged below as
             * the memory used by copy-on-write. */
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        if (childpid == -1) {
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,
            "Background append only file rewriting started by pid %d",
            (int) childpid);
        /* The rewrite has started: clear any pending scheduled rewrite. */
        server.aof_rewrite_scheduled = 0;
        /* Remember when this rewrite started. */
        server.aof_rewrite_time_start = time(NULL);
        /* Record the child pid; the guard at the top of this function
         * relies on it to detect a rewrite in progress. */
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge.
         */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}
// AOF 持久化主函数。只在 rewriteAppendOnlyFileBackground() 中会调用此函数
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command.
*/
int rewriteAppendOnlyFile(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
rio aof;
FILE *fp;
char tmpfile[256];
int j;
long long now = mstime();
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function.
*/
snprintf(tmpfile, 256, "temp-rewriteaof-%d.aof", (int) getpid());
// 打开文件
fp = fopen(tmpfile, "w");
if (!fp) {
redisLog(REDIS_WARNING,
"Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s",
strerror(errno));
return REDIS_ERR;
}
// 初始化 rio 结构体
rioInitWithFile(&aof, fp);
// 如果设置了自动备份参数,将进行设置
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof, REDIS_AOF_AUTOSYNC_BYTES);
// 备份每一个数据集
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db + j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
// 获取数据集的迭代器
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}
// 写入 AOF 操作码
/* SELECT the new DB */
if (rioWrite(&aof, selectcmd,> // 写入数据集序号
if (rioWriteBulkLongLong(&aof, j) == 0) goto werr;
// 写入数据集中每一个数据项
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;
keystr = dictGetKey(de);
o = dictGetVal(de);
// 将 keystr 封装在 robj 里
initStaticStringObject(key, keystr);
// 获取过期时间
expiretime = getExpire(db, &key);
// 如果已经过期,放弃存储
/* If this key is already expired skip it */
if (expiretime != -1 && expiretime < now) continue;
// 写入键值对应的写操作
/* Save the key and associated value */
if (o->type == REDIS_STRING) {
/* Emit a SET command */
char cmd[] = "*3\r\n$3\r\nSET\r\n";
if (rioWrite(&aof, cmd,> /* Key and value */
if (rioWriteBulkObject(&aof, &key) == 0) goto werr;
if (rioWriteBulkObject(&aof, o) == 0) goto werr;
} else if (o->type == REDIS_LIST) {
if (rewriteListObject(&aof, &key, o) == 0) goto werr;
} else if (o->type == REDIS_SET) {
if (rewriteSetObject(&aof, &key, o) == 0) goto werr;
} else if (o->type == REDIS_ZSET) {
if (rewriteSortedSetObject(&aof, &key, o) == 0) goto werr;
} else if (o->type == REDIS_HASH) {
if (rewriteHashObject(&aof, &key, o) == 0) goto werr;
} else {
redisPanic("Unknown object type");
}
// 写入过期时间
/* Save the expire time */
if (expiretime != -1) {
char cmd[] = "*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(&aof, cmd,> if (rioWriteBulkObject(&aof, &key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof, expiretime) == 0) goto werr;
}
}
// 释放迭代器
dictReleaseIterator(di);
}
// 写入磁盘
/* Make sure data will not remain on the OS's output buffers */
fflush(fp);
aof_fsync(fileno(fp));
fclose(fp);
// 重写文件名
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile, filename) == -1) {
redisLog(REDIS_WARNING, "Error moving temp append only file on the final destination: %s",
strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
redisLog(REDIS_NOTICE, "SYNC append only file rewrite performed");
return REDIS_OK;
werr:
// 清理工作
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING, "Write error writing append only file on disk: %s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this.
 *
 * Per the original notes, this runs after the background rewrite child has
 * finished, and its main job is to append the rewrite buffer
 * server.aof_rewrite_buf_blocks to the AOF file.
 *
 * NOTE(review): only an excerpt of the handler is shown; the "......" lines
 * mark code elided from this listing. `newfd` and the `cleanup` label are
 * defined in the elided parts - presumably newfd is the descriptor of the
 * rewritten AOF file; confirm against the full source.
 */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
......
/* Flush the accumulated rewrite buffer (server.aof_rewrite_buf_blocks) to
 * newfd; on failure log the error, close newfd and jump to cleanup. */
if (aofRewriteBufferWrite(newfd) == -1) {
redisLog(REDIS_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
goto cleanup;
}
......
}
/* Write the accumulated AOF rewrite buffer server.aof_rewrite_buf_blocks
 * (possibly composed of multiple blocks) into the specified fd.
 *
 * Every block's used bytes are written in list order. A partial write is
 * continued from where it stopped and a write() interrupted by a signal
 * (EINTR) is retried, so a transient short write no longer fails with a
 * stale errno.
 *
 * Returns the total number of bytes written on success. On error -1 is
 * returned with errno describing the failure (EIO when write() reports
 * zero bytes of progress); bytes written before the error are not undone.
 */
ssize_t aofRewriteBufferWrite(int fd) {
    listNode *ln;
    listIter li;
    ssize_t count = 0;

    listRewind(server.aof_rewrite_buf_blocks, &li);
    while ((ln = listNext(&li))) {
        aofrwblock *block = listNodeValue(ln);
        const char *cursor = (const char *)block->buf;
        size_t pending = block->used;

        while (pending > 0) {
            ssize_t nwritten = write(fd, cursor, pending);

            if (nwritten < 0) {
                /* Interrupted before any byte was written: just retry. */
                if (errno == EINTR) continue;
                return -1;
            }
            if (nwritten == 0) {
                /* No progress and no errno from write(): report I/O error. */
                errno = EIO;
                return -1;
            }
            cursor += nwritten;
            pending -= (size_t)nwritten;
            count += nwritten;
        }
    }
    return count;
}
|
|