|
/* Replica-side handler for the socket connected to the master.
 *
 * According to the original annotation, this function is registered as the
 * event callback when connectWithMaster() sets up the connection.
 *
 * NOTE: this is an excerpt; the "......" lines mark code that was elided
 * by the author of the notes. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    ......
    /* Try a partial resynchronization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt.
     *
     * If the master allows a partial resync it replies +CONTINUE.
     *
     * slaveTryPartialResynchronization() returns one of three states:
     *   PSYNC_CONTINUE:      a partial resync will be performed; per the
     *                        original annotation the readQueryFromClient()
     *                        callback has already been installed by the
     *                        callee at this point.
     *   PSYNC_FULLRESYNC:    a full resync is needed; the RDB file will be
     *                        downloaded.
     *   PSYNC_NOT_SUPPORTED: the master replied with an error or with an
     *                        unexpected reply. */
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE,
            "MASTER SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }
    /* Otherwise fall through and perform a full resynchronization. */
    ......
}
/* Return values of slaveTryPartialResynchronization():
 *   PSYNC_CONTINUE:      the master accepted a partial resync (+CONTINUE);
 *                        the cached master has already been resurrected on
 *                        the socket.
 *   PSYNC_FULLRESYNC:    the master replied +FULLRESYNC; a full resync
 *                        (RDB file download) follows.
 *   PSYNC_NOT_SUPPORTED: the master replied with an error or with an
 *                        unexpected reply. */
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
/* Replica side: send PSYNC to the master over 'fd' and interpret the reply.
 *
 * If a cached master exists, its run id and (offset + 1) are sent so the
 * master can try a partial resync. Otherwise "PSYNC ? -1" is sent, which
 * still lets the replica learn the master run id and offset from the
 * +FULLRESYNC reply.
 *
 * Returns:
 *   PSYNC_FULLRESYNC     on a +FULLRESYNC reply (cached master discarded).
 *   PSYNC_CONTINUE       on a +CONTINUE reply (cached master resurrected
 *                        on 'fd').
 *   PSYNC_NOT_SUPPORTED  on an error or any other reply (cached master
 *                        discarded). */
