|
对于集群初始化,在 redis cluster 形成之前,各个节点都是独立的,它们主要是通过节点之间的 CLUSTER MEET 命令来初始化各个节点中的 clusterState 中的 nodes 成员,并构建最终的 cluster,cluster meet 的命令格式如下:
1
| CLUSTER MEET <ip> <port>
|
下面以 NodeA 和 NodeB 两个节点逐步组成集群为例,客户端登录到 NodeA,并执行 MEET 命令将 NodeB 节点加入到 NodeA 的集群中。
1) 接收到客户端发起的 MEET 命令的节点A(NodeA),会向 <ip>:<port> 的节点B(NodeB) 发起 Handshake 流程,如下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| void clusterCommand(redisClient *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
// 接收到从 client 执行的 cluster meet <ip> <port> 命令
long long port;
// 获取 NodeB 的 Port(注:非 cluster bus 的监听端口)
if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
addReplyErrorFormat(c,"Invalid TCP port specified: %s",
(char*)c->argv[3]->ptr);
return;
}
// 开始发起 handshake 流程
if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
errno == EINVAL)
{
addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
(char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
} else {
addReply(c,shared.ok);
}
}
... ...
}
|
从上面代码可以看出发起 handshake 流程是在 clusterStartHandshake 函数中实现的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| int clusterStartHandshake(char *ip, int port) {
clusterNode *n;
char norm_ip[REDIS_IP_STR_LEN];
struct sockaddr_storage sa;
/* IP 和 端口 检查 */
... ...
/* IP 地址 normalize */
... ...
/* 将节点 NodeB 加入 Handshake 阶段 */
if (clusterHandshakeInProgress(norm_ip,port)) {
errno = EAGAIN;
return 0;
}
/* Add the node with a random address (NULL as first argument to
* createClusterNode()). Everything will be fixed during the
* handskake. */
// 创建新的节点 NodeB,同时设置 flag 为 HANDSHAKE|MEET
// 并添加至 NodeA 本地维护的 cluster nodes 表中
n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
clusterAddNode(n);
return 1;
}
|
这里的代码并没有去发送 MEET 消息至 NodeB,而是在 clusterCron 中实现的,clusterCron 函数每秒被执行 10 次,如下(省略了处理心跳等其他流程):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
| /* This is executed 10 times every second */
void clusterCron(void) {
dictIterator *di;
dictEntry *de;
int update_state = 0;
int orphaned_masters; /* How many masters there are without ok slaves. */
int max_slaves; /* Max number of ok slaves for a single master. */
int this_slaves; /* Number of ok slaves for our master (if we are slave). */
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
... ...
/* Check if we have disconnected nodes and re-establish the connection. */
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
... ...
if (node->link == NULL) {
// 对于首次握手,节点之间是没有链路的
int fd;
mstime_t old_ping_sent;
clusterLink *link;
// 创建一个新的 NodeA 和 NodeB 的 cluster bus 链路
// 对端端口为 node->port + 10000
// 并设置此链路的 READ 事件处理器为 clusterReadHandler
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
/* Queue a PING in the new connection ASAP: this is crucial
* to avoid false positives in failure detection.
*
* If the node is flagged as MEET, we send a MEET message instead
* of a PING one, to force the receiver to add us in its node
* table. */
// 对于 NodeA 和 NodeB 之间的首次连接,flag 设置的为 MEET
// 这里向 NodeB 发送 MEET 消息
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}
/* We can clear the flag after the first packet is sent.
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets. */
node->flags &= ~REDIS_NODE_MEET;
redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
}
}
dictReleaseIterator(di);
... ...
}
|
2) 而接收到此 握手请求 MEET 消息的节点 NodeB,首先将 NodeA 加入到 NodeB 维护的 cluster nodes 表中,并返回 PONG 消息,但是此时 NodeB 中维护的 NodeA 信息的 flag 仍然为 HANDSHAKE 阶段(因为 NodeB 此时并不确认 NodeA 已经接收到 PONG 响应),如下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);
/* We use incoming MEET messages in order to set the address
* for 'myself', since only other cluster nodes will send us
* MEET messagses on handshakes, when the cluster joins, or
* later if we changed address, and those nodes will use our
* official address to connect to us. So by obtaining this address
* from the socket is a simple way to discover / update our own
* address in the cluster without it being hardcoded in the config. */
if (type == CLUSTERMSG_TYPE_MEET) {
char ip[REDIS_IP_STR_LEN];
// 通过外部节点连接的本节点地址来更新本节点对外的 IP 地址
// 这里的 myself 为全局引用,最终指向 server.cluster->myself
// 代表本节点自身
if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
strcmp(ip,myself->ip))
{
memcpy(myself->ip,ip,REDIS_IP_STR_LEN);
redisLog(REDIS_WARNING,"IP address for this node updated to %s",
myself->ip);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
}
/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, slaveof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. */
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;
// 创建新的 cluster 节点
// 且 flags 设置为 REDIS_NODE_HANDSHAKE
// 表示当前仍然处于 握手阶段
// 需要等接收到 PONG 消息后才将此节点的
node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
nodeIp2String(node->ip,link);
node->port = ntohs(hdr->port);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
/* Get info from the gossip section */
clusterProcessGossipSection(hdr,link);
/* Anyway reply with a PONG */
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}
|
3) NodeA 在接收到 NodeB 返回的 PONG 消息后,主要更新本节点的集群配置中维护的 NodeB 的配置信息,如下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| /* PING or PONG: process config information. */
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
if (link->node) {
if (nodeInHandshake(link->node)) {
/* If we already have this node, try to change the
* IP/port of the node with the new one. */
// 如果之前已经有过连接,则会释放该链路
if (sender) {
if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
{
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
/* Free this node as we already have it. This will
* cause the link to be freed as well. */
freeClusterNode(link->node);
return 0;
}
/* First thing to do is replacing the random name with the
* right node name if this was a handshake stage. */
// 节点重命名,因为之前对于 NodeB 的 name 是采用的随机命名(clusterCreateNode)
// 这里以 NodeB 过来的实际名称为准
clusterRenameNode(link->node, hdr->sender);
link->node->flags &= ~REDIS_NODE_HANDSHAKE; // 不在处于 HANDSHAKE 状态
link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
... ...
}
... ...
}
|
4) 到这一步 NodeA 已经确认 NodeB 加入了集群,但是 NodeB 此时还是将 NodeA 的连接置于 HANDSHAKE 阶段的,NodeB 需要等待 NodeA 的下一条 PING 消息才会确认 NodeA 已经将自己加入了集群,并将本节点的 cluster nodes 中的 NodeA 的连接 flags 中移除 REDIS_NODE_HANDSHAKE,如上代码;
|
|