05、Redis 源码解析 - Redis 命令端到端的过程

Redis 源码解析 - Redis 启动流程中,一个Redis节点启动的最后一步是启动事件驱动框架,来等待可读事件发生。客户端发出的一条命令的端到端过程大体如下:

1、 客户端连接服务端,触发节点读事件,创建客户端Client,注册该客户端的读事件;
2、 客户端发送命令,触发节点的读事件,服务端开始读取客户端套接字,放入客户端缓存中;
3、 从客户端缓存中根据Redis协议解析命令;
4、 处理命令;
5、 触发写事件,给客户端发送响应;
6、 销毁客户端;

下面逐步解析这6个步骤。

客户端创建

在客户端发起对服务端的连接请求时,会在服务端的事件循环中触发读事件,在Redis 源码解析 - Redis 启动流程启动节点之前,会初始化Server,其中有一步是初始化网络框架,注册监听端口事件:

/* Excerpt of initServer(): creates the event loop and registers an
 * AE_READABLE event (handler: acceptTcpHandler) on every listening TCP
 * socket so that new client connections can be accepted. */
void initServer(void) {


	...
	/* Create the event loop. */
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    ...
    /* Register the accept handler on every listening socket. */
    for (j = 0; j < server.ipfd_count; j++) {


        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {


                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }

Redis 源码解析 - Redis 事件驱动框架中,当该套接字上发生读事件时,会根据注册的事件处理函数acceptTcpHandler来处理

redis作为一个单进程单线程的KV存储服务,工作流程如下:在main函数中,前面做了一些初始化操作之后,在一个while循环中不停地调用aeMain进行事件处理:

/* AE_READABLE handler registered on the listening sockets in initServer().
 * Accepts up to MAX_ACCEPTS_PER_CALL pending TCP connections per event-loop
 * invocation and hands each accepted fd to acceptCommonHandler(), which
 * creates the client object. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {


    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    
    while(max--) {


        /* anetTcpAccept() wraps accept(2) and fills in the peer ip/port. */
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {


            /* EWOULDBLOCK just means there are no more pending connections. */
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);
    }
}

acceptTcpHandler主要做了两件事:

1、 anetTcpAccept:从监听端口创建连接,anetTcpAccept是对Linux accept系统调用的简单封装,在anet.c中;
2、 acceptCommonHandler:对创建好的连接套接字处理;

/* Common post-accept step: wrap the connected socket fd into a client
 * object via createClient(). On failure the fd is closed and the error
 * logged; nothing else needs to be rolled back at this point. */
