Redis 6.0 新特性之 ACL

/ 0评 / 0

ACL support, you can define users that can run only certain commands and/or can only access only certain keys patterns.

目前的 Redis(5及以下版本),没有用户权限管理这个概念,只有一个 AUTH 密码验证功能,基本上能够接入的用户就是 root 用户。这也就意味着所有的客户端相当于是使用同一个账户来操作 redis 的。

ACL 就是为了避免接入的用户进行危险命令的操作开发的功能,这类命令如 FLUSHALL、DEBUG 等。在以往这一功能通过命令重命名(RENAME)来完成。

ACL 提供三个层面的权限控制

操作命令

1. 获取 ACL 列表:

> ACL LIST
1) "user alice off -@all"
2) "user default on nopass ~* +@all"
参数详细说明
user用户
default表示默认用户名,或则自己定义的用户名
on表示是否启用该用户,默认为off(禁用)
+@表示用户的权限,“+”表示授权权限,有权限操作或访问,“-”表示还是没有权限; @为权限分类,可以通过 ACL CAT 查询支持的分类。+@all 表示所有权限,nocommands 表示不给与任何命令的操作权限
~*表示可以访问的Key(正则匹配)
表示用户密码,nopass表示不需要密码

2. 修改 ACL 信息:

> ACL SETUSER alice on >p1pp0 ~cached:* +get
OK

> AUTH alice p1pp0
OK
> GET foo
(error) NOPERM this user has no permissions to access one of the keys used as arguments
> GET cached:1234
(nil)
> SET cached:1234 zap
(error) NOPERM this user has no permissions to run the 'set' command or its subcommand

用户结构体

下面是 Redis 源码中针对用户结构体的定义,权限相关则主要关注 allowed_commands 和 allowed_subcommands

// server.h

typedef struct {
    sds name;       /* 用户名 */
    uint64_t flags; /* 特殊状态 */

    uint64_t allowed_commands[USER_COMMAND_BITS_COUNT/64]; /* 位图 */
    sds **allowed_subcommands; /* 可执行子命令 */

    list *passwords; /* 密码 */
    list *patterns;  /* 允许的 key pattern */
    list *channels;  /* 允许的 Pub/Sub channel patterns */
} user;

1. allowed_commands

可用命令位图,index对应各个命令的ID(旧版本通过名字查找,id 为新增特性)。如果为1,命令及所有子命令都可用;如果为0,会去检查 allowed_subcommands,判断子命令是否可用

unsigned long long 最少是64位,除64表明数组中的每一个元素其实是一个大小为sizeof(uint64_t)的小位图

2. allowed_subcommands

二维数组,allowed_subcommands[i][j],i 为 command_id,allowed_subcommands[i] 为这一 command 下可执行的子命令名称数组

位图使用

位图会用于权限添加及权限判断,主要通过位运算来实现

// acl.c

// 判断用户是否有某一(id)命令的权限
// 与操作:取出标志位
int ACLGetUserCommandBit(user *u, unsigned long id) {
    // word: command 在 allowed_commands 数组中的下标
    // bit: 对1进行移位后的操作,定位command位置
    uint64_t word, bit;
    if (ACLGetCommandBitCoordinates(id,&word,&bit) == C_ERR) return 0;
    // 与操作,判断权限
    return (u->allowed_commands[word] & bit) != 0;
}

// 给用户添加某一(id)命令的权限
// 或操作:添加标志位
void ACLSetUserCommandBit(user *u, unsigned long id, int value) {
    uint64_t word, bit;
    if (ACLGetCommandBitCoordinates(id,&word,&bit) == C_ERR) return;
    if (value) {
        u->allowed_commands[word] |= bit;
    } else {
        u->allowed_commands[word] &= ~bit;
        u->flags &= ~USER_FLAG_ALLCOMMANDS;
    }
}

// 作用:找到command_id在位图中的位置,并生成位掩码mask bit
int ACLGetCommandBitCoordinates(uint64_t id, uint64_t *word, uint64_t *bit) {
    if (id >= USER_COMMAND_BITS_COUNT) return C_ERR;
    *word = id / sizeof(uint64_t) / 8;
    *bit = 1ULL << (id % (sizeof(uint64_t) * 8));
    return C_OK;
}

权限判断

在命令真正执行前,调用 CLCheckAllPerm() 函数,检查所有权限,包括命令权限和发布/订阅的频道权限。如果权限判定不通过,记录日志并返回给客户端对应信息。

