了解redis中命令的执行过程有助于我们更加清楚很多redis的子模块是什么时候执行的.
简单来说命令的执行过程是这样的,首先它在网络框架中的读处理器中被调用,即readQueryFromClient
,我们在读处理器得到了此次从客户端收到的信息,如果可以解析成正确的命令和参数且此时缓冲区内无空余,认为此次解析成功,然后根据key在命令字典中查找对应的回调函数,找到后执行命令,接着返回其应有的返回值,中间还需要做很多的检查,至此,一次命令的执行过程就结束了.
processInputBuffer
出现在readQueryFromClient
读处理器的最后,它所做的事情就是对此次read的所有内容进行解析.
// 处理客户端输入的命令内容
void processInputBuffer(redisClient *c) {
/* Keep processing while there is something in the input buffer */
// 尽可能地处理查询缓冲区中的内容
// 如果读取出现 short read ,那么可能会有内容滞留在读取缓冲区里面
// 这些滞留内容也许不能完整构成一个符合协议的命令,
// 需要等待下次读事件的就绪
// query为读入缓冲区
while(sdslen(c->querybuf)) {
/* Return if clients are paused. */
// 如果客户端正处于暂停状态,那么直接返回
if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;
/* Immediately abort if the client is in the middle of something. */
// REDIS_BLOCKED 状态表示客户端正在被阻塞
if (c->flags & REDIS_BLOCKED) return;
/* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands). */
// 客户端已经设置了关闭 FLAG ,没有必要处理命令了
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
/* Determine request type when unknown. */
// 多条查询是一般客户端发送来的,
// 而内联查询则是 TELNET 发送来的
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
// 多条查询
c->reqtype = REDIS_REQ_MULTIBULK;
} else {
// 内联查询
c->reqtype = REDIS_REQ_INLINE;
}
}
// 将缓冲区中的内容转换成命令,以及命令参数
// 我们可以看到当解析错误的时候会直接break出循环 进而退出函数
// 而在函数内部
if (c->reqtype == REDIS_REQ_INLINE) {
if (processInlineBuffer(c) != REDIS_OK) break;
} else if (c->reqtype == REDIS_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != REDIS_OK) break;
} else {
redisPanic("Unknown request type");
}
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c); //重置客户端 即初始化其中的参数 并释放上一次解析中的argv
} else {
/* Only reset the client when the command was executed. */
// 执行命令的话就重置客户端
if (processCommand(c) == REDIS_OK)
resetClient(c);
}
}
}
从这里我们可以看到有两个函数是至关重要的,即processMultibulkBuffer
和processCommand
,一个用于解析命令,一个用于执行命令,我们一个一个看.先来看看processMultibulkBuffer吧.其作用为把判断输入缓冲区中的命令是否符合协议,符合的话放入argv并返回OK,否则返回error.
//一般协议是这样的 *3\r\n$3\r\nSET\r\n$3\r\nMSG\r\n$5\r\nHELLO\r\n
// * 代表此次命令为REDIS_REQ_MULTIBULK
// \r\n 间隔符
// $n 后面字符串的长度
int processMultibulkBuffer(redisClient *c) {
char *newline = NULL;
int pos = 0, ok;
long long ll;
// 读入命令的参数个数
// 比如 *3\r\n$3\r\nSET\r\n... 将令 c->multibulklen = 3
if (c->multibulklen == 0) {
/* The client should have been reset */
redisAssertWithInfo(c,NULL,c->argc == 0);
/* Multi bulk length cannot be read without a \r\n */
// 检查缓冲区的内容第一个 "\r\n"
newline = strchr(c->querybuf,'\r');
if (newline == NULL) {
if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big mbulk count string");
setProtocolError(c,0); //解析错误 设置flag 在servercron中关闭客户端
}
return REDIS_ERR;
}
/* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
return REDIS_ERR;
/* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */
// 协议的第一个字符必须是 '*'
redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
// 将参数个数,也即是 * 之后, \r\n 之前的数字取出并保存到 ll 中
// 比如对于 *3\r\n ,那么 ll 将等于 3
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
// 参数的数量超出限制
if (!ok || ll > 1024*1024) {
addReplyError(c,"Protocol error: invalid multibulk length");
setProtocolError(c,pos); //
return REDIS_ERR;
}
// 参数数量之后的位置
// 比如对于 *3\r\n$3\r\n$SET\r\n... 来说,
// pos 指向 *3\r\n$3\r\n$SET\r\n...
// ^
// |
// pos
pos = (newline-c->querybuf)+2;
// 如果 ll <= 0 ,那么这个命令是一个空白命令
// 那么将这段内容从查询缓冲区中删除,只保留未阅读的那部分内容
// 为什么参数可以是空的呢?
// processInputBuffer 中有注释到 "Multibulk processing could see a <= 0 length"
// 但并没有详细说明原因
if (ll <= 0) {
sdsrange(c->querybuf,pos,-1);
return REDIS_OK;
}
// 设置参数数量
c->multibulklen = ll;
/* Setup argv array on client structure */
// 根据参数数量,为各个参数对象分配空间
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
}
redisAssertWithInfo(c,NULL,c->multibulklen > 0);
// 从 c->querybuf 中读入参数,并创建各个参数对象到 c->argv
while(c->multibulklen) {
/* Read bulk length if unknown */
// 读入参数长度
if (c->bulklen == -1) {
// 确保 "\r\n" 存在
newline = strchr(c->querybuf+pos,'\r');
if (newline == NULL) {
if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
addReplyError(c,
"Protocol error: too big bulk count string");
setProtocolError(c,0);
return REDIS_ERR;
}
break;
}
/* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
break;
// 确保协议符合参数格式,检查其中的 $...
// 比如 $3\r\nSET\r\n
if (c->querybuf[pos] != '$') {
addReplyErrorFormat(c,
"Protocol error: expected '$', got '%c'",
c->querybuf[pos]);
setProtocolError(c,pos);
return REDIS_ERR;
}
// 读取长度
// 比如 $3\r\nSET\r\n 将会让 ll 的值设置 3 $后面数字的意思是后面字符串的长度
ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
if (!ok || ll < 0 || ll > 512*1024*1024) {
//一个参数的长度不能超过512MB
addReplyError(c,"Protocol error: invalid bulk length");
setProtocolError(c,pos);
return REDIS_ERR;
}
// 定位到参数的开头
// 比如
// $3\r\nSET\r\n...
// ^
// |
// pos
pos += newline-(c->querybuf+pos)+2; //移动两位/r/n
// 如果参数非常长,那么做一些预备措施来优化接下来的参数复制操作
if (ll >= REDIS_MBULK_BIG_ARG)/*32MB*/ {
size_t qblen;
/* If we are going to read a large object from network
* try to make it likely that it will start at c->querybuf
* boundary so that we can optimize object creation
* avoiding a large copy of data. */
sdsrange(c->querybuf,pos,-1);
pos = 0;
qblen = sdslen(c->querybuf);
/* Hint the sds library about the amount of bytes this string is
* going to contain. */
if (qblen < ll+2)
c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);
}
// 参数的长度
c->bulklen = ll;
}
/* Read bulk argument */
// 读入参数 缓冲区剩下的数据不能小于参数中的值
if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
// 确保内容符合协议格式
// 比如 $3\r\nSET\r\n 就检查 SET 之后的 \r\n
/* Not enough data (+2 == trailing \r\n) */
break;
} else {
//符合协议要求 创建一个argv的项
// 为参数创建字符串对象
/* Optimization: if the buffer contains JUST our bulk element
* instead of creating a new object by *copying* the sds we
* just use the current sds string. */
// 一个小优化 如果当前缓冲区仅包含我们需要使用的元素,那就不把它复制出来,而是使用当前的sds字符串
if (pos == 0 &&
c->bulklen >= REDIS_MBULK_BIG_ARG &&
(signed) sdslen(c->querybuf) == c->bulklen+2)
{
c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
sdsIncrLen(c->querybuf,-2); /* remove CRLF */
c->querybuf = sdsempty();
/* Assume that if we saw a fat argument we'll see another one
* likely... */
c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2); //扩容到此次命令长度.
pos = 0;
} else {
//不优化 pos向后移动(命令长度+CRLF)
c->argv[c->argc++] =
createStringObject(c->querybuf+pos,c->bulklen);
pos += c->bulklen+2;
}
// 清空参数长度
c->bulklen = -1;
// 减少还需读入的参数个数
c->multibulklen--;
}
}
/* Trim to pos */
// 从 querybuf 中删除已被读取的内容
if (pos) sdsrange(c->querybuf,pos,-1);
/* We're done when c->multibulk == 0 */
// 如果本条命令的所有参数都已读取完,那么返回
if (c->multibulklen == 0) return REDIS_OK;
/* Still not read to process the command */
// 如果还有参数未读取完,那么就协议内容有错
return REDIS_ERR;
}
我们可以看到当返回REDIS_ERR的时候一般都已经设置了setProtocolError
,会异步的断开客户端.还有一些情况则会记录已经读取的记录,原因是有可能一次没有接受完毕所有数据,先解析一部分,剩下的等下次再接收.返回OK的话就证明在argv中已经设置好了命令,argv[0]中存着key 后面则是参数.我们再来看看执行函数processCommand
int processCommand(redisClient *c) {
/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
* a regular command proc. */
// 特别处理 quit 命令
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= REDIS_CLOSE_AFTER_REPLY;
return REDIS_ERR;
}
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
// 根据argv[0]在字典中查找当前命令,并进行命令合法性检查,以及命令参数个数检查 字典名为commands
// c->cmd为当前要执行的命令 同时更新lastcmd loopupcommand其实就是在字典重根据键查找值而已
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
// 没找到指定的命令
flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return REDIS_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
//检查参数个数是否正确 错误的话进入
flagTransaction(c); //修改flag
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return REDIS_OK;
}
/* Check if the user is authenticated */
// 检查认证信息 默认未开启 在redis.conf中修改requirepass打开
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c);
addReply(c,shared.noautherr);
return REDIS_OK;
}
//集群方面的检查 现在知识水平还没到
.................
/* Handle the maxmemory directive.
*
* First we try to free some memory if possible (if there are volatile
* keys in the dataset). If there are not the only thing we can do
* is returning an error. */
// 如果设置了最大内存,那么检查内存是否超过限制,并做相应的操作
// 在源码解析(11) 中详尽介绍了这个函数 包括六种处理策略
if (server.maxmemory) {
// 如果内存已超过限制,那么尝试通过删除过期键来释放内存
int retval = freeMemoryIfNeeded();
// 如果即将要执行的命令可能占用大量内存(REDIS_CMD_DENYOOM)
// 并且前面的内存释放失败的话
// 那么向客户端返回内存错误
if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
flagTransaction(c);
addReply(c, shared.oomerr);
return REDIS_OK;
}
}
//主从复制相关 分布式学完回来补上
..............
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
// 在订阅于发布模式的上下文中,只能执行订阅和退订相关的命令
if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
&&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
return REDIS_OK;
}
............................
//Lua脚本相关
......
/* Exec the command */
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 在事务上下文中
// 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外
// 其他所有命令都会被入队到事务队列中
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// 执行命令 其中也会写入AOF和慢查询日志,以及更新此次命令持续时间和一些客户端信息.
call(c,REDIS_CALL_FULL);
c->woff = server.master_repl_offset;
// 处理那些解除了阻塞的键
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return REDIS_OK;
}
在执行命令的时候,返回内容就会被写入到定长缓冲区或者边长缓冲链表中,在写处理器重全部返回给客户端.这样,一次命令的调用就结束了.