设为首页 收藏本站
查看: 1174|回复: 0

[经验分享] Redis源码分析(十九)--- replication主从数据复制的实现

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-11-5 09:20:34 | 显示全部楼层 |阅读模式
      replication的英文单词的原意是“复制”的意思,replication文件作为我在Data目录下的分析的最后一个文件,足以说明他的重要性,代码量1800+,的确非常难啃。只能说个我看代码下来的大致印象吧,要我画个结构图好好理理这里面各个API的关系图,这个我目前还真做不到。说到主从复制,这个是实现读写分离的最好手段了,也很常见,当用户数达到一定量,当一个服务器承受不了达到上千万的pv时,采取主从数据库的形式也是一般架构师能够想到的一种手段。Redis的主从数据库在我这里就称为主客户端,从客户端,因为客户端中有所属于的db,因为数据库基于客户单本身进行复制操作的。也就是说,一个Redis,存在一个master主客户端,多个slave从客户端,到时实现的就是slave向主客户端进行复制操作。因为API比较多,进行了稍稍的归类:



    /* ---------------------------------- MASTER -------------------------------- */  
    void createReplicationBacklog(void) /* 创建backlog的buffer */  
    void resizeReplicationBacklog(long long newsize) /* 调整复制备份日志的大小,当replication backlog被修改的时候 */  
    void freeReplicationBacklog(void) /* 释放备份日志 */  
    void feedReplicationBacklog(void *ptr, size_t len) /* 往备份日志中添加添加数据操作,会引起master_repl_offset偏移量的增加 */  
    void feedReplicationBacklogWithObject(robj *o) /* 往backlog添加数据,以Redis 字符串对象作为参数 */  
    void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) /* 将主数据库复制到从数据库 */  
    void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) /* 发送数据给monitor监听者客户端 */  
    long long addReplyReplicationBacklog(redisClient *c, long long offset) /* slave从客户单添加备份日志 */  
    int masterTryPartialResynchronization(redisClient *c) /* 主数据库尝试分区同步 */  
    void syncCommand(redisClient *c) /* 同步命令函数 */  
    void replconfCommand(redisClient *c) /* 此函数用于从客户端进行配置复制进程中的执行参数设置 */  
    void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) /* 给slave客户端发送BULK数据 */  
    void updateSlavesWaitingBgsave(int bgsaveerr) /* 此方法将用于后台保存进程快结束时调用,更新slave从客户端 */  
         
    /* ----------------------------------- SLAVE -------------------------------- */  
    void replicationAbortSyncTransfer(void) /* 中止与master主数据的同步操作 */  
    void replicationSendNewlineToMaster(void) /* 从客户端发送空行给主客户端,破坏了原本的协议格式,避免让主客户端检测出从客户端超时的情况 */  
    void replicationEmptyDbCallback(void *privdata) /* 清空数据库后的回调方法,当老数据被刷新出去之后等待加载新数据的时候调用 */  
    void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) /* 从客户端读取同步的Sync的BULK数据 */  
    char *sendSynchronousCommand(int fd, ...) /* 从客户端发送给主客户端同步数据的命令,附上验证信息,和一些参数配置信息 */  
    int slaveTryPartialResynchronization(int fd) /* 从客户端尝试分区同步操作 */  
    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) /* 与主客户端保持同步,期间包括端口号等的确认,socket连接 */  
    int connectWithMaster(void) /* 连接主客户端 */  
    void undoConnectWithMaster(void) /* 撤销连接主客户端 */  
    int cancelReplicationHandshake(void) /* 当已经存在一个复制进程时,中止一个非阻塞的replication复制的尝试 */  
    void replicationSetMaster(char *ip, int port) /* 设定主客户端的ip地址和端口号 */  
    void replicationUnsetMaster(void)  
    void slaveofCommand(redisClient *c)  
    void roleCommand(redisClient *c)  
    void replicationSendAck(void) /* 发送ACK包给主客户端 ,告知当前的进程偏移量 */  
         
    /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */  
    void replicationCacheMaster(redisClient *c) /* 缓存客户端信息 */  
    void replicationDiscardCachedMaster(void) /* 当某个客户端将不会再回复的时候,可以释放掉缓存的主客户端 */  
    void replicationResurrectCachedMaster(int newfd) /* 将缓存客户端复活 */  
         
    /* ------------------------- MIN-SLAVES-TO-WRITE  --------------------------- */  
    void refreshGoodSlavesCount(void) /* 更新slave从客户端数量 */  
    void replicationScriptCacheInit(void)  
    void replicationScriptCacheFlush(void)  
    void replicationScriptCacheAdd(sds sha1)  
    int replicationScriptCacheExists(sds sha1)  
    void replicationCron(void)  

