Redis 6.0 新特性之 Threaded I/O

/ 0评 / 0

Redis can now optionally use threads to handle I/O, allowing to serve 2 times as much operations per second in a single instance when pipelining cannot be used.

目前对于单线程 Redis 来说,性能瓶颈主要在于网络的 IO 消耗,优化主要有两个方向:

  1. 提高网络 IO 性能,典型的实现像使用 DPDK 来替代内核网络栈的方式
  2. 使用多线程充分利用多核,典型的实现像 Memcached

协议栈优化的这种方式跟 Redis 本身关系不大,相比之下,支持多线程是一种最有效、最便捷的优化方式。所以 Redis 从 6.0 版本开始引入了 Threaded I/O,目的是提升执行命令前后的网络 I/O 性能。

但这跟 Memcached 那种从 IO 处理到数据访问都是多线程的实现模式有所差异:Redis 的多线程部分只用来处理网络数据的读写和协议解析,执行命令仍然是单线程。之所以这么设计,是不想让实现因为多线程而变得复杂——否则需要去控制 key、lua、事务、LPUSH/LPOP 等等的并发问题。

事件循环处理器 —— AE

Redis 使用了一个称为“A simple event-driven programming library”的自制异步事件库(简称“AE”),是 Redis 处理流程的核心。下面会对大体流程做一个简略的介绍,类似于背景,方便理解后面的 Threaded I/O 实现。

initServer

// server.c
/* Entry point (excerpt): initialize subsystems, then hand control to the
 * AE event loop, which runs until shutdown. */
int main(int argc, char **argv) {
    ...
    ACLInit(); /* The ACL subsystem must be initialized ASAP because the
                  basic networking code and client creation depends on it. */
    ...
    initServer();
    ...
    /* Blocks here: aeMain() loops over aeProcessEvents() until stop. */
    aeMain(server.el);
    ...
}

/* Server initialization (excerpt): registers the accept handler for the
 * listening sockets and the before/after-sleep event-loop callbacks. */
void initServer(void) {
    ...
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. 
     * Internally calls aeCreateFileEvent() to register acceptTcpHandler
     * as the readable handler of the listening fd(s). */
    if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TCP socket accept handler.");
    }
    
    ...
    
    /* Register the beforeSleep / afterSleep callbacks, invoked around each
     * aeApiPoll() call inside the event loop. beforeSleep is where the
     * threaded-I/O read/write batches are driven (see below). */
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    ...
}

acceptTcpHandler

// networking.c
/* Readable-event handler of the listening socket: accept up to `max`
 * pending TCP connections and hand each one to acceptCommonHandler(). */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    while(max--) {
        // Accept one client TCP connection; returns the connected socket fd.
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        ...
        // Wrap the fd in a connection object and create the client for it.
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

/* Common path for every accepted connection (TCP, Unix socket, TLS):
 * allocates the client structure bound to this connection. */
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    ...
    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        ...
        return;
    }
    ...
}

/* Allocate a client (excerpt). For a real network connection, install
 * readQueryFromClient as the connection's read handler. */
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        ...
        /* Internally calls aeCreateFileEvent(): from now on, readable
         * events on this connection invoke readQueryFromClient(). */
        connSetReadHandler(conn, readQueryFromClient);
    }
    ...
}

aeMain

// ae.c
/* Main loop of the AE event library: repeatedly process file and time
 * events (including the before/after-sleep callbacks) until stop is set. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

/* One iteration of the event loop (excerpt): run beforesleep, poll for
 * ready file events, run aftersleep, dispatch the fired handlers, then
 * process time events. Returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ...
    if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        ...
        
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. 
         * I/O multiplexing: wait for file events to become ready. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // Iterate over the fired file events.
        for (j = 0; j < numevents; j++) {
            ...
            /* AE_BARRIER inverts the usual "read first, then write" order. */
            int invert = fe->mask & AE_BARRIER;
            // Normal case (invert is false): fire the readable handler first.
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }
            // Fire the writable handler (unless it is the same proc that
            // already fired as the read handler).
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            // With AE_BARRIER (invert is true): fire the readable handler
            // after the write instead of before it.
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            
            processed++;
        }
    }
    // Process time events (timers).
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

旧逻辑

/* Pre-poll callback (pre-6.0 excerpt): flush every client that has
 * pending output before the event loop goes back to sleep. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
    ...
}

/* Single-threaded write path (pre-6.0 excerpt): try a direct write for
 * each client in clients_pending_write; if the socket buffer fills up,
 * fall back to an AE writable event to finish later. */
