xiu12 发表于 2018-11-7 08:32:42

深入剖析Redis AOF持久化策略

// 启动后台子进程,执行 AOF 持久化操作。  
// bgrewriteaofCommand(),startAppendOnly(),serverCron() 中会调用此函数
  
/* This is how rewriting of the append only file in background works:
  
*
  
* 1) The user calls BGREWRITEAOF
  
* 2) Redis calls this function, that forks():
  
*    2a) the child rewrite the append only file in a temp file.
  
*    2b) the parent accumulates differences in server.aof_rewrite_buf.
  
* 3) When the child finishes '2a' it exits.
  
* 4) The parent will trap the exit code, if it's OK, will append the
  
*    data accumulated into server.aof_rewrite_buf into the temp file, and
  
*    finally will rename(2) the temp file in the actual file name.
  
*    Then the new file is reopened as the new append only file. Profit!
  
*/
  
/* Fork a child that rewrites the append only file into a temp file.
 *
 * Returns REDIS_ERR when server.aof_child_pid already records a child or
 * when fork() fails; returns REDIS_OK in the parent once the child has been
 * started. The child branch does not return: it leaves through
 * exitFromChild() with status 0 on success and 1 on failure. */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    /* A rewrite child is already recorded: refuse to start another one. */
    if (server.aof_child_pid != -1) return REDIS_ERR;

    start = ustime();
    if ((childpid = fork()) == 0) {
        /* Child */
        char tmpfile[256]; /* must be a buffer: it receives the file name */

        /* Close the listening sockets in the child. */
        closeListeningSockets(0);
        /* Set the process title of the child. */
        redisSetProcTitle("redis-aof-rewrite");
        /* Temp file name, made unique by embedding the child pid. */
        snprintf(tmpfile, sizeof(tmpfile), "temp-rewriteaof-bg-%d.aof",
                 (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            /* Report the private dirty memory figure when it is non-zero. */
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        if (childpid == -1) {
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,
            "Background append only file rewriting started by pid %d",
            (int) childpid);
        /* The rewrite has started: clear the scheduled-rewrite flag. */
        server.aof_rewrite_scheduled = 0;
        /* Start time of this rewrite. */
        server.aof_rewrite_time_start = time(NULL);
        /* Record the child pid; the guard at the top of this function
         * relies on it to reject a concurrent rewrite. */
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}
  
// AOF 持久化主函数。只在 rewriteAppendOnlyFileBackground() 中会调用此函数
  
/* Write a sequence of commands able to fully rebuild the dataset into
  
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
  
*
  
* In order to minimize the number of commands needed in the rewritten
  
* log Redis uses variadic commands when possible, such as RPUSH, SADD
  
* and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
  
* are inserted using a single command.
  
*/
  
int rewriteAppendOnlyFile(char *filename) {
  dictIterator *di = NULL;
  dictEntry *de;
  rio aof;
  FILE *fp;
  char tmpfile;
  int j;
  long long now = mstime();
  /* Note that we have to use a different temp name here compared to the
  * one used by rewriteAppendOnlyFileBackground() function.
  */
  snprintf(tmpfile, 256, "temp-rewriteaof-%d.aof", (int) getpid());
  // 打开文件
  fp = fopen(tmpfile, "w");
  if (!fp) {
  redisLog(REDIS_WARNING,
  "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s",
  strerror(errno));
  return REDIS_ERR;
  }
  // 初始化 rio 结构体
  rioInitWithFile(&aof, fp);
  // 如果设置了自动备份参数,将进行设置
  if (server.aof_rewrite_incremental_fsync)
  rioSetAutoSync(&aof, REDIS_AOF_AUTOSYNC_BYTES);
  // 备份每一个数据集
  for (j = 0; j < server.dbnum; j++) {
  char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
  redisDb *db = server.db + j;
  dict *d = db->dict;
  if (dictSize(d) == 0) continue;
  // 获取数据集的迭代器
  di = dictGetSafeIterator(d);
  if (!di) {
  fclose(fp);
  return REDIS_ERR;
  }
  // 写入 AOF 操作码
  /* SELECT the new DB */

  if (rioWrite(&aof, selectcmd,>  // 写入数据集序号
  if (rioWriteBulkLongLong(&aof, j) == 0) goto werr;
  // 写入数据集中每一个数据项
  /* Iterate this DB writing every entry */
  while((de = dictNext(di)) != NULL) {
  sds keystr;
  robj key, *o;
  long long expiretime;
  keystr = dictGetKey(de);
  o = dictGetVal(de);
  // 将 keystr 封装在 robj 里
  initStaticStringObject(key, keystr);
  // 获取过期时间
  expiretime = getExpire(db, &key);
  // 如果已经过期,放弃存储
  /* If this key is already expired skip it */
  if (expiretime != -1 && expiretime < now) continue;
  // 写入键值对应的写操作
  /* Save the key and associated value */
  if (o->type == REDIS_STRING) {
  /* Emit a SET command */
  char cmd[] = "*3\r\n$3\r\nSET\r\n";

  if (rioWrite(&aof, cmd,>  /* Key and value */
  if (rioWriteBulkObject(&aof, &key) == 0) goto werr;
  if (rioWriteBulkObject(&aof, o) == 0) goto werr;
  } else if (o->type == REDIS_LIST) {
  if (rewriteListObject(&aof, &key, o) == 0) goto werr;
  } else if (o->type == REDIS_SET) {
  if (rewriteSetObject(&aof, &key, o) == 0) goto werr;
  } else if (o->type == REDIS_ZSET) {
  if (rewriteSortedSetObject(&aof, &key, o) == 0) goto werr;
  } else if (o->type == REDIS_HASH) {
  if (rewriteHashObject(&aof, &key, o) == 0) goto werr;
  } else {
  redisPanic("Unknown object type");
  }
  // 写入过期时间
  /* Save the expire time */
  if (expiretime != -1) {
  char cmd[] = "*3\r\n$9\r\nPEXPIREAT\r\n";

  if (rioWrite(&aof, cmd,>  if (rioWriteBulkObject(&aof, &key) == 0) goto werr;
  if (rioWriteBulkLongLong(&aof, expiretime) == 0) goto werr;
  }
  }
  // 释放迭代器
  dictReleaseIterator(di);
  }
  // 写入磁盘
  /* Make sure data will not remain on the OS's output buffers */
  fflush(fp);
  aof_fsync(fileno(fp));
  fclose(fp);
  // 重写文件名
  /* Use RENAME to make sure the DB file is changed atomically only
  * if the generate DB file is ok. */
  if (rename(tmpfile, filename) == -1) {
  redisLog(REDIS_WARNING, "Error moving temp append only file on the final destination: %s",
  strerror(errno));
  unlink(tmpfile);
  return REDIS_ERR;
  }
  redisLog(REDIS_NOTICE, "SYNC append only file rewrite performed");
  return REDIS_OK;
  
werr:
  // 清理工作
  fclose(fp);
  unlink(tmpfile);
  redisLog(REDIS_WARNING, "Write error writing append only file on disk: %s", strerror(errno));
  if (di) dictReleaseIterator(di);
  return REDIS_ERR;
  
}
  
// 后台子进程结束后,redis 更新缓存 server.aof_rewrite_buf_blocks 追加到 AOF 文件中
  
// 在 AOF 持久化结束后会执行这个函数,backgroundRewriteDoneHandler()
  
// 主要工作是将 server.aof_rewrite_buf_blocks,即 AOF 缓存写入文件
  
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
  * Handle this.
  */
  
/* Excerpt only: the "......" lines stand for code omitted from this listing,
 * so newfd and the cleanup label are defined in parts not shown here. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
  ......
  // Write the AOF rewrite buffer (server.aof_rewrite_buf_blocks) to newfd.
  // On failure: log the errno text, close newfd and jump to cleanup.
  if (aofRewriteBufferWrite(newfd) == -1) {
  redisLog(REDIS_WARNING,
  "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
  close(newfd);
  goto cleanup;
  }
  ......
  
}
  
// 将累积的更新缓存 server.aof_rewrite_buf_blocks 同步到磁盘
  
/* Write the buffer (possibly composed of multiple blocks) into the specified
  * fd. If a short write or any other error happens -1 is returned,
  * otherwise the number of bytes written is returned.
  */
  
ssize_t aofRewriteBufferWrite(int fd) {
    listIter iter;
    listNode *node;
    ssize_t total = 0;

    /* Walk the rewrite buffer block list from the start. */
    listRewind(server.aof_rewrite_buf_blocks, &iter);
    for (node = listNext(&iter); node; node = listNext(&iter)) {
        aofrwblock *blk = listNodeValue(node);
        ssize_t written;

        /* Blocks with nothing used are skipped without calling write(). */
        if (!blk->used) continue;

        written = write(fd, blk->buf, blk->used);
        if (written == blk->used) {
            /* Whole block written: account for it and move on. */
            total += written;
            continue;
        }

        /* Anything other than a full write is reported as a failure. A
         * zero-byte result gets errno set to EIO before returning. */
        if (written == 0) errno = EIO;
        return -1;
    }
    return total;
}


页: [1]
查看完整版本: 深入剖析Redis AOF持久化策略