Redis的集群模式是去中心化架构,采用gossip协议实现集群中节点间信息同步。
gossip过程是由种子节点发起,当一个种子节点有状态需要更新到网络中的其他节点时,它会随机的选择周围几个节点散播消息,收到消息的节点也会重复该过程,直至最终网络中所有的节点都收到了消息。这个过程可能需要一定的时间,由于不能保证某个时刻所有的节点都收到消息,但是能够保证最终所有节点都会收到消息,因此它是一个最终一致性协议。
本节介绍下Redis中如何通过gossip协议达到节点间信息的最终一致性
消息
Redis集群中通过消息来传递各个节点间的信息,消息由消息头+消息体组成:
typedef struct {
char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */
uint32_t totlen; /* Total length of this message */
uint16_t ver; /* Protocol version, currently set to 1. */
uint16_t port; /* TCP base port number. */
uint16_t type; /* Message type */
uint16_t count; /* Only used for some kind of messages. */
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
uint64_t configEpoch; /* The config epoch if it's a master, or the last
epoch advertised by its master if it is a
slave. */
uint64_t offset; /* Master replication offset if node is a master or
processed replication offset if node is a slave. */
char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
unsigned char myslots[CLUSTER_SLOTS/8];
char slaveof[CLUSTER_NAMELEN];
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
char notused1[34]; /* 34 bytes reserved for future usage. */
uint16_t cport; /* Sender TCP cluster bus port */
uint16_t flags; /* Sender node flags */
unsigned char state; /* Cluster state from the POV of the sender */
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
union clusterMsgData data;
} clusterMsg;
消息头部分包含了port、epoch、分配的slot、master等信息,消息体为clusterMsgData,根据不同的消息类型,有不同的结构:
union clusterMsgData {
/* PING, MEET and PONG */
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
} ping;
/* FAIL */
struct {
clusterMsgDataFail about;
} fail;
/* PUBLISH */
struct {
clusterMsgDataPublish msg;
} publish;
/* UPDATE */
struct {
clusterMsgDataUpdate nodecfg;
} update;
/* MODULE */
struct {
clusterMsgModule msg;
} module;
};
gossip协议使用PING、PONG类型的消息:
typedef struct {
char nodename[CLUSTER_NAMELEN];
uint32_t ping_sent;
uint32_t pong_received;
char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */
uint16_t port; /* base port last time it was seen */
uint16_t cport; /* cluster port last time it was seen */
uint16_t flags; /* node->flags copy */
uint32_t notused1;
} clusterMsgDataGossip;
包含了以自身视角看,对应节点的发送PING消息时间、接受PONG消息时间、port已经对应的状态
gossip过程
集群中每个节点在一定时间内向其他节点发送PING消息,收到PING消息的节点回复PONG消息,每个节点通过PING/PONG消息感知其他节点状态,进行状态更新,最终使得集群中每个节点状态达到一致。
何时发送PIONG消息,PING消息都包含哪些节点信息呢,这部分在ClusterCron这个集群模式下的周期性函数实现:
1、 随机选择5个节点,如果该节点还没有发送PING消息,选择最迟收到PONG消息的节点,向该节点发送PING消息;
/* This is executed 10 times every second */
void clusterCron(void) {
...
if (!(iteration % 10)) {
int j;
/* Check a few random nodes and ping the one with the oldest
* pong_received time. */
// 随机选择5个节点
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */
// 该节点还没有发送PING消息
if (this->link == NULL || this->ping_sent != 0) continue;
// 排除自己和CLUSTER_NODE_HANDSHAKE的节点
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
// 选择最迟收到PONG消息的节点
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
// 发送PING消息
if (min_pong_node) {
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
...
}
2、 遍历所有节点,如果有节点长时间内没有发送过PING消息,则发送:;
/* This is executed 10 times every second */
void clusterCron(void) {
...
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
/* If we have currently no active ping in this instance, and the
* received PONG is older than half the cluster timeout, send
* a new ping now, to ensure all the nodes are pinged without
* a too big delay. */
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
}
...
}
这里面设置的时间阈值为server.cluster_node_timeout/2,在集群模式下,如果节点A向节点B发送PING消息后,超过server.cluster_node_timeout时间内,节点A没有收到节点B的PONG消息,那么节点A就会把节点B设置为PFAIL状态,所以这里判断发送PING消息的时间为server.cluster_node_timeout/2,还要考虑一个回复PONG消息的网络来回时间。
当节点A决定向节点B发送PING消息时,节点A需要构造gossip消息体,会选择集群中的一部分节点,将这部分节点的信息一并发送给节点B,节点B根据这些信息,看情况更新自身的集群信息。
那么节点A需要选取多少个节点,以及选取哪些节点呢,这就需要考虑到消息传播的时效性以及集群规模的折中了。选取的节点过少,集群中各个节点的状态达到最终一致性的时间就边长,选取的节点过多,网络传输量就多大,同时集群中传播的冗余信息就变多,节点对于消息的处理的代价就变大,注意到Redis是单线程模型,集群模式下对于消息的处理是与用户命令处理是同一个线程,过大的消息体的处理必然会占用更多的CPU,这将严重限制集群的规模。按照当前的Redis gossip协议策略,一般可以支持400个左右的节点集群,节点数在增加,就会对用户的命令处理带来较明显的影响。
gossip消息体节点的选取策略在clusterSendPing中实现:
void clusterSendPing(clusterLink*link, int type) {
...
int freshnodes = dictSize(server.cluster->nodes)-2;
wanted = floor(dictSize(server.cluster->nodes)/10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
...
int pfail_wanted = server.cluster->stats_pfail_nodes;
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
}
包含两部分:1/10的总节点数、PFAIL状态的节点数。PFAIL状态的节点需要尽快通知其他节点,以便进行slave的failover。那么集群正常状态下为什么要选取1/10的节点数呢,作者在clusterSendPing中的注释解释了这个问题:按照上面介绍的gossip协议发送时间,每隔server.cluster_node_timeout/2的时间内会向对端节点发送一个PING消息,对端节点会回复一个PONG消息,那么在server.cluster_node_timeout时间内,会处理4个gossip协议,由于PFAIL标志的有效时间是server.cluster_node_timeout2,在这段时间内,共会处理8个gossip协议,假设此时集群中有个节点A宕机,这个宕机节点被另外节点B检测到,节点B将其标志位PFAIL状态,在server.cluster_node_timeout2时间内,节点B可以将发送8次gossip协议,每次带有1/10的节点,那么就有80%的概率将节点A的宕机信息传播给集群中其他节点,同时将PFAIL状态标志位FAIL状态只需要过半的节点数即可,从概率上将,80%的概率足够集群中其他节点标志位PFAIL状态,从而不影响正常的failover。
选取好节点数之和,就是构造并发送gossip协议了,这部分代码较繁琐,直接看代码即可。