static void acceptCommonHandler(int fd, int flags, char *ip) {


	    client *c;
    if ((c = createClient(fd)) == NULL) {


        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
}

acceptCommonHandler就做了一件事,就是调用createClient创建客户端:

/* Allocate and initialize a new client bound to socket fd.
 * Registers an AE_READABLE event whose handler is readQueryFromClient.
 * fd == -1 creates a "fake" client with no socket (used e.g. for Lua /
 * AOF loading). Returns NULL if the read event could not be registered. */
client *createClient(int fd) {


    client *c = zmalloc(sizeof(client));
    if (fd != -1) {


    	/* Make the client socket non-blocking and disable Nagle (noDelay). */
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        /* Optionally enable TCP keepalive on the client socket. */
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        /* Register the read event, handler readQueryFromClient. */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {


            close(fd);
            zfree(c);
            return NULL;
        }
    }
    /* Initialization of the many client fields follows (elided). */
    ...

在Redis中,客户端用struct client表示,createClient先注册客户端的套接字事件,设置套接字非阻塞属性,回调函数readQueryFromClient,然后就是大量的客户端属性初始化,至此,一个客户端被创建。

命令的读取

客户端连接上服务端以后,当客户端发送命令时,会触发服务端createClient时创建好的套接字读事件,调用readQueryFromClient从套接字中读取数据,存储到client的缓冲区
querybuf中,有两点需要注意:
1、 命令的类型分为PROTO_REQ_MULTIBULK和PROTO_REQ_INLINE,分别对应redis-cli和telnet发送过来的命令,两者发送命令的协议不同;
2、 在命令读取阶段为主从同步做了很多准备;
读取完命令之后,就调用processInputBufferAndReplicate函数对命令进一步处理

/* AE_READABLE handler installed by createClient(): reads bytes from the
 * client socket into c->querybuf and then triggers command parsing and
 * execution via processInputBufferAndReplicate(). */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {


    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);
    readlen = PROTO_IOBUF_LEN;

    /* Choose how many bytes to read. For a multibulk request with a single
     * very large argument (e.g. a huge SET value) read exactly up to the end
     * of that argument, avoiding a second read() call for the same bulk. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {


        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    /* Make room in the input buffer. */
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    /* Read from the socket. */
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {


    	/* Read error: EAGAIN just means "no data yet, try again later". */
        if (errno == EAGAIN) {


            return;
        } else {


            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {


    	/* Peer closed the connection. */
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {


        /* Commands coming from our master are also appended to
         * pending_querybuf, used for replication bookkeeping. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    /* The accumulated input exceeds the configured maximum: log and close. */
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {


        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
	/* Parse and execute the buffered commands. */
    processInputBufferAndReplicate(c);
}

命令的解析与处理

命令处理在processInputBufferAndReplicate中进行,processInputBufferAndReplicate对processInputBuffer进行封装,对主从同步的情况做了些处理,进一步向slave传播master的命令。processInputBuffer主要在以下情况被调用:
1、 从客户端套接字中读取到更多的字符;
2、 当一个阻塞客户端被激活时,例如multi命令;
processInputBuffer主要做了两件事:
1、 processMultibulkBuffer:对输入缓冲区中的内容进行解析,并将解析出的命令和参数填入客户端结构中;
2、 processCommand:处理命令;

/* Parse and execute whatever is currently held in c->querybuf.
 *
 * Called after readQueryFromClient() appended new bytes, or when a blocked
 * client is re-activated. Loops extracting one command at a time:
 * processInlineBuffer()/processMultibulkBuffer() fill c->argv/c->argc,
 * then processCommand() executes. The loop stops early when the client is
 * paused, blocked, scheduled for close, or when the buffer does not yet
 * hold a complete command.
 *
 * NOTE: the quoted excerpt had an extra stray closing brace at the end
 * (unbalanced braces); removed here. */
void processInputBuffer(client *c) {
    server.current_client = c;
    /* Consume the query buffer command by command. */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* All clients (except replicas) are currently paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
        /* The client is blocked (e.g. on a blocking command). */
        if (c->flags & CLIENT_BLOCKED) break;
        /* Don't process master input while a Lua script has timed out. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
        /* The client is already scheduled to be closed. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
        /* Determine the protocol type: '*' means RESP multibulk (redis-cli),
         * anything else is the inline protocol (telnet). */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }
        /* Parse the buffer into c->argv[]/c->argc. A non-C_OK return means
         * the buffer does not yet contain a complete command: wait for the
         * next read event. */
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Execute the command. */
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* Update the applied replication offset of our master. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
                }
                /* Don't reset a client that is still blocked
                 * (unless it is blocked on a module). */
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            /* The client may have been freed while executing the command
             * (e.g. blocking commands); in that case stop processing. */
            if (server.current_client == NULL) break;
        }
    }
    /* Trim the already-consumed prefix of the query buffer. */
    if (server.current_client != NULL && c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
    server.current_client = NULL;
}

命令解析在processMultibulkBuffer中执行,根据Redis的协议,每次解析出来一个命令,如果所读长度不够解析为一个命令,则等待下一次事件循环继续读,代码比较繁琐,就不贴了。
下面来看一下processCommand:

1、 首先查找命令lookupCommand,并验证命令合法性;
2、 处理鉴权;
3、 如果是集群模式,则查找对应key的slot;
4、 如果内存快满了,就根据淘汰策略淘汰部分键值;
5、 在多种情况下判断命令能否执行:需要持久化之前写磁盘错误;主从模式下slave落后master太多;slave设置只读模式;pubsub命令错误;slave与master断连;节点正在忙于执行lua脚本;事务处理;

/* Entry point for command execution after parsing.
 *
 * Validates the parsed command (lookup, arity, auth), performs cluster
 * redirection, applies maxmemory eviction, and checks the many conditions
 * under which a command must be refused (persistence errors, not enough
 * good replicas, read-only replica, pub/sub context, stale replica data,
 * dataset loading, busy Lua script). If everything is fine the command is
 * either queued (inside MULTI) or executed via call().
 *
 * Returns C_OK when the caller may reset the client and keep processing
 * the input buffer, C_ERR when the client must not be touched (QUIT, or
 * freed during eviction).
 *
 * NOTE: the quoted excerpt had extraction-garbled backticks around
 * `c->argv[i]->ptr` in the unknown-command branch; fixed here. */
int processCommand(client *c) {
    /* Module command filters (module system not covered here). */
    moduleCallCommandFilters(c);
    /* QUIT is handled separately: reply OK and close after the reply. */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
    /* Look the command up in the command table. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        /* Unknown command. */
        flagTransaction(c);
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "%.*s, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,"unknown command %s, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        /* Wrong number of arguments. */
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }
    /* Authentication check. */
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return C_OK;
    }
    /* Cluster mode: redirect the client when the command's keys hash to a
     * slot served by another node. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }

    /* Memory management: evict keys when over maxmemory.
     * Skipped while a Lua script runs, so that the script's writes and the
     * eviction DELs are not interleaved in the AOF/replication stream. */
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
        /* freeMemoryIfNeeded() may flush replica output buffers; if that
         * replica's connection fails there, server.current_client (this
         * very client) is freed, so continuing would be unsafe: bail out. */
        if (server.current_client == NULL) return C_ERR;
        if (out_of_memory &&
            (c->cmd->flags & CMD_DENYOOM ||
             (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return C_OK;
        }
    }

    int deny_write_type = writeCommandsDeniedByDiskError();
    /* Refuse write commands if persistence (RDB/AOF) reported an error. */
    if (deny_write_type != DISK_ERROR_TYPE_NONE &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        if (deny_write_type == DISK_ERROR_TYPE_RDB)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return C_OK;
    }

    /* On a master configured with repl_min_slaves_to_write: refuse write
     * commands when not enough healthy replicas are connected. */
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return C_OK;
    }

    /* A read-only replica refuses write commands from normal clients
     * (writes coming from its master are still accepted). */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return C_OK;
    }

    /* In subscribe mode only a few commands are allowed. */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

    /* A replica disconnected from its master and configured not to serve
     * stale data only accepts commands flagged CMD_STALE (INFO, SLAVEOF...). */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return C_OK;
    }

    /* While loading the dataset only commands flagged CMD_LOADING run. */
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return C_OK;
    }

    /* While a Lua script is timed out only a handful of commands are
     * allowed: AUTH, REPLCONF, SHUTDOWN NOSAVE, SCRIPT KILL. */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return C_OK;
    }

    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        /* Inside MULTI: just queue the command. */
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        /* Normal execution path. */
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        /* The executed command may have made blocked clients ready. */
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

