设为首页 收藏本站
查看: 8965|回复: 0

[经验分享] Redis Cluster 的实现 - 加入集群节点

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-10-8 15:05:18 | 显示全部楼层 |阅读模式
对于集群初始化,在 redis cluster 形成之前,各个节点都是独立的,它们主要是通过节点之间的 CLUSTER MEET 命令来初始化各个节点中的 clusterState 中的 nodes 成员,并构建最终的 cluster,cluster meet 的命令格式如下:
  

1
  CLUSTER MEET <ip> <port>



  
  下面以 NodeA 和 NodeB 两个节点逐步组成集群为例,客户端登录到 NodeA,并执行 MEET 命令将 NodeB 节点加入到 NodeA 的集群中。
  
  1) 接收到客户端发起的 MEET 命令的节点A(NodeA),会向 <ip>:<port> 的节点B(NodeB) 发起 Handshake 流程,如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void clusterCommand(redisClient *c) {
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }
     
    if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
        // 接收到从 client 执行的 cluster meet <ip> <port> 命令
        long long port;
         
        // 获取 NodeB 的 Port(注:非 cluster bus 的监听端口)
        if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
            addReplyErrorFormat(c,"Invalid TCP port specified: %s",
                                (char*)c->argv[3]->ptr);
            return;
        }
         
        // 开始发起 handshake 流程
        if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
            errno == EINVAL)
        {
            addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
        } else {
            addReply(c,shared.ok);
        }
    }
     
    ... ...
}




  从上面代码可以看出发起 handshake 流程是在 clusterStartHandshake 函数中实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int clusterStartHandshake(char *ip, int port) {
    clusterNode *n;
    char norm_ip[REDIS_IP_STR_LEN];
    struct sockaddr_storage sa;
    /* IP 和 端口 检查 */
    ... ...
    /* IP 地址 normalize */
    ... ...
     
    /* 将节点 NodeB 加入 Handshake 阶段 */
    if (clusterHandshakeInProgress(norm_ip,port)) {
        errno = EAGAIN;
        return 0;
    }
    /* Add the node with a random address (NULL as first argument to
     * createClusterNode()). Everything will be fixed during the
     * handskake. */
    // 创建新的节点 NodeB,同时设置 flag 为 HANDSHAKE|MEET
    // 并添加至 NodeA 本地维护的 cluster nodes 表中
    n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    clusterAddNode(n);
    return 1;
}