找一个标准的slave从客户端向主客户端实现同步的操作:



    /* 与主客户端保持同步,期间包括端口号等的确认,socket连接 */  
    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {  
        char tmpfile[256], *err;  
        int dfd, maxtries = 5;  
        int sockerr = 0, psync_result;  
        socklen_t errlen = sizeof(sockerr);  
        REDIS_NOTUSED(el);  
        REDIS_NOTUSED(privdata);  
        REDIS_NOTUSED(mask);  
      
        /* If this event fired after the user turned the instance into a master
         * with SLAVEOF NO ONE we must just return ASAP. */  
        if (server.repl_state == REDIS_REPL_NONE) {  
            close(fd);  
            return;  
        }  
      
        /* Check for errors in the socket. */  
        /* socket连接是否正常 */  
        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)  
            sockerr = errno;  
        if (sockerr) {  
            aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);  
            redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",  
                strerror(sockerr));  
            goto error;  
        }  
      
        /* If we were connecting, it's time to send a non blocking PING, we want to
         * make sure the master is able to reply before going into the actual
         * replication process where we have long timeouts in the order of
         * seconds (in the meantime the slave would block). */  
        /* 连接测试,将由主客户端发送PING命令给从客户端,在给定的延迟时间内观察是否有回复 */  
        if (server.repl_state == REDIS_REPL_CONNECTING) {  
            redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");  
            /* Delete the writable event so that the readable event remains
             * registered and we can wait for the PONG reply. */  
            aeDeleteFileEvent(server.el,fd,AE_WRITABLE);  
            server.repl_state = REDIS_REPL_RECEIVE_PONG;  
            /* Send the PING, don't check for errors at all, we have the timeout
             * that will take care about this. */  
            //发送PING命令  
            syncWrite(fd,"PING\r\n",6,100);  
            return;  
        }  
      
        /* Receive the PONG command. */  
        //收到回复了  
        if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {  
            char buf[1024];  
      
            /* Delete the readable event, we no longer need it now that there is
             * the PING reply to read. */  
            aeDeleteFileEvent(server.el,fd,AE_READABLE);  
      
            /* Read the reply with explicit timeout. */  
            buf[0] = '\0';  
            if (syncReadLine(fd,buf,sizeof(buf),  
                server.repl_syncio_timeout*1000) == -1)  
            {  
                redisLog(REDIS_WARNING,  
                    "I/O error reading PING reply from master: %s",  
                    strerror(errno));  
                goto error;  
            }  
      
            /* We accept only two replies as valid, a positive +PONG reply
             * (we just check for "+") or an authentication error.
             * Note that older versions of Redis replied with "operation not
             * permitted" instead of using a proper error code, so we test
             * both. */  
            if (buf[0] != '+' &&  
                strncmp(buf,"-NOAUTH",7) != 0 &&  
                strncmp(buf,"-ERR operation not permitted",28) != 0)  
            {  
                redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);  
                goto error;  
            } else {  
                redisLog(REDIS_NOTICE,  
                    "Master replied to PING, replication can continue...");  
            }  
        }  
      
        /* AUTH with the master if required. */  
        //auth身份验证  
        if(server.masterauth) {  
            err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);  
            if (err[0] == '-') {  
                redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);  
                sdsfree(err);  
                goto error;  
            }  
            sdsfree(err);  
        }  
      
        /* Set the slave port, so that Master's INFO command can list the
         * slave listening port correctly. */  
        /* 设置从客户端监听端口 */  
        {  
            sds port = sdsfromlonglong(server.port);  
            err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,  
                                             NULL);  
            sdsfree(port);  
            /* Ignore the error if any, not all the Redis versions support
             * REPLCONF listening-port. */  
            if (err[0] == '-') {  
                redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);  
            }  
            sdsfree(err);  
        }  
      
        /* Try a partial resynchonization. If we don't have a cached master
         * slaveTryPartialResynchronization() will at least try to use PSYNC
         * to start a full resynchronization so that we get the master run id
         * and the global offset, to try a partial resync at the next
         * reconnection attempt. */  
        psync_result = slaveTryPartialResynchronization(fd);  
        if (psync_result == PSYNC_CONTINUE) {  
            redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");  
            return;  
        }  
      
        /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
         * and the server.repl_master_runid and repl_master_initial_offset are
         * already populated. */  
        if (psync_result == PSYNC_NOT_SUPPORTED) {  
            redisLog(REDIS_NOTICE,"Retrying with SYNC...");  
            if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {  
                redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",  
                    strerror(errno));  
                goto error;  
            }  
        }  
      
        /* Prepare a suitable temp file for bulk transfer */  
        while(maxtries--) {  
            snprintf(tmpfile,256,  
                "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());  
            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);  
            if (dfd != -1) break;  
            sleep(1);  
        }  
        if (dfd == -1) {  
            redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));  
            goto error;  
        }  
      
        /* Setup the non blocking download of the bulk file. */  
        if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)  
                == AE_ERR)  
        {  
            redisLog(REDIS_WARNING,  
                "Can't create readable event for SYNC: %s (fd=%d)",  
                strerror(errno),fd);  
            goto error;  
        }  
      
        server.repl_state = REDIS_REPL_TRANSFER;  
        server.repl_transfer_size = -1;  
        server.repl_transfer_read = 0;  
        server.repl_transfer_last_fsync_off = 0;  
        server.repl_transfer_fd = dfd;  
        server.repl_transfer_lastio = server.unixtime;  
        server.repl_transfer_tmpfile = zstrdup(tmpfile);  
        return;  
      
    error:  
        close(fd);  
        server.repl_transfer_s = -1;  
        server.repl_state = REDIS_REPL_CONNECT;  
        return;  
    }  

         在replication中,要一个cacheMaster的概念,就是可以临时缓存主客户端的信息,一般用于突然master和slave断开连接的时候,可以下次进行主从同步的时候快速恢复:



    /* 缓存客户端信息 */  
    void replicationCacheMaster(redisClient *c) {  
        listNode *ln;  
      
        redisAssert(server.master != NULL && server.cached_master == NULL);  
        redisLog(REDIS_NOTICE,"Caching the disconnected master state.");  
      
        /* Remove from the list of clients, we don't want this client to be
         * listed by CLIENT LIST or processed in any way by batch operations. */  
        //首先移除此客户端  
        ln = listSearchKey(server.clients,c);  
        redisAssert(ln != NULL);  
        listDelNode(server.clients,ln);  
      
        /* Save the master. Server.master will be set to null later by
         * replicationHandleMasterDisconnection(). */  
        //保存为缓存客户端  
        server.cached_master = server.master;  
      
        /* Remove the event handlers and close the socket. We'll later reuse
         * the socket of the new connection with the master during PSYNC. */  
        //删除在这个客户端上的读写事件  
        aeDeleteFileEvent(server.el,c->fd,AE_READABLE);  
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);  
        close(c->fd);  
      
        /* Set fd to -1 so that we can safely call freeClient(c) later. */  
        c->fd = -1;  
      
        /* Invalidate the Peer ID cache. */  
        if (c->peerid) {  
            sdsfree(c->peerid);  
            c->peerid = NULL;  
        }  
      
        /* Caching the master happens instead of the actual freeClient() call,
         * so make sure to adjust the replication state. This function will
         * also set server.master to NULL. */  
        replicationHandleMasterDisconnection();  
    }  

