20. Redis Source Code Analysis - Redis Incremental (Partial) Synchronization

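Continuing from "Redis Source Code Analysis - Redis Master-Replica Synchronization, Part 1", we pick up where the replica is in the REPL_STATE_SEND_PSYNC state and actually starts pulling data from the master.
During master-replica synchronization, a replica can pull data from the master in one of two ways: full sync and incremental sync. A full sync transfers the master's entire dataset, while an incremental sync transfers only the data written since the replica fell behind. Because the incremental data is usually far smaller than the full dataset, incremental sync is more efficient and uses less bandwidth.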

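The replica always tries an incremental sync first. Suppose the replica was previously attached to this master but lost the connection for some reason. Once replication breaks, the replica's server.repl_state is set back to REPL_STATE_CONNECT, and the replica keeps retrying the connection, driving the handshake through syncWithMaster. When it reaches the REPL_STATE_SEND_PSYNC state, the replica already holds part of the master's data, so it may not need the full dataset: it only needs the write commands the master received while the two were disconnected. Incremental sync therefore targets short disconnections between replica and master, which are also the most common case in production.
So how do the replica and the master decide whether an incremental sync is possible? Two conditions must hold: the replica was previously attached to this same master, and the master has not produced too much data while the replica was disconnected.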

Conditions for incremental sync

1. How do we know the replica was previously attached to this master?

struct redisServer {
	...
    char replid[CONFIG_RUN_ID_SIZE+1];  /* My current replication ID. */
    char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
    ...
    client *cached_master; /* Cached master to be reused for PSYNC. */
	...
};

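replid is the node's current replication ID. If the node was a replica and later became a master via REPLICAOF NO ONE, replid2 holds the replication ID of its former master. During startup Redis calls initServerConfig to initialize the server, which sets both fields: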

void initServerConfig(void) {
	...
	changeReplicationId();
    clearReplicationId2();
	...
}

void changeReplicationId(void) {
    getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE);
    server.replid[CONFIG_RUN_ID_SIZE] = '\0';
}

void clearReplicationId2(void) {
    memset(server.replid2,'0',sizeof(server.replid));
    server.replid2[CONFIG_RUN_ID_SIZE] = '\0';
    server.second_replid_offset = -1;
}
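
To make the two fields concrete, here is a minimal standalone sketch of what changeReplicationId and clearReplicationId2 do. The struct `repl_ids`, the helper `random_hex`, and the `RUN_ID_SIZE` constant are hypothetical names used only for illustration. `random_hex` uses `rand()` purely to keep the sketch portable; Redis itself relies on a stronger random source for replication IDs.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40 /* hypothetical stand-in for CONFIG_RUN_ID_SIZE */

typedef struct {
    char replid[RUN_ID_SIZE + 1];   /* current replication ID */
    char replid2[RUN_ID_SIZE + 1];  /* ID inherited from a former master */
    long long second_replid_offset; /* replid2 is valid up to this offset */
} repl_ids;

/* Fill p with len random lowercase hex digits. rand() keeps the sketch
 * portable; it is NOT a suitable source for real replication IDs. */
static void random_hex(char *p, size_t len) {
    static const char digits[] = "0123456789abcdef";
    assert(p != NULL);
    for (size_t i = 0; i < len; i++) p[i] = digits[rand() % 16];
}

/* Mirrors changeReplicationId(): draw a brand-new replication ID. */
static void change_replid(repl_ids *r) {
    random_hex(r->replid, RUN_ID_SIZE);
    r->replid[RUN_ID_SIZE] = '\0';
}

/* Mirrors clearReplicationId2(): forty '0' characters, no valid offset. */
static void clear_replid2(repl_ids *r) {
    memset(r->replid2, '0', RUN_ID_SIZE);
    r->replid2[RUN_ID_SIZE] = '\0';
    r->second_replid_offset = -1;
}
```

A freshly initialized node therefore has a random replid and a replid2 that cannot match any real ID, so only its own current history is eligible for PSYNC.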

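Likewise, when a master gets its first replica, or frees its backlog after all of its replicas are gone, it also calls changeReplicationId to change server.replid. To avoid losing server.replid across restarts, it is also persisted in the RDB file; after a restart, once the data has been loaded from disk, server.replid is restored from the RDB file.
cached_master is used when the node is a replica: when it loses the connection to its master, the replica caches the old master client instead of discarding it: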

void freeClient(client *c) {
    if (server.master && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"Connection with master lost.");
        if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
                          CLIENT_CLOSE_ASAP|
                          CLIENT_BLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }
}

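When the replica reconnects to the master, it sends the replid stored in cached_master. The master compares it with its own replid (or with its replid2, valid up to second_replid_offset); if they match, the replica was previously attached to this master and an incremental sync can be attempted.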
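
The ID check can be sketched as a small predicate. It is a simplified restatement, assuming only the fields named in this article (replid, replid2, second_replid_offset); `replid_acceptable` is a hypothetical helper, not a Redis function. It mirrors the condition masterTryPartialResynchronization evaluates: the requested ID must equal the current replid, or equal replid2 with an offset no greater than second_replid_offset.

```c
#include <assert.h>
#include <strings.h> /* strcasecmp (POSIX) */

/* Hypothetical helper: returns 1 if the requested replication ID/offset
 * pair refers to history this node can continue from, 0 otherwise. */
static int replid_acceptable(const char *req_id, long long req_off,
                             const char *replid, const char *replid2,
                             long long second_replid_offset) {
    assert(req_id && replid && replid2);
    if (req_id[0] == '?') return 0;                /* replica forces a full sync */
    if (strcasecmp(req_id, replid) == 0) return 1; /* current history */
    /* History inherited from a former master is only valid up to the
     * offset at which this node switched to its new ID. */
    return strcasecmp(req_id, replid2) == 0 && req_off <= second_replid_offset;
}
```

Passing this check is necessary but not sufficient: the requested offset must also still be inside the backlog.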

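2. How do we make sure the master did not receive too much data while the replica was disconnected?
When the master gets its first replica it creates repl_backlog, and some time after all replicas are gone it frees it again. repl_backlog is a circular buffer (a char * plus bookkeeping fields) whose size is set by repl-backlog-size, 1MB by default; it holds the most recent part of the master's replication stream.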

struct redisServer {
	...
    char *repl_backlog;             /* Replication backlog for partial syncs */
    long long repl_backlog_size;    /* Backlog circular buffer size */
    long long repl_backlog_histlen; /* Backlog actual data length */
    long long repl_backlog_idx;     /* Backlog circular buffer current offset,
                                       that is the next byte will'll write to.*/
    long long repl_backlog_off;     /* Replication "master offset" of first
                                       byte in the replication backlog buffer.*/
	...
};

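When the replica reconnects, it sends the master the offset right after the last byte it successfully processed (cached_master->reploff + 1). The master checks whether that offset falls inside the range held by repl_backlog, i.e. between repl_backlog_off and repl_backlog_off + repl_backlog_histlen; if it does, the master attempts an incremental sync.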
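
The backlog bookkeeping can be illustrated with a tiny circular buffer. This is a minimal sketch assuming the field meanings described above; the `backlog` struct and its functions are hypothetical, loosely modeled on how Redis feeds and checks its backlog, with a deliberately small buffer so that wraparound is easy to see.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical miniature of the backlog fields in struct redisServer. */
typedef struct {
    char *buf;            /* like repl_backlog */
    long long size;       /* like repl_backlog_size */
    long long histlen;    /* like repl_backlog_histlen: valid bytes held */
    long long idx;        /* like repl_backlog_idx: next write position */
    long long off;        /* like repl_backlog_off: offset of oldest byte */
    long long master_off; /* like master_repl_offset: total bytes written */
} backlog;

static backlog *backlog_create(long long size) {
    assert(size > 0);
    backlog *b = calloc(1, sizeof(*b));
    assert(b != NULL);
    b->buf = malloc((size_t)size);
    assert(b->buf != NULL);
    b->size = size;
    b->off = b->master_off + 1; /* empty backlog starts at the next offset */
    return b;
}

static void backlog_free(backlog *b) {
    free(b->buf);
    free(b);
}

/* Append len bytes of replication stream, overwriting the oldest data. */
static void backlog_feed(backlog *b, const char *p, long long len) {
    b->master_off += len;
    while (len > 0) {
        long long n = b->size - b->idx; /* room before the end of the array */
        if (n > len) n = len;
        memcpy(b->buf + b->idx, p, (size_t)n);
        b->idx += n;
        if (b->idx == b->size) b->idx = 0; /* wrap around */
        len -= n;
        p += n;
        b->histlen += n;
    }
    if (b->histlen > b->size) b->histlen = b->size;
    b->off = b->master_off - b->histlen + 1;
}

/* Same range test as masterTryPartialResynchronization(). */
static int backlog_can_serve(const backlog *b, long long psync_offset) {
    return psync_offset >= b->off && psync_offset <= b->off + b->histlen;
}
```

For example, after writing 10 bytes (offsets 1 to 10) into an 8-byte backlog, only offsets 3 to 10 are still held. A PSYNC at any offset from 3 to 11 can be served (11 meaning the replica is missing nothing), while offset 2 forces a full sync.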

Incremental sync state machine

REPL_STATE_SEND_PSYNC

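In the REPL_STATE_SEND_PSYNC state the replica starts pulling data from the master. It first attempts an incremental sync by sending PSYNC, then moves to the REPL_STATE_RECEIVE_PSYNC state: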

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
	...
	/* Try a partial resynchonization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
        if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
            err = sdsnew("Write error sending the PSYNC command.");
            goto write_error;
        }
        server.repl_state = REPL_STATE_RECEIVE_PSYNC;
        return;
    }
    ...
}

int slaveTryPartialResynchronization(int fd, int read_reply) {
	...
	/* Writing half */
    if (!read_reply) {
        /* Initially set master_initial_offset to -1 to mark the current
         * master run_id and offset as not valid. Later if we'll be able to do
         * a FULL resync using the PSYNC command we'll set the offset at the
         * right value, so that this information will be propagated to the
         * client structure representing the master into server.master. */
        server.master_initial_offset = -1;

        if (server.cached_master) {
            psync_replid = server.cached_master->replid;
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
        } else {
            serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_replid = "?";
            memcpy(psync_offset,"-1",3);
        }

        /* Issue the PSYNC command */
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
        if (reply != NULL) {
            serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
            sdsfree(reply);
            aeDeleteFileEvent(server.el,fd,AE_READABLE);
            return PSYNC_WRITE_ERROR;
        }
        return PSYNC_WAIT_REPLY;
    }
    ...
}

The replica sends PSYNC with cached_master's replid and reploff + 1. If there is no cached_master, it sends PSYNC ? -1 to request a full sync.
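
A minimal sketch of how the PSYNC arguments are chosen, assuming a hypothetical `cached_master_info` struct that holds just the replid and reploff. The real code sends the command through sendSynchronousCommand; here it is formatted as a plain string only to show which arguments are picked.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical view of the two cached_master fields PSYNC needs. */
typedef struct {
    const char *replid; /* master's replication ID as last seen */
    long long reploff;  /* last replication offset processed */
} cached_master_info;

/* Writes the PSYNC arguments the replica would choose into out.
 * cm == NULL means there is no cached master. Returns snprintf's result. */
static int format_psync(char *out, size_t outlen, const cached_master_info *cm) {
    assert(out != NULL && outlen > 0);
    if (cm != NULL && cm->replid != NULL && strlen(cm->replid) > 0)
        /* Ask for the first byte not seen yet: reploff + 1. */
        return snprintf(out, outlen, "PSYNC %s %lld", cm->replid, cm->reploff + 1);
    /* No cached master: "?" and -1 request a full resync. */
    return snprintf(out, outlen, "PSYNC ? -1");
}
```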

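After receiving PSYNC, the master applies the conditions above to decide whether an incremental sync is possible. If it is, the master replies +CONTINUE <replid> (or just +CONTINUE for replicas without PSYNC2 capability) and then sends the backlog data starting at the requested offset: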

/* SYNC and PSYNC command implemenation. */
void syncCommand(client *c) {
	...
	/* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <replid> <offset>
     *
     * So the slave knows the new replid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        if (masterTryPartialResynchronization(c) == C_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            char *master_replid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * replid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not albe to partially
             * resync. */
            if (master_replid[0] != '?') server.stat_sync_partial_err++;
        }
    }
    ...
}

int masterTryPartialResynchronization(client *c) {
	...
	// Check whether the replid in the PSYNC command matches ours
	if (strcasecmp(master_replid, server.replid) &&
        (strcasecmp(master_replid, server.replid2) ||
         psync_offset > server.second_replid_offset))
    {
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_replid[0] != '?') {
            if (strcasecmp(master_replid, server.replid) &&
                strcasecmp(master_replid, server.replid2))
            {
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Replication ID mismatch (Replica asked for '%s', my "
                    "replication IDs are '%s' and '%s')",
                    master_replid, server.replid, server.replid2);
            } else {
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Requested offset for second ID was %lld, but I can reply "
                    "up to %lld", psync_offset, server.second_replid_offset);
            }
        } else {
            serverLog(LL_NOTICE,"Full resync requested by replica %s",
                replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }
    // Check whether the replica's offset falls inside repl_backlog
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        serverLog(LL_NOTICE,
            "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
        if (psync_offset > server.master_repl_offset) {
            serverLog(LL_WARNING,
                "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }
    // From here on, perform the incremental sync
    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    // Add this replica to server.slaves
    c->flags |= CLIENT_SLAVE;
    c->replstate = SLAVE_STATE_ONLINE;
    c->repl_ack_time = server.unixtime;
    c->repl_put_online_on_ack = 0;
    listAddNodeTail(server.slaves,c);
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    // Reply +CONTINUE [replid], then send the backlog data
    if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
    } else {
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    }
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return C_OK;
    }
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    serverLog(LL_NOTICE,
        "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
            replicationGetSlaveName(c),
            psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */

    refreshGoodSlavesCount();
    return C_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    /* We need a full resync for some reason... Note that we can't
     * reply to PSYNC right now if a full SYNC is needed. The reply
     * must include the master offset at the time the RDB file we transfer
     * is generated, so we need to delay the reply to that moment. */
    return C_ERR;
}
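
The last step, addReplyReplicationBacklog, has to read a range out of the circular buffer, and that range may wrap around the end of the array. The following is a hedged sketch of that arithmetic only, assuming the backlog field meanings shown earlier; `backlog_view` and `backlog_copy_from` are hypothetical names, and the bytes are copied into a plain array instead of being queued on a client.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical read-only view of the backlog fields. */
typedef struct {
    const char *buf;   /* like repl_backlog */
    long long size;    /* like repl_backlog_size */
    long long histlen; /* like repl_backlog_histlen */
    long long idx;     /* like repl_backlog_idx: next write position */
    long long off;     /* like repl_backlog_off: offset of oldest byte */
} backlog_view;

/* Copies backlog bytes from psync_offset to the end into out, which must
 * hold at least histlen bytes. Returns the number of bytes copied. */
static long long backlog_copy_from(const backlog_view *b,
                                   long long psync_offset, char *out) {
    if (b->histlen == 0) return 0;
    long long skip = psync_offset - b->off; /* bytes the replica already has */
    assert(skip >= 0 && skip <= b->histlen);
    /* The oldest byte sits histlen bytes behind the write index. */
    long long j = (b->idx + (b->size - b->histlen)) % b->size;
    j = (j + skip) % b->size;
    long long len = b->histlen - skip, copied = 0;
    while (len > 0) { /* at most two chunks because of the wraparound */
        long long n = (b->size - j < len) ? b->size - j : len;
        memcpy(out + copied, b->buf + j, (size_t)n);
        copied += n;
        len -= n;
        j = 0; /* continue from the start of the array */
    }
    return copied;
}
```

With the state left by writing offsets 1 to 10 into an 8-byte backlog (contents `ijcdefgh`, idx 2, off 3), a request for offset 5 yields `efghij`: four bytes from the tail of the array, then two from its start.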

REPL_STATE_RECEIVE_PSYNC

In the REPL_STATE_RECEIVE_PSYNC state, the master's reply makes the socket readable, which triggers syncWithMaster again:

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
	...
	psync_result = slaveTryPartialResynchronization(fd,1);
    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

    /* If the master is in an transient error, we should try to PSYNC
     * from scratch later, so go to the error path. This happens when
     * the server is loading the dataset or is not connected with its
     * master and so forth. */
    if (psync_result == PSYNC_TRY_LATER) goto error;

    /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
     * uninstalling the read handler from the file descriptor. */

    if (psync_result == PSYNC_CONTINUE) {
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
        return;
    }
    ...
}
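
The replica-side transitions described so far can be summarized as a small state function. This is a simplified sketch: the enum names are hypothetical shorthands for the REPL_STATE_* and PSYNC_* values, and the full-sync path is reduced to a single transfer state because it is outside the scope of this article.

```c
#include <assert.h>

/* Hypothetical, reduced replica states (cf. REPL_STATE_*). */
typedef enum {
    ST_CONNECT, ST_SEND_PSYNC, ST_RECEIVE_PSYNC, ST_TRANSFER, ST_CONNECTED
} repl_state;

/* Hypothetical, reduced PSYNC outcomes (cf. PSYNC_*). */
typedef enum {
    R_WRITE_ERROR, R_WAIT_REPLY, R_TRY_LATER, R_CONTINUE, R_FULLRESYNC
} psync_result;

static repl_state next_state(repl_state s, psync_result r) {
    switch (s) {
    case ST_SEND_PSYNC:
        /* Writing half: on a write error the handshake restarts. */
        return r == R_WRITE_ERROR ? ST_CONNECT : ST_RECEIVE_PSYNC;
    case ST_RECEIVE_PSYNC:
        switch (r) {
        case R_WAIT_REPLY: return ST_RECEIVE_PSYNC; /* reply not complete yet */
        case R_TRY_LATER:  return ST_CONNECT;       /* transient error: retry */
        case R_CONTINUE:   return ST_CONNECTED;     /* incremental sync accepted */
        case R_FULLRESYNC: return ST_TRANSFER;      /* fall back to a full sync */
        default:           return ST_CONNECT;
        }
    default:
        return s; /* other states are not modeled here */
    }
}
```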

int slaveTryPartialResynchronization(int fd, int read_reply) {
	...
	 /* Reading half */
    reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
    ...
    if (!strncmp(reply,"+FULLRESYNC",11)) {
        // The master requires a full sync
    	...
    }
    if (!strncmp(reply,"+CONTINUE",9)) {
        // The master accepted the incremental sync
        /* Partial resync was accepted. */
        serverLog(LL_NOTICE,
            "Successful partial resynchronization with master.");

        /* Check the new replication ID advertised by the master. If it
         * changed, we need to set the new ID as primary ID, and set or
         * secondary ID as the old master ID up to the current offset, so
         * that our sub-slaves will be able to PSYNC with us after a
         * disconnection. */
        char *start = reply+10;
        char *end = reply+9;
        while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
        if (end-start == CONFIG_RUN_ID_SIZE) {
            char new[CONFIG_RUN_ID_SIZE+1];
            memcpy(new,start,CONFIG_RUN_ID_SIZE);
            new[CONFIG_RUN_ID_SIZE] = '\0';
            // The master's replid has changed
            if (strcmp(new,server.cached_master->replid)) {
                /* Master ID changed. */
                serverLog(LL_WARNING,"Master replication ID changed to %s",new);

                /* Set the old ID as our ID2, up to the current offset+1. */
                memcpy(server.replid2,server.cached_master->replid,
                    sizeof(server.replid2));
                server.second_replid_offset = server.master_repl_offset+1;

                /* Update the cached master ID and our own primary ID to the
                 * new one. */
                memcpy(server.replid,new,sizeof(server.replid));
                memcpy(server.cached_master->replid,new,sizeof(server.replid));

                /* Disconnect all the sub-slaves: they need to be notified. */
                disconnectSlaves();
            }
        }

        /* Setup the replication to continue. */
        sdsfree(reply);
        replicationResurrectCachedMaster(fd);

        /* If this instance was restarted and we read the metadata to
         * PSYNC from the persistence file, our replication backlog could
         * be still not initialized. Create it. */
        if (server.repl_backlog == NULL) createReplicationBacklog();
        return PSYNC_CONTINUE;
    }

	...
}
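
The +CONTINUE handling above does two things: it extracts an optional replication ID from the reply, and if that ID differs from the cached one, it demotes the old ID to replid2. The sketch below isolates that logic; the `replica_ids` struct and `handle_continue` are hypothetical, the pointer arithmetic follows the excerpt above, and the disconnectSlaves() call is omitted.

```c
#include <assert.h>
#include <string.h>

#define RUN_ID_SIZE 40 /* hypothetical stand-in for CONFIG_RUN_ID_SIZE */

/* Hypothetical subset of replica state touched by +CONTINUE. */
typedef struct {
    char replid[RUN_ID_SIZE + 1];
    char replid2[RUN_ID_SIZE + 1];
    char cached_master_replid[RUN_ID_SIZE + 1];
    long long second_replid_offset;
    long long master_repl_offset;
} replica_ids;

/* Returns 1 if the master advertised a new ID and the IDs were switched. */
static int handle_continue(replica_ids *r, const char *reply) {
    assert(strncmp(reply, "+CONTINUE", 9) == 0);
    const char *start = reply + 10; /* first char after "+CONTINUE " */
    const char *end = reply + 9;
    while (*end != '\r' && *end != '\n' && *end != '\0') end++;
    if (end - start != RUN_ID_SIZE) return 0; /* no ID in the reply */

    char new_id[RUN_ID_SIZE + 1];
    memcpy(new_id, start, RUN_ID_SIZE);
    new_id[RUN_ID_SIZE] = '\0';
    if (strcmp(new_id, r->cached_master_replid) == 0) return 0; /* unchanged */

    /* Keep the old history reachable for sub-replicas via replid2. */
    memcpy(r->replid2, r->cached_master_replid, sizeof r->replid2);
    r->second_replid_offset = r->master_repl_offset + 1;
    /* Adopt the master's new ID as both our ID and the cached master's. */
    memcpy(r->replid, new_id, sizeof r->replid);
    memcpy(r->cached_master_replid, new_id, sizeof r->cached_master_replid);
    return 1;
}
```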

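As shown, when the incremental sync succeeds, syncWithMaster simply returns. Before that, slaveTryPartialResynchronization has called replicationResurrectCachedMaster, which registers readQueryFromClient as the readable-event handler. That function was covered in "Redis Source Code Analysis - The End-to-End Flow of a Redis Command"; it reads and executes client commands, except that here the client is the master: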

void replicationResurrectCachedMaster(int newfd) {
    // Promote the cached master back to server.master
    server.master = server.cached_master;
    server.cached_master = NULL;
    server.master->fd = newfd;
    server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
    server.master->authenticated = 1;
    server.master->lastinteraction = server.unixtime;
    server.repl_state = REPL_STATE_CONNECTED;
    server.repl_down_since = 0;

    /* Re-add to the list of clients. */
    linkClient(server.master);
    // Register readQueryFromClient as the readable-event handler
    if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
                          readQueryFromClient, server.master)) {
        serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
        freeClientAsync(server.master); /* Close ASAP. */
    }

    /* We may also need to install the write handler as well if there is
     * pending data in the write buffers. */
    if (clientHasPendingReplies(server.master)) {
        if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
                          sendReplyToClient, server.master)) {
            serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
            freeClientAsync(server.master); /* Close ASAP. */
        }
    }
}

This way, the incremental commands the master sends after +CONTINUE are read by the replica and applied to its dataset.