04、Redis 源码解析 - Redis 事件驱动框架

作为一个服务端节点,首先要能够处理客户端的连接请求,然后从套接字中读取命令并解析处理,这部分由Redis的事件驱动框架处理,事件驱动框架分为两部分:IO事件和时间事件,分别负责处理客户端的网络请求和Redis自身的周期性操作。
IO事件部分是典型的Reactor网络模型,在单线程下,非阻塞IO+IO多路复用的基于事件驱动基本就可以应对海量的连接、读写,同时在集群模式下,尤其是大集群,还用来处理集群间的消息传递。
时间事件与IO事件集成到一起,通过设置epoll_wait的阻塞等待时间来触发。

事件驱动启动

Redis 源码解析 - Redis 启动流程中,与事件驱动框架有关的部分有两处:

1、 事件驱动框架的初始化以及套接字监听、时间事件注册:;

void initServer(void) {
   
     
	...
	// 事件驱动初始化
	server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    if (server.el == NULL) {
   
     
        serverLog(LL_WARNING,
            "Failed creating the event loop. Error message: '%s'",
            strerror(errno));
        exit(1);
    }
	...
	// 时间事件注册:serverCron是要运行的周期性事件
	if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
   
     
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    // 套接字监听事件注册:server.ipfd[j]是被监听的套接字
	for (j = 0; j < server.ipfd_count; j++) {
   
     
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
   
     
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

}

其中server.el是redisServer中的aeEventLoop结构

struct redisServer {
   
     
	...
	aeEventLoop *el;
	...
}

aeEventLoop

aeEventLoop是Reactor模型的具体抽象,将网络事件和时间时间统一到一起处理。

  • events以数组的方式管理注册的IO事件,个数为setsize,其中events的下标就是具体的套接字fd。
  • fired是发生读写事件的网络事件数组。
  • timeEventHead以链表的形式管理已经注册的时间事件。
typedef struct aeEventLoop {
   
     
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    // 事件循环停止标志
    int stop;
    // 回调函数的执行参数
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

aeFileEvent用来管理要注册的IO事件

  • mask是事件类型,包括E_READABLE、AE_WRITABLE和AE_BARRIER三种类型事件
  • rfileProc和wfileProce分别是指向AE_READABLE和AE_WRITABLE的处理函数
  • clientData是对应处理函数的参数
typedef struct aeFileEvent {
   
     
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

aeFiredEvent用来管理已经发生的IO事件

  • fd是对于事件的套接字句柄
  • mast是对于发生的事件类型
typedef struct aeFiredEvent {
   
     
    int fd;
    int mask;
} aeFiredEvent;

timeEventHead是注册的时间事件列表

  • 每个时间事件都有一个事件id,timeEventNextId是下一个要注册的时间事件id。
  • when_sec、when_ms是下次事件的发生时间
  • timeProc是时间事件的处理函数
  • finalizerProc是时间事件注册时的处理函数
typedef struct aeTimeEvent {
   
     
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
} aeTimeEvent;

beforesleep和aftersleep是每次事件循坏之前和之后需要执行的函数,apidata封装来具体的IO多路复用的系统调用,linux主要有select、poll、epoll,在Redis代码文件中分别对应ae.select.cc、ae_evport.cc、ae_epoll.cc。在Redis代码文件中还存在ae.kqueue.cc,对应着MacOs下的事件驱动。
下面以epoll为例分析aeEventLoop。

事件驱动初始化

aeCreateEventLoop主要对aeEventLoop结构中的各个字段设置,分配

aeEventLoop *aeCreateEventLoop(int setsize) {
   
     
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
   
     
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

其中aeApiCreate在epoll中初始化如下:
aeEventLoop中的apidata在epoll中表示为aeApiState,id为epoll的根id,events表示接受事件循环epoll_wait返回的触发读写的网络事件。

typedef struct aeApiState {
   
     
    int epfd;
    struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
   
     
    aeApiState *state = zmalloc(sizeof(aeApiState));
	...
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
	...
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    ...
	eventLoop->apidata = state;
    return 0;
}

事件注册

事件注册分为两类:

  • IO事件注册与删除:aeCreateFileEvent,aeDeleteFileEvent
  • 时间事件注册与删除:aeCreateTimeEvent,aeDeleteTimeEvent

IO事件的注册通过aeApiAddEvent函数将套接字及其事件处理函数注册到epoll中,aeEventLoop中的读写标志AE_READABLE、AE_WRITABLE映射为epoll中的EPOLLIN、EPOLLOUT。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
   
     
    if (fd >= eventLoop->setsize) {
   
     
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
   
     
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {
   
     0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

在aeApiAddEvent中,在进行事件注册时,设置:ee.data.fd = fd;大部分的reactor模型的epoll编程都大同小异,最主要的差别就是对epoll_event.data这个union的设置,对于较复杂的网络模型,可以设置data=ptr,这样事件的数据更加丰富,灵活性大大增加。

typedef union epoll_data
{
   
     
  void        *ptr;
  int          fd;
  __uint32_t   u32;
  __uint64_t   u64;
} epoll_data_t;

struct epoll_event {
   
     
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

IO事件的删除或者更改事件通过aeApiDelEvent更改epoll中的fd事件监听类型

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
   
     
    if (fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;

    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
     * is removed. */
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;

    aeApiDelEvent(eventLoop, fd, mask);
    fe->mask = fe->mask & (~mask);
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
   
     
        /* Update the max fd */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
   
     
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {
   
     0}; /* avoid valgrind warning */
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {
   
     
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
   
     
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

时间事件注册是初始化aeTimeEvent并设置该事件下一次发生的时间,并把aeTimeEvent设置到时间事件列表中。删除则直接从时间事件列表中删除即可。

事件循环

事件循环通过aeMain函数实现,在Redis 源码解析 - Redis 启动流程中,启动一个Redis节点的最后一步就是启动aeMain函数阻塞等待事件发生,并处理。函数逻辑很简单,如果有前置回调函数beforesleep,则处理前置回调,否则直接进入到aeProcessEvents中,阻塞等待事件发生。

void aeMain(aeEventLoop *eventLoop) {
   
     
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
   
     
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

下面注重来看下aeProcessEvents,第一个参数是要处理的事件驱动框架,第二个参数是要处理的事件类型,在aeMain中表示处理包括IO事件和时间事件在内的所有事件以及回调后置函数aftersleep。

int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
   
     
	...
	//确定epoll_wait的超时时间
    //1、如果设置AE_DONT_WAIT标志,则需要立即返回,设置超时时间为0
    //2、如果需要处理时间事件,则设置超时时间为距离现在最近的时间事件的时间
	if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
   
     
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
   
     
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
   
     
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
   
     
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
   
     
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
   
     
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
   
     
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
        numevents = aeApiPoll(eventLoop, tvp);
        // 如果设置了后置函数aftersleep,则调用
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);
        //对发生的事件进行处理
        for (j = 0; j < numevents; j++) {
   
     
        	// 获取发生事件的信息
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0;
            int invert = fe->mask & AE_BARRIER;
            // 根据事件类型,调用对应的函数进行处理:
            // AE_READABLE类型调用rfileProc
            // AE_WRITABLE类型调用wfileProc
            if (!invert && fe->mask & mask & AE_READABLE) {
   
     
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
            if (fe->mask & mask & AE_WRITABLE) {
   
     
                if (!fired || fe->wfileProc != fe->rfileProc) {
   
     
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            if (invert && fe->mask & mask & AE_READABLE) {
   
     
                if (!fired || fe->wfileProc != fe->rfileProc) {
   
     
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
		}
	}
	//最后处理时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed;    
}

该函数的主要步骤如下:

1、 如果需要处理时间事件,则通过aeSearchNearestTimer函数寻找距离现在最近的一个时间事件,得到其触发时间aeSearchNearestTimer逻辑很简单,遍历时间事件列表,找到最小值;
2、 根据第一步中找到的最小触发时间,设置阻塞时间(如果没有时间事件,则设置阻塞时间为无限大),然后调用aeApiPoll等待事件发生或者阻塞时间超时;
3、 如果设置后置函数aftersleep,则调用;
4、 处理发生的IO事件,根据发生的事件类型,如果是AE_READABLE类型调用rfileProc,如果是AE_WRITABLE类型调用wfileProc一般来说,先处理AE_READABLE类型事件,该类事件一般为客户端连接或者命令,然后处理AE_WRITABLE类型事件向客户端发送响应对于客户端的回复,一般在beforesleep中就会执行完成),但是如果1、如果客户端回复buf中较多,无法在beforesleep中全部发送,同时aof开启且持久化方式为always,那么需要立即发送回复;2、在集群模式下,调用clusterSendMessage函数发送消息时,也需要将消息立即发送到套接字中;
5、 最后通过processTimeEvents处理时间事件;

事件循坏时,epoll把发生读写的事件注册到aeEventLoop的fired数组中,其中eventLoop->fired[j].fd = e->data.fd;确定了事件的套接字。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
   
     
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
   
     
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
   
     
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

最后是时间事件的处理processTimeEvents

static int processTimeEvents(aeEventLoop *eventLoop) {
   
     
	int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);
    // 如果时钟抖动,则对aeEventLoop中的所有时间事件全部执行
    //(根据作者的说法,时间事件提前执行比延后执行造成的影响小)
    if (now < eventLoop->lastTime) {
   
     
        te = eventLoop->timeEventHead;
        while(te) {
   
     
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
   
     
        long now_sec, now_ms;
        long long id;

        // 清除删除的时间事件
        if (te->id == AE_DELETED_EVENT_ID) {
   
     
            aeTimeEvent *next = te->next;
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
   
     
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            // 通过返回值判断是否是周期行时间事件
            if (retval != AE_NOMORE) {
   
     
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
   
     
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}