如果命令可以执行,最终会调用call函数执行命令,call函数对于每个命令,会调用命令结构体中的命令处理函数进行处理。
那么redis是如何初始化命令的?redis有个全局的命令列表,保存redis中所有支持的命令:

/* Global command table: one entry per command supported by Redis.
 * See struct redisCommand for the meaning of each field; the table is
 * loaded into server.commands by populateCommandTable().
 * NOTE: extraction garbling split each initializer across several lines
 * and dropped the terminating ';'; cleaned up here. */
struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    ...
};

其中redisCommand结构体的各个字段的含义如下:

/* Describes one Redis command in the global command table.
 * NOTE: the quoted excerpt was missing the semicolon after the
 * getkeys_proc declaration; fixed here. */
struct redisCommand {
    char *name;             /* Command name. */
    redisCommandProc *proc; /* Command handler function. */
    int arity;              /* Argument count; negative means "at least -arity". */
    char *sflags;           /* Command flags as a string (e.g. "wm"). */
    int flags;              /* Bitmask computed from sflags. */
    /* Extracts the keys from the argument vector; used together with
     * firstkey/lastkey/keystep below. */
    redisGetKeysProc *getkeys_proc;
    int firstkey;           /* Index of the first key argument. */
    int lastkey;            /* Index of the last key argument. */
    int keystep;            /* Step between consecutive key arguments. */
    long long microseconds, calls;  /* Stats: total exec time and call count. */
};

