meimei10251314 发表于 2018-11-7 06:26:17

Redis Cluster 的实现 - cluster 消息的接收和分包

/* Read data. Try to read the first field of the header first to check the  
* full length of the packet. When a whole packet is in memory this function
  
* will call the function to process the packet. And so forth. */
  
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
  
    char buf;
  
    ssize_t nread;
  
    clusterMsg *hdr;
  
    clusterLink *link = (clusterLink*) privdata;
  
    int readlen, rcvbuflen;
  
    REDIS_NOTUSED(el);
  
    REDIS_NOTUSED(mask);
  
    while(1) { /* Read as long as there is data to read. */
  
      rcvbuflen = sdslen(link->rcvbuf);
  
      if (rcvbuflen < 8) {
  
            /* First, obtain the first 8 bytes to get the full message
  
             * length. */
  
            readlen = 8 - rcvbuflen;
  
      } else {
  
            // 已经知道了本条消息的长度
  
            // 本块代码主要计算剩余还需读入的字节数(readlen)才是完整的消息
  
            /* Finally read the full message. */
  
            hdr = (clusterMsg*) link->rcvbuf;
  
            if (rcvbuflen == 8) {
  
                /* Perform some sanity check on the message signature
  
               * and length. */
  
                if (memcmp(hdr->sig,"RCmb",4) != 0 ||
  
                  ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
  
                {
  
                  redisLog(REDIS_WARNING,
  
                        "Bad message length or signature received "
  
                        "from Cluster bus.");
  
                  handleLinkIOError(link);
  
                  return;
  
                }
  
            }
  
            readlen = ntohl(hdr->totlen) - rcvbuflen;
  
            if (readlen > sizeof(buf)) readlen = sizeof(buf);
  
      }
  

  
      // 读入本条消息记录的剩余 readlen 个字节的数据
  
      // 因为这里的 fd 是非阻塞的,所以需要判断 EAGAIN
  
      nread = read(fd,buf,readlen);
  
      if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
  
      if (nread rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
  
            hdr = (clusterMsg*) link->rcvbuf;
  
            rcvbuflen += nread;
  
      }
  
      /* Total length obtained? Process this packet. */
  
      if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
  
            // 表明 link 上的 rcvbuf 已经是一个完整的 cluster 消息
  
            // 下面开始处理此消息
  
            if (clusterProcessPacket(link)) {
  
                sdsfree(link->rcvbuf);
  
                link->rcvbuf = sdsempty();
  
            } else {
  
                return; /* Link no longer valid. */
  
            }
  
      }
  
    }
  
}


页: [1]
查看完整版本: Redis Cluster 的实现 - cluster 消息的接收和分包