// server.c

int processCommand(client *c) {
    ....

    /* Check if the user can run this command according to the current
     * ACLs. */
    int acl_errpos;
    int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
    if (acl_retval != ACL_OK) {
        // 记录日志
        addACLLogEntry(c,acl_retval,acl_errpos,NULL);
        // 根据acl判定的类型,返回给客户端消息
        switch (acl_retval) {
        case ACL_DENIED_CMD:  // 命令无权限
            ....
        case ACL_DENIED_KEY:  // key 无权限
            ....
        case ACL_DENIED_CHANNEL:  // 通道无权限
            ....
        default:
            rejectCommandFormat(c, "no permission");
            break;
        }
        return C_OK;
    }

    ....
}

/* Check whether the command is ready to be exceuted by ACLCheckCommandPerm.
 * If check passes, then check whether pub/sub channels of the command is
 * ready to be executed by ACLCheckPubsubPerm */
int ACLCheckAllPerm(client *c, int *idxptr) {
    int acl_retval = ACLCheckCommandPerm(c,idxptr);
    if (acl_retval != ACL_OK)
        return acl_retval;
    if (c->cmd->proc == publishCommand)
        acl_retval = ACLCheckPubsubPerm(c,1,1,0,idxptr);
    else if (c->cmd->proc == subscribeCommand)
        acl_retval = ACLCheckPubsubPerm(c,1,c->argc-1,0,idxptr);
    else if (c->cmd->proc == psubscribeCommand)
        acl_retval = ACLCheckPubsubPerm(c,1,c->argc-1,1,idxptr);
    return acl_retval;
}

命令权限(CommandPerm)

// acl.c

int ACLCheckCommandPerm(client *c, int *keyidxptr) {
    user *u = c->user;  // 获取用户
    uint64_t id = c->cmd->id;  // 获取到命令 id

    /* If there is no associated user, the connection can run anything. */
    if (u == NULL) return ACL_OK;

    // 位运算:判断是否用户有全部权限 or 此命令不需要权限
    if (!(u->flags & USER_FLAG_ALLCOMMANDS) && !(c->cmd->flags & CMD_NO_AUTH))
    {
        // 如果标志位为0,会再去判断子命令的权限
        if (ACLGetUserCommandBit(u,id) == 0) {
            /* Check if the subcommand matches. */
            if (c->argc < 2 ||
                u->allowed_subcommands == NULL ||
                u->allowed_subcommands[id] == NULL)
            {
                return ACL_DENIED_CMD;
            }

            // 遍历命令下的子命令
            long subid = 0;
            while (1) {
                if (u->allowed_subcommands[id][subid] == NULL)
                    return ACL_DENIED_CMD;
                if (!strcasecmp(c->argv[1]->ptr,
                                u->allowed_subcommands[id][subid]))
                    break; /* Subcommand match found. Stop here. */
                subid++;
            }
        }
    }

    // key 权限判断
    if (!(c->user->flags & USER_FLAG_ALLKEYS) &&
        (c->cmd->getkeys_proc || c->cmd->firstkey))
    {
        getKeysResult result = GETKEYS_RESULT_INIT;
        // 获取命令中的 key
        int numkeys = getKeysFromCommand(c->cmd,c->argv,c->argc,&result);
        int *keyidx = result.keys;
        // 遍历 key
        for (int j = 0; j < numkeys; j++) {
            listIter li;
            listNode *ln;
            // 将 li 置于 patterns 链表的头部
            listRewind(u->patterns,&li);

            // 遍历 patterns 链表
            int match = 0;
            while((ln = listNext(&li))) {
                sds pattern = listNodeValue(ln);
                size_t plen = sdslen(pattern);
                int idx = keyidx[j];
                // pattern 匹配
                if (stringmatchlen(pattern,plen,c->argv[idx]->ptr,
                                   sdslen(c->argv[idx]->ptr),0))
                {
                    match = 1;
                    break;
                }
            }
            // 匹配失败
            if (!match) {
                if (keyidxptr) *keyidxptr = keyidx[j];
                getKeysFreeResult(&result);
                return ACL_DENIED_KEY;
            }
        }
        getKeysFreeResult(&result);
    }

    /* If we survived all the above checks, the user can execute the
     * command. */
    return ACL_OK;
}

发表评论

邮箱地址不会被公开。 必填项已用*标注