在server结构体中有个dict *commands; 字段表示redis支持的所有命令:

/* Excerpt of the global server state: the command dictionary maps a
 * command name to its struct redisCommand*.
 * NOTE: the quoted excerpt was missing the ';' after the closing brace. */
struct redisServer {
	...
	dict *commands;         /* All commands supported by this server. */
	...
};

在server启动过程中,会调用initServerConfig函数进行最初的初始化:

/* Excerpt of initServerConfig(): creates the command dictionary and fills
 * it with the entries of redisCommandTable[] via populateCommandTable(). */
void initServerConfig(void) {


	...
	server.commands = dictCreate(&commandTableDictType,NULL);
	populateCommandTable();
	...
}

populateCommandTable就是逐个将redisCommandTable中的命令添加到server.commands中。以set命令为例,命令的定义为:

/* The SET entry of the global command table: handler setCommand, arity -3
 * (at least 3 arguments), flags "wm" (write / deny when out of memory),
 * keys at argv[1] only (firstkey=1, lastkey=1, keystep=1).
 * NOTE: extraction garbling split the initializer across lines and dropped
 * the terminating ';'; cleaned up here. */
struct redisCommand redisCommandTable[] = {
	...
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
	...
};

从redisCommand结构体的定义中看出:set命令的处理函数为setCommand,arity为-3表示参数个数至少为3个,有两个属性wm:写命令、内存满时需要被拒绝执行,没有单独的取key函数(getkeys_proc为NULL),第一个key的下标为1,最后一个key的下标为1,每个key之间的间隔为1(也就是说set命令只能有一个key)。

最后再来看看call函数

/* Execute the client's current command (c->cmd->proc) with all the
 * surrounding bookkeeping: MONITOR feed, dirty counting, slowlog/latency
 * sampling, per-command stats, and propagation of the command to the AOF
 * and to replicas. */
void call(client *c, int flags) {


    long long dirty;
    ustime_t start, duration;
    int client_old_flags = c->flags;
    struct redisCommand *real_cmd = c->cmd;

    server.fixed_time_expire++;
    /* Feed the command to the MONITOR clients. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {


        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Clear the propagation flags from the previous call; they will be
     * set again inside c->cmd->proc() when needed. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArray prev_also_propagate = server.also_propagate;
    redisOpArrayInit(&server.also_propagate);

    dirty = server.dirty;   /* Snapshot the change counter before the call. */
    updateCachedTime(0);    /* Refresh the cached time. */
    start = server.ustime;
    /* Actually run the command handler. */
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty; /* Number of changes this command made. */
    if (dirty < 0) dirty = 0;

    /* While loading data through a Lua-flagged client, don't pollute the
     * slowlog or the command statistics. */
    if (server.loading && c->flags & CLIENT_LUA)
        flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);

    /* Commands run from a Lua script force the propagation of the calling
     * script itself to replicas and the AOF. */
    if (c->flags & CLIENT_LUA && server.lua_caller) {


        if (c->flags & CLIENT_FORCE_REPL)
            server.lua_caller->flags |= CLIENT_FORCE_REPL;
        if (c->flags & CLIENT_FORCE_AOF)
            server.lua_caller->flags |= CLIENT_FORCE_AOF;
    }

    /* Slowlog entry and latency monitor sample. */
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {


        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    }
    /* Per-command statistics. */
    if (flags & CMD_CALL_STATS) {


        real_cmd->microseconds += duration;
        real_cmd->calls++;
    }

    /* Propagate the command to the AOF and to replicas. */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {


        int propagate_flags = PROPAGATE_NONE;

        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Module commands handle their own propagation. */
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }

    /* Restore the client's propagation flags as they were before the call. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    c->flags |= client_old_flags &
        (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);

    /* Propagate the additional commands queued by the executed command. */
    if (server.also_propagate.numops) {


        int j;
        redisOp *rop;

        if (flags & CMD_CALL_PROPAGATE) {


            for (j = 0; j < server.also_propagate.numops; j++) {


                rop = &server.also_propagate.ops[j];
                int target = rop->target;
                /* Whatever the command wish is, we honor the call() flags. */
                if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
                if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
                if (target)
                    propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
            }
        }
        redisOpArrayFree(&server.also_propagate);
    }
    server.also_propagate = prev_also_propagate;
    server.fixed_time_expire--;
    server.stat_numcommands++;
}

