Redis支持发布订阅功能,即一个客户端可以向channel中publish发布一条消息,订阅这个channel的客户端就能收到这条消息,本节解析发布订阅功能的实现。
发布订阅的实现在pubsub.c中
相关命令如下:
Unsubscribe:指退订给定的频道。
Subscribe:订阅给定的一个或多个频道的信息。
Pubsub:查看订阅与发布系统状态。
Punsubscribe:退订所有给定模式的频道。
Publish:将信息发送到指定的频道。
Psubscribe:订阅一个或多个符合给定模式的频道。
发布订阅数据结构
在struct Server中与发布订阅有关的字段:
struct Server {
...
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */
...
};
dict *pubsub_channels的key是robj指针,保存channel,value是个list,保存订阅该channel的所有client。
list *pubsub_patterns保存所有给定模式的channel,具体如下:
typedef struct pubsubPattern {
client *client;
robj *pattern;
} pubsubPattern;
同时struct client也有两个与发布订阅有关的字段:
struct client{
...
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
...
};
dict *pubsub_channels该客户端订阅的channel
list *pubsub_patterns该客户端订阅的所有模式channel
发布订阅实现
订阅一个channel:
void subscribeCommand(client *c) {
int j;
// 客户端订阅每个channel
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
// 打上CLIENT_PUBSUB标志
c->flags |= CLIENT_PUBSUB;
}
int pubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
// 向客户端的pubsub_channels添加channel
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
// 向server的pubsub_channels添加channel和该客户端
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
// 当前该channel不存在,则新建
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
订阅一组模式:
void psubscribeCommand(client *c) {
int j;
// 逐个添加模式
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);
// 打上CLIENT_PUBSUB标识
c->flags |= CLIENT_PUBSUB;
}
int pubsubSubscribePattern(client *c, robj *pattern) {
int retval = 0;
// 向client的pubsub_patterns添加模式
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
retval = 1;
pubsubPattern *pat;
listAddNodeTail(c->pubsub_patterns,pattern);
incrRefCount(pattern);
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
// 向server的pubsub_patterns添加模式
listAddNodeTail(server.pubsub_patterns,pat);
}
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
在订阅channel和模式的时候都要给该client打上CLIENT_PUBSUB标识,这个标识的作用在于:在Redis执行一个用户命令之前,前面会有一系列条件判断该命令是否可以执行,其中一条就是有关subpub命令:
int processCommand(client *c) {
...
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}
...
}
向channel中发布消息:
void publishCommand(client *c) {
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
// 集群模式下处理发布消息
if (server.cluster_enabled)
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
forceCommandPropagation(c,PROPAGATE_REPL);
addReplyLongLong(c,receivers);
}
关于集群模式下发布消息的传播在后面文章中分析,本节先看pubsubPublishMessage的实现:
/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
/* Send to clients listening for that channel */
// 从server.pubsub_channels查找对应的channel
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
// 遍历订阅该channel的每一个client,发送消息
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
/* Send to clients listening to matching channels */
// 遍历server.pubsub_patterns
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
// 如果发现该模式与该channel匹配
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
// 向订阅该模式的客户端发送消息
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
更改通知机制
在之前的几章中,经常能在源码上看到notifyKeyspaceEvent这个函数,比如:更新完KV对时;过期健删除时;内存淘汰时等等,这个函数借助于发布订阅机制,向订阅key变更的客户端发布一条消息,能够让这些客户端即使感知到他们所感兴趣的key的改变。
通知机制的实现在notify.c中
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
sds chan;
robj *chanobj, *eventobj;
int len = -1;
char buf[24];
/* If notifications for this class of events are off, return ASAP. */
if (!(server.notify_keyspace_events & type)) return;
eventobj = createStringObject(event,strlen(event));
/* __keyspace@<db>__:<key> <event> notifications. */
// 健空间通知
// 发布一条 __keyspace@<db>__:<key> <event> 消息
if (server.notify_keyspace_events & NOTIFY_KEYSPACE) {
chan = sdsnewlen("__keyspace@",11);
len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, key->ptr);
chanobj = createObject(OBJ_STRING, chan);
pubsubPublishMessage(chanobj, eventobj);
decrRefCount(chanobj);
}
/* __keyevent@<db>__:<event> <key> notifications. */
// db空间通知
// 发布一条 __keyevent@<db>__:<event> <key> 消息
if (server.notify_keyspace_events & NOTIFY_KEYEVENT) {
chan = sdsnewlen("__keyevent@",11);
if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, eventobj->ptr);
chanobj = createObject(OBJ_STRING, chan);
pubsubPublishMessage(chanobj, key);
decrRefCount(chanobj);
}
decrRefCount(eventobj);
}