当想让这个master的复活的时候,调用下面的方法:



    /* Turn the cached master into the current master, using the file descriptor
     * passed as argument as the socket for the new master.
     *
     * This funciton is called when successfully setup a partial resynchronization
     * so the stream of data that we'll receive will start from were this
     * master left. */  
    /* 将缓存客户端复活 */  
    void replicationResurrectCachedMaster(int newfd) {  
        //将cached_master赋值为主客户端  
        server.master = server.cached_master;  
        server.cached_master = NULL;  
        server.master->fd = newfd;  
        server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);  
        server.master->authenticated = 1;  
        server.master->lastinteraction = server.unixtime;  
        server.repl_state = REDIS_REPL_CONNECTED;  
      
        /* Re-add to the list of clients. */  
        //重新添加入客户端列表中  
        listAddNodeTail(server.clients,server.master);  
        if (aeCreateFileEvent(server.el, newfd, AE_READABLE,  
                              readQueryFromClient, server.master)) {  
            redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));  
            freeClientAsync(server.master); /* Close ASAP. */  
        }  
      
        /* We may also need to install the write handler as well if there is
         * pending data in the write buffers. */  
        if (server.master->bufpos || listLength(server.master->reply)) {  
            if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,  
                              sendReplyToClient, server.master)) {  
                redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));  
                freeClientAsync(server.master); /* Close ASAP. */  
            }  
        }  
    }  

当然如果确定在未来不糊在使用缓存的master的时,可以彻底摧毁:



    /* Free a cached master, called when there are no longer the conditions for
     * a partial resync on reconnection. */  
    /* 当某个客户端将不会再回复的时候,可以释放掉缓存的主客户端 */  
    void replicationDiscardCachedMaster(void) {  
        if (server.cached_master == NULL) return;  
      
        redisLog(REDIS_NOTICE,"Discarding previously cached master state.");  
        server.cached_master->flags &= ~REDIS_MASTER;  
        //直接释放客户端  
        freeClient(server.cached_master);  
        //server的缓存客户端赋值为NULL  
        server.cached_master = NULL;  
    }  

在这里面靠的就是server.cached_master属性。slave在和master连接的时候,要进行master的ip地址和Port端口的确认:



    /* Set replication to the specified master address and port. */  
    /* 设定主客户端的ip地址和端口号 */  
    void replicationSetMaster(char *ip, int port) {  
        sdsfree(server.masterhost);  
        server.masterhost = sdsdup(ip);  
        server.masterport = port;  
        //设置完毕之后,断开所有的连接,中止replication进程  
        if (server.master) freeClient(server.master);  
        disconnectSlaves(); /* Force our slaves to resync with us as well. */  
        replicationDiscardCachedMaster(); /* Don't try a PSYNC. */  
        freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */  
        cancelReplicationHandshake();  
        server.repl_state = REDIS_REPL_CONNECT;  
        server.master_repl_offset = 0;  
    }  

主从复制的实现其实还有很多细节和步骤的。稍稍分析了一下,以后有机会研究的更深入一点




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-27098-1-1.html 上篇帖子: Redis源码分析(十八)--- db.c内存数据库操作 下篇帖子: Redis源码分析(二十)--- ae事件驱动
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表