史蒂夫和斯凯 发表于 2018-11-6 13:40:27

Redis Cluster 的实现 - 加入集群节点

/* This is executed 10 times every second.
 *
 * The portion shown here walks the table of known nodes and, for every node
 * that currently has no cluster bus link, opens a new non-blocking connection
 * to it and immediately queues a MEET or PING packet on that link.
 *
 * The "... ..." markers stand for parts of the function that were elided
 * from this excerpt. */
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;

    ... ...

    /* Check if we have disconnected nodes and re-establish the connection. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        ... ...

        if (node->link == NULL) {
            /* On the very first handshake there is no link yet between
             * the two nodes. */
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            /* Create a new cluster bus link between NodeA (this node) and
             * NodeB (the peer). The peer's bus port is
             * node->port + REDIS_CLUSTER_PORT_INCR (10000 according to the
             * original note), and the READ event handler of the link is
             * set to clusterReadHandler. */
            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);
            if (fd == -1) {
                /* The connection attempt failed: do not create a link
                 * around an invalid descriptor and do not register it
                 * with the event loop. node->link stays NULL, so the
                 * connection is attempted again at the next run. */
                redisLog(REDIS_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->port+REDIS_CLUSTER_PORT_INCR,
                    server.neterr);
                continue;
            }
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                clusterReadHandler,link);

            /* Queue a PING in the new connection ASAP: this is crucial
             * to avoid false positives in failure detection.
             *
             * If the node is flagged as MEET, we send a MEET message instead
             * of a PING one, to force the receiver to add us in its node
             * table. */
            /* For the first connection between NodeA and NodeB the node
             * carries the MEET flag, so a MEET message is what gets sent
             * to NodeB here. */
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
                CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
            if (old_ping_sent) {
                /* If there was an active ping before the link was
                 * disconnected, we want to restore the ping time, otherwise
                 * replaced by the clusterSendPing() call. */
                node->ping_sent = old_ping_sent;
            }

            /* We can clear the flag after the first packet is sent.
             * If we'll never receive a PONG, we'll never send new packets
             * to this node. Instead after the PONG is received and we
             * are no longer in meet/handshake status, we want to send
             * normal PING packets. */
            node->flags &= ~REDIS_NODE_MEET;
            redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
                node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
        }
    }
    dictReleaseIterator(di);

    ... ...

}


页: [1]
查看完整版本: Redis Cluster 的实现 - 加入集群节点