Redis can now optionally use threads to handle I/O, allowing to serve 2 times as much operations per second in a single instance when pipelining cannot be used.
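For single-threaded Redis, the main performance bottleneck today is network I/O. There are two main directions for optimization:
- Improve network I/O performance itself, typically by replacing the kernel network stack with something like DPDK
- Use multiple threads to take full advantage of multiple cores, as Memcached does
Optimizing the protocol stack has little to do with Redis itself, so supporting multiple threads is the most effective and most convenient approach. Redis therefore introduced Threaded I/O in version 6.0, with the goal of speeding up the network I/O that happens before and after command execution.
This differs from Memcached, where everything from I/O handling to data access is multithreaded. In Redis, the extra threads only read and write network data and parse the protocol; commands are still executed by a single thread. The design avoids the complexity that full multithreading would bring, such as having to control concurrency for keys, Lua scripts, transactions, LPUSH/LPOP and so on.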
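The event loop processor: AE
Redis uses a home-grown asynchronous event library, described as "A simple event-driven programming library" (AE for short), which is the core of Redis's processing flow. The overall flow is sketched briefly below as background, to make the Threaded I/O implementation easier to follow.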
initServer
- Iterate over the bound addresses (ipfd) and set acceptTcpHandler as the handler for TCP connection events
- Register the beforesleep and aftersleep callbacks
// server.c
int main(int argc, char **argv) {
    ...
    ACLInit(); /* The ACL subsystem must be initialized ASAP because the
                  basic networking code and client creation depends on it. */
    ...
    initServer();
    ...
    aeMain(server.el);
    ...
}

void initServer(void) {
    ...
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets.
     * Internally calls aeCreateFileEvent() */
    if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TCP socket accept handler.");
    }
    ...
    // Register the beforesleep and aftersleep callbacks
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    ...
}
acceptTcpHandler
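- When a client connects to Redis, a TCP connection event fires and the acceptTcpHandler event handler is called
- In acceptCommonHandler, a client object is created and readQueryFromClient is set as the handler for I/O read events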
// networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    while(max--) {
        // Accept the client's request (TCP connection); returns the fd
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        ...
        // Handle the client's request
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    ...
    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        ...
        return;
    }
    ...
}

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        ...
        // Internally calls aeCreateFileEvent()
        connSetReadHandler(conn, readQueryFromClient);
    }
    ...
}
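The registration chain above comes down to storing a function pointer per file descriptor and calling it when that descriptor becomes ready. The following is a minimal sketch of that idea only. Every name in it (toy_loop, toy_create_file_event, toy_fire_readable, toy_count_reads) is hypothetical and is not part of the Redis or AE API.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_FD 16   /* hypothetical table size for this sketch */

/* Hypothetical callback type, loosely modeled on a file-event handler. */
typedef void (*toy_file_proc)(int fd, void *client_data);

typedef struct {
    toy_file_proc read_proc;   /* called when fd becomes readable */
    void *client_data;         /* opaque pointer handed back to the callback */
} toy_file_event;

typedef struct {
    toy_file_event events[TOY_MAX_FD];   /* indexed by file descriptor */
} toy_loop;

/* Register a read callback for fd. Returns 0 on success, -1 on bad input. */
int toy_create_file_event(toy_loop *loop, int fd, toy_file_proc proc, void *data) {
    assert(loop != NULL);
    if (fd < 0 || fd >= TOY_MAX_FD || proc == NULL) return -1;
    loop->events[fd].read_proc = proc;
    loop->events[fd].client_data = data;
    return 0;
}

/* Simulate "fd is readable": run the callback if one is registered.
 * Returns 1 if a callback ran, 0 otherwise. */
int toy_fire_readable(toy_loop *loop, int fd) {
    assert(loop != NULL);
    if (fd < 0 || fd >= TOY_MAX_FD || loop->events[fd].read_proc == NULL) return 0;
    loop->events[fd].read_proc(fd, loop->events[fd].client_data);
    return 1;
}

/* Example handler: counts how many times it was invoked. */
void toy_count_reads(int fd, void *client_data) {
    (void)fd;
    int *counter = client_data;
    (*counter)++;
}
```

With a zero-initialized toy_loop, firing an unregistered descriptor does nothing; after toy_create_file_event(&loop, 5, toy_count_reads, &reads), each toy_fire_readable(&loop, 5) increments reads by one.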
aeMain
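- Call aeProcessEvents in a loop
  - beforesleep -> aeApiPoll -> aftersleep
  - Iterate over the ready file events and run the read and write callbacks bound to them (normally read first, then write)
    - I/O read event: readQueryFromClient
    - I/O write event: sendReplyToClient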
// ae.c
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    ...
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        ...
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires.
         * I/O multiplexing: wait for file events to become ready */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // Iterate over the file events
        for (j = 0; j < numevents; j++) {
            ...
            // Inversion flag
            int invert = fe->mask & AE_BARRIER;

            // If invert is false, read first
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            // Write
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            // If invert is true, read after the write
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc)) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }
    }
    // Process time events
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
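The read/write ordering inside the loop above is easy to misread, so here is a small sketch that isolates just that decision. It assumes the read and write callbacks are different functions, which removes the "same callback" guard from the original. The names toy_dispatch_order, TOY_READABLE, TOY_WRITABLE and TOY_BARRIER are made up for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the readable / writable / barrier mask bits. */
enum { TOY_READABLE = 1, TOY_WRITABLE = 2, TOY_BARRIER = 4 };

/* Fill `order` with the callbacks that would fire for one descriptor
 * ('R' = read callback, 'W' = write callback) and return how many fired.
 * `registered` is the mask the descriptor was registered with and
 * `ready` is what the poll call reported. */
int toy_dispatch_order(int registered, int ready, char order[2]) {
    assert(order != NULL);
    int n = 0;
    int invert = registered & TOY_BARRIER;

    /* Normal case: the read callback runs first. */
    if (!invert && (registered & ready & TOY_READABLE)) order[n++] = 'R';

    /* The write callback runs in both cases. */
    if (registered & ready & TOY_WRITABLE) order[n++] = 'W';

    /* Barrier case: the read callback runs after the write callback. */
    if (invert && (registered & ready & TOY_READABLE)) order[n++] = 'R';

    return n;
}
```

For a descriptor that is both readable and writable, the sketch yields R then W without the barrier bit, and W then R with it.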
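Old logic
- In the client struct, querybuf is the request buffer and buf is the reply buffer (large objects are written to the reply list instead)
- aeProcessEvents() flow
- [beforeSleep] Check server.clients_pending_write (the list of clients left waiting for a reply by the previous iteration) and respond to those clients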
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
    ...
}

int handleClientsWithPendingWrites(void) {
    ...
    // Iterate over clients_pending_write
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        ...
        // First try writeToClient() directly
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        // Data is still left to write (e.g. the buffer was already full)
        if (clientHasPendingReplies(c)) {
            ...
            // Bind sendReplyToClient as the file event handler
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                                  sendReplyToClient, c) == AE_ERR) {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}
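- I/O multiplexing: wait for file events
- Iterate over the file events; when one is readable (or writable), run the corresponding callback.
- [sendReplyToClient] Calls writeToClient directly; the only difference is the handler_installed argument (0 in handleClientsWithPendingWrites above, 1 here)
- [readQueryFromClient] Read the data on the connection into the client.querybuf buffer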
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    nread = read(fd, c->querybuf+qblen, readlen);
    ...
    } else if (c->flags & CLIENT_MASTER) {
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    ...
    // Internally runs processCommand()
    processInputBufferAndReplicate(c);
}
- [processCommand] call() executes the Redis command
int processCommand(client *c) {
    // A long series of checks and processing steps
    ...
    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        ...
    } else {
        call(c,CMD_CALL_FULL);
        ...
    }
    return C_OK;
}
- The command implementation calls addReply(), which puts the result into client.buf and adds the client to clients_pending_write
void addReply(client *c, robj *obj) {
    /* Internally calls clientInstallWriteHandler(), which adds the client
     * to clients_pending_write */
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        // Small objects go into buf
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            // Large objects go into the reply list
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        ...
    } else {
        ...
    }
}
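The "try the fixed buffer first, fall back to the list" pattern in addReply() can be shown in isolation. This is a sketch under stated assumptions rather than the Redis implementation: the 16-byte buffer size, the singly linked list, and all toy_ names are invented here. It also assumes that once the list holds data, later replies must go to the list as well so that output order is preserved.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define TOY_BUF_SIZE 16   /* deliberately tiny, hypothetical value */

typedef struct toy_reply_node {
    char *data;
    size_t len;
    struct toy_reply_node *next;
} toy_reply_node;

typedef struct {
    char buf[TOY_BUF_SIZE];            /* fixed reply buffer */
    size_t bufpos;                     /* bytes used in buf */
    toy_reply_node *head, *tail;       /* overflow list */
    size_t reply_count;                /* nodes in the overflow list */
} toy_reply_client;

/* Try the fixed buffer. Returns 0 on success, -1 if the list must be used. */
static int toy_add_to_buffer(toy_reply_client *c, const char *s, size_t len) {
    if (c->reply_count > 0) return -1;               /* keep output ordered */
    if (len > TOY_BUF_SIZE - c->bufpos) return -1;   /* does not fit */
    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return 0;
}

/* Append a copy of s to the overflow list. Returns 0, or -1 on allocation failure. */
static int toy_add_to_list(toy_reply_client *c, const char *s, size_t len) {
    toy_reply_node *n = malloc(sizeof(*n));
    if (n == NULL) return -1;
    n->data = malloc(len ? len : 1);
    if (n->data == NULL) { free(n); return -1; }
    memcpy(n->data, s, len);
    n->len = len;
    n->next = NULL;
    if (c->tail) c->tail->next = n; else c->head = n;
    c->tail = n;
    c->reply_count++;
    return 0;
}

/* Small replies land in buf, everything else in the list. */
int toy_add_reply(toy_reply_client *c, const char *s, size_t len) {
    assert(c != NULL && s != NULL);
    if (toy_add_to_buffer(c, s, len) == 0) return 0;
    return toy_add_to_list(c, s, len);
}

/* Release the overflow list. */
void toy_free_replies(toy_reply_client *c) {
    toy_reply_node *n = c->head;
    while (n) {
        toy_reply_node *next = n->next;
        free(n->data);
        free(n);
        n = next;
    }
    c->head = c->tail = NULL;
    c->reply_count = 0;
}
```

Starting from a zero-initialized client, a 5-byte reply stays in buf, a following 16-byte reply no longer fits and goes to the list, and any reply after that also goes to the list even if it would fit in buf.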
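New logic
- io_threads_list: an array whose length is the number of threads; each element is itself a list holding the client objects that thread has to handle
- At any instant, Redis's Threaded I/O is either in the read state or in the write state; it is never the case that some threads read while others write
- aeProcessEvents() flow
- beforeSleep
- [handleClientsWithPendingReadsUsingThreads] Check server.clients_pending_read (the list of clients left waiting to be read by the previous iteration) and read the data from those clients
- [handleClientsWithPendingWritesUsingThreads] Check server.clients_pending_write (the list of clients left waiting for a reply by the previous iteration) and reply to those clients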
- beforeSleep
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();
    ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    ...
}
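- I/O multiplexing: wait for file events
- [Deferred read] Iterate over the file events; when one is readable or writable, run the corresponding callback
- [readQueryFromClient, deferred read] While iterating over the file events, add each client whose read can be handed to a thread to the server.clients_pending_read list
- [sendReplyToClient] Calls writeToClient directly; the only difference is the handler_installed argument (sendReplyToClient passes 1)
Source code analysis
With the overall picture above in mind, the following sections walk through the source code related to Threaded I/O.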
readQueryFromClient
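- When a command is sent to Redis, an I/O event fires and the readQueryFromClient handler is called
- [Threaded I/O enabled and conditions met] Run postponeClientRead, which flags the client as CLIENT_PENDING_READ and adds it to the head of the server.clients_pending_read list
- [Conditions not met] Read the data into the client's querybuf and process it. This happens in two cases:
  - Threaded I/O is not enabled
  - An I/O thread is handling the client, which is already flagged CLIENT_PENDING_READ
- [Threaded I/O not enabled] Execute the command, write the result into the client's buf, and add the client to clients_pending_write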
// networking.c
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    ...
    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
    ...
    // Read the request data
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    ...
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    ...
    processInputBuffer(c);
}

/* Return 1 if we want to handle the client read later using threaded I/O.
 * This is called by the readable handler of the event loop.
 * As a side effect of calling this function the client is put in the
 * pending read clients and flagged as such. */
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        // Is threaded I/O enabled for reads
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        // The client is in none of these four states
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

void processInputBuffer(client *c) {
    while(c->qb_pos < sdslen(c->querybuf)) {
        // Loop over the data in querybuf and parse the commands
        ...
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command.
             * In the threaded I/O case only the flag is set; the command
             * is not executed here */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* Internally calls processCommand() to execute the command.
             * When the command finishes it calls addReply(), which inserts
             * the client into clients_pending_write and writes to buf */
            if (processCommandAndResetClient(c) == C_ERR) {
                return;
            }
        }
    }
    ...
}
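The key trick in postponeClientRead() is that the same read handler is entered twice: the first call only queues the client and sets a flag, and the second call (made later, with the flag already set) falls through to the real read. A reduced sketch of that decision follows. The types toy_io_server and toy_io_client, the flag values, and the counter standing in for the clients_pending_read list are all assumptions made for this example.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical flag bits standing in for the CLIENT_* flags. */
enum {
    TOY_MASTER       = 1 << 0,
    TOY_SLAVE        = 1 << 1,
    TOY_PENDING_READ = 1 << 2,
    TOY_BLOCKED      = 1 << 3
};

typedef struct { int flags; } toy_io_client;

typedef struct {
    int io_threads_active;     /* threads are currently running */
    int io_threads_do_reads;   /* reads are delegated to threads */
    int pending_reads;         /* stand-in for the pending-read list length */
} toy_io_server;

/* Returns 1 if the read was postponed (client queued and flagged),
 * 0 if the caller should read from the socket right now. */
int toy_postpone_read(toy_io_server *s, toy_io_client *c) {
    assert(s != NULL && c != NULL);
    if (s->io_threads_active && s->io_threads_do_reads &&
        !(c->flags & (TOY_MASTER | TOY_SLAVE | TOY_PENDING_READ | TOY_BLOCKED))) {
        c->flags |= TOY_PENDING_READ;
        s->pending_reads++;
        return 1;
    }
    return 0;
}
```

Calling toy_postpone_read twice on the same client returns 1 and then 0: the second call sees TOY_PENDING_READ and lets the caller proceed with the actual read, which is the path taken when a thread re-enters the handler.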
handleClientsWithPendingReads(Writes)UsingThreads
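- If threaded I/O is not enabled, return immediately
- Distribute the clients in clients_pending_read among the threads round-robin (by modulo), storing each client in the target thread's list, io_threads_list[target_id]
- The main thread handles the clients assigned to itself (io_threads_list[0]) and then waits for all I/O threads to finish their work
- The main thread and the I/O threads call readQueryFromClient again, but because the flags are now different a different path is taken
- That path reads and parses the client's data but does not execute commands
- Iterate over the clients in clients_pending_read and execute their commands serially, writing each result into buf and adding the client to clients_pending_write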
int handleClientsWithPendingReadsUsingThreads(void) {
    ...
    // Distribute the clients in clients_pending_read across io_threads_list
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    // Round-robin by modulo
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    ...
    // The main thread handles the clients assigned to itself
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // Spin until all I/O threads have finished their work
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    // Walk clients_pending_read and process each client's input
    while(listLength(server.clients_pending_read)) {
        ...
        processInputBuffer(c);
        ...
    }
}
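The distribution step is plain modulo round-robin, with slot 0 belonging to the main thread. A small sketch using arrays instead of Redis lists makes the resulting split easy to check; toy_round_robin and its parameters are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>

/* Assign item i to thread (i % nthreads).
 * owner[i]  receives the thread index chosen for item i.
 * counts[t] receives how many items thread t ended up with.
 * Thread 0 plays the role of the main thread. */
void toy_round_robin(size_t nitems, int nthreads, int *owner, size_t *counts) {
    assert(nthreads > 0 && owner != NULL && counts != NULL);
    for (int t = 0; t < nthreads; t++) counts[t] = 0;
    for (size_t i = 0; i < nitems; i++) {
        int target = (int)(i % (size_t)nthreads);
        owner[i] = target;
        counts[target]++;
    }
}
```

With 10 pending clients and 4 threads, threads 0 and 1 each receive 3 clients and threads 2 and 3 each receive 2, so the main thread always takes a share of the I/O work itself.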
- handleClientsWithPendingWritesUsingThreads
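- If threaded I/O is not enabled, run the original handleClientsWithPendingWrites() function
- The overall logic is much the same as for reads
- Iterate over the clients in clients_pending_write; if a client still has unwritten data (more than the buffer could take; normally the I/O threads finish the write), bind the write-ready callback sendReplyToClient(), which is called later when the file events are iterated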
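The last step above, installing a write-ready callback only for clients that still have data left after the direct write, can be sketched as follows. The socket is simulated by a fixed capacity per write call; toy_wclient, toy_write and toy_flush_pending are hypothetical names and the byte counts are arbitrary.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
    size_t pending;          /* bytes still queued for this client */
    int handler_installed;   /* 1 once a write-ready callback is registered */
} toy_wclient;

/* Pretend to write: the simulated socket accepts at most sock_capacity bytes.
 * Returns the number of bytes written. */
static size_t toy_write(toy_wclient *c, size_t sock_capacity) {
    size_t n = c->pending < sock_capacity ? c->pending : sock_capacity;
    c->pending -= n;
    return n;
}

/* Write to every client once, then register a callback only where data
 * remains. Returns the number of callbacks newly installed. */
size_t toy_flush_pending(toy_wclient *clients, size_t nclients, size_t sock_capacity) {
    assert(clients != NULL || nclients == 0);
    size_t installed = 0;
    for (size_t i = 0; i < nclients; i++) {
        toy_write(&clients[i], sock_capacity);
        if (clients[i].pending > 0 && !clients[i].handler_installed) {
            clients[i].handler_installed = 1;
            installed++;
        }
    }
    return installed;
}
```

A client with 100 queued bytes is fully served by the direct write and never gets a callback; a client with 5000 queued bytes and a 4096-byte capacity keeps 904 bytes and is the only one handed to the event loop.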
IOThreadMain
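- Call getIOPendingCount() to find out how many clients are pending in this thread's io_threads_list
- Iterate over its io_threads_list; in the write state call writeToClient(), in the read state call readQueryFromClient()
- Read: store the request data in the client's querybuf
- Write: send the response data back to the client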
void *IOThreadMain(void *myid) {
    ...
    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }
        ...
        // Iterate over this thread's io_threads_list
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0);
    }
}
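The handshake between the main thread and an I/O thread is a pending counter: the main thread publishes work by setting it to a non-zero value, the worker spins until it sees that value, does the work, and resets the counter to 0, which is what the main thread in turn spins on. Below is a minimal sketch of that handshake with one worker, using POSIX threads and C11 atomics. It keeps only the spin-wait and nothing else from the excerpt; summing integers stands in for the per-client I/O, and toy_worker, toy_worker_main and toy_run_batch are hypothetical names. Some toolchains need -pthread to build it.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

#define TOY_ITEMS 8   /* hypothetical batch capacity */

typedef struct {
    atomic_int pending;    /* > 0: work published, 0: worker idle or done */
    atomic_int stop;       /* set to 1 to let the worker exit */
    int items[TOY_ITEMS];  /* the "clients" to process */
    int nitems;
    long sum;              /* result written by the worker */
} toy_worker;

static void *toy_worker_main(void *arg) {
    toy_worker *w = arg;
    for (;;) {
        /* Spin until work is published or a stop is requested. */
        while (atomic_load(&w->pending) == 0) {
            if (atomic_load(&w->stop)) return NULL;
        }
        long s = 0;
        for (int i = 0; i < w->nitems; i++) s += w->items[i];
        w->sum = s;
        /* Signal completion; the main thread waits for this store. */
        atomic_store(&w->pending, 0);
    }
}

/* Hand one batch to a worker thread and busy-wait for the result.
 * Returns the sum of the items, or -1 if the thread could not be created. */
long toy_run_batch(const int *items, int n) {
    assert(n >= 0 && n <= TOY_ITEMS);
    toy_worker w;
    pthread_t tid;
    atomic_init(&w.pending, 0);
    atomic_init(&w.stop, 0);
    w.nitems = 0;
    w.sum = 0;
    if (pthread_create(&tid, NULL, toy_worker_main, &w) != 0) return -1;

    /* Fill the batch first, then publish it through the counter. */
    for (int i = 0; i < n; i++) w.items[i] = items[i];
    w.nitems = n;
    atomic_store(&w.pending, n);

    /* Wait until the worker resets the counter. */
    while (atomic_load(&w.pending) != 0) { /* busy wait */ }
    long result = w.sum;

    atomic_store(&w.stop, 1);
    pthread_join(tid, NULL);
    return result;
}
```

The batch data is written before the counter is stored and read only after the counter is loaded, so the sequentially consistent atomic operations are what make the plain fields safe to share here. Pure spinning burns CPU while idle, which is acceptable for a demonstration but is a trade-off worth keeping in mind.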
References
- Source code: Redis 5.0.13 and Redis 6.2.5
- https://www.cyhone.com/articles/analysis-of-redis-ae/ An analysis of the Redis event loop (AE) implementation
- https://www.jianshu.com/p/6188becd2cea An analysis of how Redis processes commands
- https://jiekun.dev/posts/redis-tio-implementation/ How the new Redis 6.0 feature works: Threaded I/O
- https://keys961.github.io/2020/04/16/源码阅读-Redis-6.0-多线程IO/ Source code reading, Redis 6.0: multithreaded I/O