int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;

    /* Initially set repl_master_initial_offset to -1 to mark the current
     * master run_id and offset as not valid. Later if we'll be able to do
     * a FULL resync using the PSYNC command we'll set the offset at the
     * right value, so that this information will be propagated to the
     * client structure representing the master into server.master. */
    server.repl_master_initial_offset = -1;

    if (server.cached_master) {
        /* State from the previous connection with the master is cached:
         * try a partial resync to reduce the amount of data transferred. */
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset, sizeof(psync_offset), "%lld",
            server.cached_master->reploff + 1);
        redisLog(REDIS_NOTICE,
            "Trying a partial resynchronization (request %s:%s).",
            psync_runid, psync_offset);
    } else {
        /* Nothing cached from a previous connection: ask for a full resync.
         * "PSYNC ? -1" still lets us obtain the master run id. */
        redisLog(REDIS_NOTICE, "Partial resynchronization not possible (no cached master)");
        psync_runid = "?";
        memcpy(psync_offset, "-1", 3); /* Two characters plus the NUL. */
    }

    /* Issue the PSYNC command and read the reply. */
    reply = sendSynchronousCommand(fd, "PSYNC", psync_runid, psync_offset, NULL);

    /* Full resync. */
    if (!strncmp(reply, "+FULLRESYNC", 11)) {
        char *runid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        runid = strchr(reply, ' ');
        if (runid) {
            runid++;
            offset = strchr(runid, ' ');
            if (offset) offset++;
        }
        if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * runid to make sure next PSYNCs will fail. */
            memset(server.repl_master_runid, 0, REDIS_RUN_ID_SIZE + 1);
        } else {
            /* Copy the run id and remember the initial offset. */
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            redisLog(REDIS_NOTICE, "Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    /* Partial resync. */
    if (!strncmp(reply, "+CONTINUE", 9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        redisLog(REDIS_NOTICE, "Successful partial resynchronization with master.");
        sdsfree(reply);
        /* The cached master replaces the current master and is bound to
         * 'fd', ready to continue the replication stream. */
        replicationResurrectCachedMaster(fd);
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we received either an error since the master
     * does not understand PSYNC, or an unexpected reply from the master.
     * Reply with PSYNC_NOT_SUPPORTED in both cases. */
    if (strncmp(reply, "-ERR", 4)) {
        /* If it's not an error, log the unexpected event. */
        redisLog(REDIS_WARNING, "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
// Master-side handler for the SYNC and PSYNC commands: tries a partial
// resync first (PSYNC only) and otherwise proceeds with a full resync.
// NOTE: this is an excerpt; the "......" lines mark elided code.
/* SYNC and PSYNC command implementation. */
void syncCommand(redisClient *c) {
......
// The master tries a partial resync; if it is allowed, the master replies
// +CONTINUE and then sends the backlog data.
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails, we continue with usual full resynchronization, however
* when this happens masterTryPartialResynchronization() already
* replied with:
*
* +FULLRESYNC <runid> <offset>
*
* So the slave knows the new runid and offset to try a PSYNC later
* if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr, "psync")) {
// Partial resync attempt.
if (masterTryPartialResynchronization(c) == REDIS_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
// Partial resync failed, a full resync follows. argv[1] is the run id
// sent by the client.
char *master_runid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* runid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not able to partially
* resync. */
if (master_runid[0] != '?')
server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= REDIS_PRE_PSYNC_SLAVE;
}
// Perform the full resync:
......
}
/* Master side: decide whether a partial resync is possible.
 *
 * This function handles the PSYNC command from the point of view of a
 * master receiving a request for partial resynchronization.
 * c->argv[1] is the run id the replica asks for, c->argv[2] the offset.
 *
 * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
 * with the usual full resync.
 *
 * NOTE: this is an excerpt; the "......" line under need_full_resync marks
 * code that was elided by the author of the notes. */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Is the runid of this master the same advertised by the wannabe slave
     * via PSYNC? If runid changed this master is a different instance and
     * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
        /* When a replica loses its master it caches the master state for a
         * later partial resync:
         *   1) master_runid is the run id of the master cached by the
         *      replica;
         *   2) server.runid is the run id of this instance (the master).
         * A mismatch means this instance is not the master the replica has
         * cached, so only a full resync is possible.
         *
         * "?" means the replica explicitly asks for a full resync; the
         * replica-side code sends "?" when it has no cached master. */
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE, "Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    /* Parse the offset requested by the replica.
     * We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != REDIS_OK)
        goto need_full_resync;

    /* Cases in which a partial resync is not possible. */
    if (!server.repl_backlog ||                    /* No backlog exists. */
        psync_offset < server.repl_backlog_off ||  /* Offset too old: the
                                                      replica missed too many
                                                      updates, so a full
                                                      resync is safer. */
        /* Offset beyond the end of the backlog. */
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        /* Conditions for a partial resync are not met: full resync. */
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of "
            "backlog (Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is greater "
                "than the master replication offset.");
        }
        goto need_full_resync;
    }

    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */

    /* Flag the connected client as a slave. */
    c->flags |= REDIS_SLAVE;
    /* REDIS_REPL_ONLINE: RDB file transmitted, sending just updates. */
    c->replstate = REDIS_REPL_ONLINE;
    /* Refresh the last-ack time. */
    c->repl_ack_time = server.unixtime;
    /* Append the client to the list of slaves. */
    listAddNodeTail(server.slaves, c);

    /* Tell the replica that the partial resync is accepted.
     * We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    buflen = snprintf(buf, sizeof(buf), "+CONTINUE\r\n");
    if (write(c->fd, buf, buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }

    /* Feed the replica with the backlog data starting at psync_offset. */
    psync_len = addReplyReplicationBacklog(c, psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog "
        "starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */
    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    ......
    /* Send "+FULLRESYNC <runid> <repl_offset>" to the replica. */
}
|
|