int handleClientsWithPendingWrites(void) {
    ...
    
    // Walk the clients_pending_write list.
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        ...
        
        // First attempt a synchronous writeToClient().
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        // Data still pending (e.g. the kernel socket buffer was full):
        if (clientHasPendingReplies(c)) {
            ...
            // Register sendReplyToClient as the writable-event handler so
            // the remainder is flushed by the event loop.
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR) {
                    freeClientAsync(c);
            }
        }
    }
    return processed;
}
/* Readable-event handler (pre-6.0 excerpt): read the query into the
 * client buffer, then parse and execute it inline on the main thread. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    nread = read(fd, c->querybuf+qblen, readlen);
    ...
    } else if (c->flags & CLIENT_MASTER) {
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    ...
    // Internally ends up calling processCommand().
    processInputBufferAndReplicate(c);
}
/* Execute one parsed command (excerpt). Commands inside a MULTI are
 * queued instead of executed, except for the transaction-control
 * commands themselves (EXEC/DISCARD/MULTI/WATCH). */
int processCommand(client *c) {
    // A long series of preliminary checks and handling (elided).
    ...
    
    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        ...
    } else {
        // Normal path: actually run the command implementation.
        call(c,CMD_CALL_FULL);
        ...
    }
    return C_OK;
}
/* Queue a reply object for a client (excerpt). Small encodable replies go
 * into the fixed client buffer; larger ones into the reply list. */
void addReply(client *c, robj *obj) {
    /* Internally calls clientInstallWriteHandler(), which appends the
     * client to server.clients_pending_write. */
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        // Small payload: append to the static output buffer `buf`.
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            // Buffer full / payload too big: append to the `reply` list.
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        ...
    } else {
        ...
    }
}

新逻辑

/* Pre-poll callback (6.0+ excerpt): drain the postponed reads first, then
 * flush pending writes — both fanned out across the I/O threads. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();
    ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    ...
}

源码分析

有了上面的整体印象,接下来会对 Threaded I/O 相关的源码进行分析

readQueryFromClient

// networking.c
/* Readable-event handler (6.0+ excerpt). With threaded I/O enabled, the
 * first invocation only enqueues the client for later reading by the
 * I/O threads (postponeClientRead); otherwise it reads and parses now. */
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    ...
    
    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
    ...
    
    // Read the request data from the connection into querybuf.
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    ...
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    ...
    
    processInputBuffer(c);
}

/* Return 1 if we want to handle the client read later using threaded I/O.
 * This is called by the readable handler of the event loop.
 * As a side effect of calling this function the client is put in the
 * pending read clients and flagged as such. */
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads && // threaded reads enabled in config
        !ProcessingEventsWhileBlocked &&
        // ... and the client is in none of these four special states:
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) 
    {
        // Mark as postponed and enqueue for the I/O-thread read batch.
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

/* Parse commands out of the client's query buffer (excerpt). When invoked
 * from an I/O thread, only parse — command execution is deferred to the
 * main thread via the CLIENT_PENDING_COMMAND flag. */
void processInputBuffer(client *c) {
    while(c->qb_pos < sdslen(c->querybuf)) {
        // Loop over querybuf contents, parsing one command per iteration.
        ...
        
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. 
             * I.e. on the threaded-read path we only set the flag; the
             * command itself is NOT executed here. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* Internally calls processCommand() to run the command; when it
             * finishes, addReply() inserts the client into
             * clients_pending_write and fills the output buffer. */
            if (processCommandAndResetClient(c) == C_ERR) {
                return;
            }
        }
    }
    ...
}

handleClientsWithPendingReads(Writes)UsingThreads

/* Threaded read path (excerpt): fan the postponed clients out to the I/O
 * threads round-robin, read in parallel (the main thread takes slot 0),
 * spin-wait for completion, then execute all parsed commands serially on
 * the main thread. */
int handleClientsWithPendingReadsUsingThreads(void) {
    ...
    
    // Distribute clients_pending_read across the io_threads_list slots.
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    // Round-robin assignment via modulo.
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    ...
    
    // The main thread handles its own share (slot 0) in parallel with the
    // I/O threads.
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);
    
    // Busy-wait until every I/O thread has drained its list.
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }
    
    // All reads done: now execute the parsed commands, single-threaded.
    while(listLength(server.clients_pending_read)) {
        ...
        processInputBuffer(c);
        ...
    }
}

IOThreadMain

/* Body of each I/O thread (excerpt): spin until work is assigned, then
 * perform either the read or the write side for every client in this
 * thread's list, depending on the global io_threads_op mode. */
void *IOThreadMain(void *myid) {
    ...
    while(1) {
        /* Wait for start */
        // Bounded busy-spin waiting for the main thread to assign work.
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }

        ...

        // Process every client assigned to this thread's io_threads_list.
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        // Signal completion back to the main thread.
        setIOPendingCount(id, 0);
    }
}

参考文档

发表评论

邮箱地址不会被公开。 必填项已用*标注