在call中,除了命令执行外,最重要的就是命令传播到slaves和aof中,在propagate中实现,在后面持久化和主从传播时再介绍这部分。

命令的回复

redis在命令处理完成之后,不会立即将回复发送回客户端,而是采取“异步回复”。在每个client中都有一个输出缓冲,这个缓冲由两部分组成:

/* Excerpt of the client structure: the two-part output buffer. Replies
 * are first appended to the fixed buf[]; once it is full the remainder
 * goes to the reply list.
 * NOTE: the quoted excerpt was missing the ';' after '} client'. */
typedef struct client {
	...
	list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    ...
    int bufpos;             /* Used bytes in buf[]. */
    char buf[PROTO_REPLY_CHUNK_BYTES]; /* Fixed-size static output buffer. */
} client;

当redis处理完命令之后,会先将回复写入buf中,buf如果不够,再将回复写入reply中,以addReply为例:redis将客户端的回复封装成一个对象,并把对象写入输出缓冲中,_addReplyToBuffer负责向buf中写入回复,_addReplyStringToList负责向reply中写入回复,在_addReplyStringToList中,还会检查客户端的回复缓冲区是否达到了缓冲区大小的软硬限制。

/* Queue a reply object for the client (asynchronous reply).
 * After prepareClientToWrite() (which may schedule the client for output),
 * the reply bytes are first appended to the fixed buffer c->buf via
 * _addReplyToBuffer(); if that buffer is full they are added to the
 * c->reply list via _addReplyStringToList(). */
