/** cluster.c的clusterSendPing函数 **/
/* Send a PING or PONG packet to the specified node, making sure to add enough
* gossip informations. */
void clusterSendPing(clusterLink *link, int type) {
unsigned char *buf;
clusterMsg *hdr;
int gossipcount = 0; /* Number of gossip sections added so far. */
int wanted; /* Number of gossip sections we want to append if possible. */
int totlen; /* Total packet length. */
/* freshnodes is the max number of nodes we can hope to append at all:
* nodes available minus two (ourself and the node we are sending the
* message to). However practically there may be less valid nodes since
* nodes in handshake state, disconnected, are not considered. */
int freshnodes = dictSize(server.cluster->nodes)-2;
/* How many gossip sections we want to add? 1/10 of the number of nodes
* and anyway at least 3. Why 1/10?
*
* If we have N masters, with N/10 entries, and we consider that in
* node_timeout we exchange with each other node at least 4 packets
* (we ping in the worst case in node_timeout/2 time, and we also
* receive two pings from the host), we have a total of 8 packets
* in the node_timeout*2 falure reports validity time. So we have
* that, for a single PFAIL node, we can expect to receive the following
* number of failure reports (in the specified window of time):
*
* PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
*
* PROB = probability of being featured in a single gossip entry,
* which is 1 / NUM_OF_NODES.
* ENTRIES = 10.
* TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
*
* If we assume we have just masters (so num of nodes and num of masters
* is the same), with 1/10 we always get over the majority, and specifically
* 80% of the number of nodes, to account for many masters failing at the
* same time.
*
* Since we have non-voting slaves that lower the probability of an entry
* to feature our node, we set the number of entires per packet as
* 10% of the total nodes we have. */
wanted = floor(dictSize(server.cluster->nodes)/10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
/* Compute the maxium totlen to allocate our buffer. We'll fix the totlen
* later according to the number of gossip sections we really were able
* to put inside the packet. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*wanted);
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
buf = zcalloc(totlen);
hdr = (clusterMsg*) buf;
/* Populate the header. */
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
clusterBuildMessageHdr(hdr,type);
/* Populate the gossip fields */
int maxiterations = wanted*3;
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
clusterMsgDataGossip *gossip;
int j;
/* Don't include this node: the whole packet header is about us
* already, so we just gossip about other nodes. */
if (this == myself) continue;
/* Give a bias to FAIL/PFAIL nodes. */
if (maxiterations > wanted*2 &&
!(this->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL)))
continue;
/* In the gossip section don't include:
* 1) Nodes in HANDSHAKE state.
* 3) Nodes with the NOADDR flag set.
* 4) Disconnected nodes if they don't have configured slots.
*/
if (this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--; /* Tecnically not correct, but saves CPU. */
continue;
}
/* Check if we already added this node */
for (j = 0; j < gossipcount; j++) {
if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
REDIS_CLUSTER_NAMELEN) == 0) break;
}
if (j != gossipcount) continue;
/* Add it */
freshnodes--;
gossip = &(hdr->data.ping.gossip[gossipcount]);
memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
gossip->ping_sent = htonl(this->ping_sent);
gossip->pong_received = htonl(this->pong_received);
memcpy(gossip->ip,this->ip,sizeof(this->ip));
gossip->port = htons(this->port);
gossip->flags = htons(this->flags);
gossip->notused1 = 0;
gossip->notused2 = 0;
gossipcount++;
}
/* Ready to send... fix the totlen fiend and queue the message in the
* output buffer. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
hdr->count = htons(gossipcount);
hdr->totlen = htonl(totlen);
clusterSendMessage(link,buf,totlen);
zfree(buf);
}
typedef struct {
/* REDIS_CLUSTER_NAMELEN是常量40 */
char nodename[REDIS_CLUSTER_NAMELEN];
uint32_t ping_sent;
uint32_t pong_received;
char ip[REDIS_IP_STR_LEN]; /* IP address last time it was seen */
uint16_t port; /* port last time it was seen */
uint16_t flags; /* node->flags copy */
uint16_t notused1; /* Some room for future improvements. */
uint32_t notused2;
} clusterMsgDataGossip;