这里的代码并没有去发送 MEET 消息至 NodeB,而是在 clusterCron 中实现的,clusterCron 函数每秒被执行 10 次,如下(省略了处理心跳等其他流程):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/* This is executed 10 times every second */
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    ... ...
     
    /* Check if we have disconnected nodes and re-establish the connection. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        ... ...
         
        if (node->link == NULL) {
            // 对于首次握手,节点之间是没有链路的
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;
            
            // 创建一个新的 NodeA 和 NodeB 的 cluster bus 链路
            // 对端端口为 node->port + 10000
            // 并设置此链路的 READ 事件处理器为 clusterReadHandler
            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
                     
            /* Queue a PING in the new connection ASAP: this is crucial
             * to avoid false positives in failure detection.
             *
             * If the node is flagged as MEET, we send a MEET message instead
             * of a PING one, to force the receiver to add us in its node
             * table. */
            // 对于 NodeA 和 NodeB 之间的首次连接,flag 设置的为 MEET
            // 这里向 NodeB 发送 MEET 消息
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
            if (old_ping_sent) {
                /* If there was an active ping before the link was
                 * disconnected, we want to restore the ping time, otherwise
                 * replaced by the clusterSendPing() call. */
                node->ping_sent = old_ping_sent;
            }
            
            /* We can clear the flag after the first packet is sent.
             * If we'll never receive a PONG, we'll never send new packets
             * to this node. Instead after the PONG is received and we
             * are no longer in meet/handshake status, we want to send
             * normal PING packets. */
            node->flags &= ~REDIS_NODE_MEET;
            redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
                    node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
        }
    }
    dictReleaseIterator(di);
    ... ...
}




  2) 而接收到此 握手请求 MEET 消息的节点 NodeB,首先将 NodeA 加入到 NodeB 维护的 cluster nodes 表中,并返回 PONG 消息,但是此时 NodeB 中维护的 NodeA 信息的 flag 仍然为 HANDSHAKE 阶段(因为 NodeB 此时并不确认 NodeA 已经接收到 PONG 响应),如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);
         
        /* We use incoming MEET messages in order to set the address
         * for 'myself', since only other cluster nodes will send us
         * MEET messagses on handshakes, when the cluster joins, or
         * later if we changed address, and those nodes will use our
         * official address to connect to us. So by obtaining this address
         * from the socket is a simple way to discover / update our own
         * address in the cluster without it being hardcoded in the config. */
        if (type == CLUSTERMSG_TYPE_MEET) {
            char ip[REDIS_IP_STR_LEN];
            
            // 通过外部节点连接的本节点地址来更新本节点对外的 IP 地址
            // 这里的 myself 为全局引用,最终指向 server.cluster->myself
            // 代表本节点自身
            if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
                strcmp(ip,myself->ip))
            {
                memcpy(myself->ip,ip,REDIS_IP_STR_LEN);
                redisLog(REDIS_WARNING,"IP address for this node updated to %s",
                    myself->ip);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            }
        }
        /* Add this node if it is new for us and the msg type is MEET.
         * In this stage we don't try to add the node with the right
         * flags, slaveof pointer, and so forth, as this details will be
         * resolved when we'll receive PONGs from the node. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;
            
            // 创建新的 cluster 节点
            // 且 flags 设置为 REDIS_NODE_HANDSHAKE
            // 表示当前仍然处于 握手阶段
            // 需要等接收到 PONG 消息后才将此节点的
            node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
            nodeIp2String(node->ip,link);
            node->port = ntohs(hdr->port);
            clusterAddNode(node);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }
        /* Get info from the gossip section */
        clusterProcessGossipSection(hdr,link);
        /* Anyway reply with a PONG */
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
    }




  3) NodeA 在接收到 NodeB 返回的 PONG 消息后,主要更新本节点的集群配置中维护的 NodeB 的配置信息,如下代码:
   

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    /* PING or PONG: process config information. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        if (link->node) {
            if (nodeInHandshake(link->node)) {
                /* If we already have this node, try to change the
                 * IP/port of the node with the new one. */
                // 如果之前已经有过连接,则会释放该链路
                if (sender) {
                    if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
                    {
                        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                             CLUSTER_TODO_UPDATE_STATE);
                    }
                     
                    /* Free this node as we already have it. This will
                     * cause the link to be freed as well. */
                    freeClusterNode(link->node);
                    return 0;
                }
                /* First thing to do is replacing the random name with the
                 * right node name if this was a handshake stage. */
                // 节点重命名,因为之前对于 NodeB 的 name 是采用的随机命名(clusterCreateNode)
                // 这里以 NodeB 过来的实际名称为准
                clusterRenameNode(link->node, hdr->sender);
                link->node->flags &= ~REDIS_NODE_HANDSHAKE; // 不在处于 HANDSHAKE 状态
                link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            }
            ... ...
        }
        ... ...
    }




4) 到这一步 NodeA 已经确认 NodeB 加入了集群,但是 NodeB 此时还是将 NodeA 的连接置于 HANDSHAKE 阶段的,NodeB 需要等待 NodeA 的下一条 PING 消息才会确认 NodeA 已经将自己加入了集群,并将本节点的 cluster nodes 中的 NodeA 的连接 flags 中移除 REDIS_NODE_HANDSHAKE,如上代码;


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-25672-1-1.html 上篇帖子: Redis开机启动脚本 Centos6.5 下篇帖子: redis修改持久化路径、日志路径、清缓存
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表