void addReply(client *c, robj *obj) {


    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {


        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {


        /* Integer-encoded objects are converted to their string form. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {


        serverPanic("Wrong obj->encoding in addReply()");
    }
}

addReply先调用prepareClientToWrite准备向客户端缓冲写入回复:

/* Decide whether a reply may be written for this client and, if this is
 * its first pending reply, schedule it for output by adding it to
 * server.clients_pending_write (via clientInstallWriteHandler).
 * Returns C_OK when the caller may append to the output buffers. */
int prepareClientToWrite(client *c) {


    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;

    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

    /* A master client normally gets no replies. */
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;

    if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */

    /* If no reply is pending yet, the client is not in
     * clients_pending_write: schedule it for output now. */
    if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);

    return C_OK;
}

prepareClientToWrite中最后会用clientHasPendingReplies判断客户端之前的输出缓冲是否已经发送完,如果没有,则说明客户端还在server.clients_pending_write中,就不需要调用clientInstallWriteHandler,clientInstallWriteHandler主要就是将客户端添加到server.clients_pending_write中。
server.clients_pending_write中的客户端何时处理呢,在redis的每次eventloop循环之前,都会调用beforeSleep进行一些循环之前的处理,其中有个handleClientsWithPendingWrites函数负责向server.clients_pending_write中的客户端发送输出缓冲:

/* Excerpt of beforeSleep(): runs before every event-loop iteration and,
 * among other things, flushes the output buffers of the clients queued in
 * server.clients_pending_write. */
void beforeSleep(struct aeEventLoop *eventLoop) {


	...
	/* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
	...
}

/* Flush the output buffers of every client queued in
 * server.clients_pending_write; returns the number of clients processed.
 * Clients whose buffers could not be fully written get an AE_WRITABLE
 * event installed so that sendReplyToClient() finishes the transfer. */
int handleClientsWithPendingWrites(void) {


    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

	/* Walk the pending-write list client by client. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {


    	/* Clear the pending flag and remove the client from the list. */
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);
        /* Protected clients get no replies for now. */
        if (c->flags & CLIENT_PROTECTED) continue;

		/* Try to write the whole output synchronously. */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

		/* Not everything was sent: install a write event so that
		 * sendReplyToClient() continues later. AE_BARRIER makes the write
		 * happen before reads when the AOF fsync policy is "always". */
        if (clientHasPendingReplies(c)) {


            int ae_flags = AE_WRITABLE;

            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {


                ae_flags |= AE_BARRIER;
            }
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {


                    freeClientAsync(c);
            }
        }
    }
    return processed;
}

writeToClient负责向客户端的套接字中写入回复,为了避免长时间的阻塞,每次writeToClient都有一个最多写入量,如果客户端缓冲超过这个量,那么将会注册写事件,把剩下的交给sendReplyToClient继续发送,并且这里可能设置AE_BARRIER,AE_BARRIER标志表示在eventloop循环中,先处理写事件,再处理读事件,表明此时客户端中的缓冲需要被优先处理。

客户端销毁

客户端通过freeClient函数销毁,该函数主要在以下情况下调用:
1、 客户端的个数超过了redis设置的最大客户端个数阈值;
2、 保护模式下不允许其他机器客户端访问;
3、 需要断开从库时,对从库客户端销毁;
4、 网络错误,例如读写客户端套接字时发生错误,客户端关闭等;
5、 客户端的输出缓冲区达到了redis设置的限制;
6、 主从同步时,在数据同步握手阶段发生错误;

/* Destroy a client: release every resource it owns (buffers, blocking
 * state, watched keys, pub/sub state, output list, network connection)
 * and update the replication bookkeeping when the client is a master or
 * a replica link. */
void freeClient(client *c) {


    listNode *ln;
    /* Protected clients are only freed asynchronously. */
    if (c->flags & CLIENT_PROTECTED) {


        freeClientAsync(c);
        return;
    }
    /* A replica losing its master caches the master client for a later
     * partial resynchronization instead of destroying it. */
    if (server.master && c->flags & CLIENT_MASTER) {


        serverLog(LL_WARNING,"Connection with master lost.");
        if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
                          CLIENT_CLOSE_ASAP|
                          CLIENT_BLOCKED)))
        {


            replicationCacheMaster(c);
            return;
        }
    }
    /* Log the disconnection of a replica. */
    if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {


        serverLog(LL_WARNING,"Connection with replica %s lost.",
            replicationGetSlaveName(c));
    }
    /* Free the input buffers. */
    sdsfree(c->querybuf);
    sdsfree(c->pending_querybuf);
    c->querybuf = NULL;
    /* Leave the blocked state, if any. */
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);
    /* Release the WATCHed keys. */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);
    /* Release the pub/sub subscriptions. */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);
    /* Free the output buffer list. */
    listRelease(c->reply);
    /* Free the current command arguments. */
    freeClientArgv(c);
    /* Unlink the client from the server structures and close the socket. */
    unlinkClient(c);
    /* Replica-specific cleanup. */
    if (c->flags & CLIENT_SLAVE) {


        /* If an RDB transfer was in progress, close the file. */
        if (c->replstate == SLAVE_STATE_SEND_BULK) {


            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        /* Remove the client from server.monitors or server.slaves. */
        list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        serverAssert(ln != NULL);
        listDelNode(l,ln);
        /* Update replication-related server state. */
        if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
    }
    /* Handle the loss of the master link. */
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
    /* If the client was scheduled for async close, drop it from the list. */
    if (c->flags & CLIENT_CLOSE_ASAP) {


        ln = listSearchKey(server.clients_to_close,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }
    if (c->name) decrRefCount(c->name);
    zfree(c->argv);
    /* Free the MULTI/EXEC state. */
    freeClientMultiState(c);
    sdsfree(c->peerid);
    zfree(c);
}

至此,从客户端与服务端建立连接,接着发送命令,到server处理命令,再将命令的处理结果发送回客户端,最后客户端销毁的整个过程已经完成。