ACL support, you can define users that can run only certain commands and/or can only access only certain keys patterns.
目前的 Redis(5及以下版本),没有用户权限管理这个概念,只有一个 AUTH 密码验证功能,基本上能够接入的用户就是 root 用户。这也就意味着所有的客户端相当于是使用同一个账户来操作 redis 的。
ACL 就是为了避免接入的用户进行危险命令的操作开发的功能,这类命令如 FLUSHALL、DEBUG 等。在以往这一功能通过命令重命名(RENAME)来完成。
ACL 提供三个层面的权限控制
- 接入权限:用户名和密码
- 可以执行的命令
- 可以操作的 key
操作命令
1. 获取 ACL 列表:
> ACL LIST 1) "user alice off -@all" 2) "user default on nopass ~* +@all"
参数 | 详细说明 |
user | 用户 |
default | 表示默认用户名,或则自己定义的用户名 |
on | 表示是否启用该用户,默认为off(禁用) |
+@ | 表示用户的权限,“+”表示授权权限,有权限操作或访问,“-”表示还是没有权限; @为权限分类,可以通过 ACL CAT 查询支持的分类。+@all 表示所有权限,nocommands 表示不给与任何命令的操作权限 |
~* | 表示可以访问的Key(正则匹配) |
… | 表示用户密码,nopass表示不需要密码 |
2. 修改 ACL 信息:
> ACL SETUSER alice on >p1pp0 ~cached:* +get OK > AUTH alice p1pp0 OK > GET foo (error) NOPERM this user has no permissions to access one of the keys used as arguments > GET cached:1234 (nil) > SET cached:1234 zap (error) NOPERM this user has no permissions to run the 'set' command or its subcommand
用户结构体
下面是 Redis 源码中针对用户结构体的定义,权限相关则主要关注 allowed_commands 和 allowed_subcommands
// server.h typedef struct { sds name; /* 用户名 */ uint64_t flags; /* 特殊状态 */ uint64_t allowed_commands[USER_COMMAND_BITS_COUNT/64]; /* 位图 */ sds **allowed_subcommands; /* 可执行子命令 */ list *passwords; /* 密码 */ list *patterns; /* 允许的 key pattern */ list *channels; /* 允许的 Pub/Sub channel patterns */ } user;
1. allowed_commands
可用命令位图,index对应各个命令的ID(旧版本通过名字查找,id 为新增特性)。如果为1,命令及所有子命令都可用;如果为0,会去检查 allowed_subcommands,判断子命令是否可用
unsigned long long 最少是64位,除64表明数组中的每一个元素其实是一个大小为sizeof(uint64_t)的小位图
2. allowed_subcommands
二维数组,allowed_subcommands[i][j],i 为 command_id,allowed_subcommands[i] 为这一 command 下可执行的子命令名称数组
位图使用
位图会用于权限添加及权限判断,主要通过位运算来实现
// acl.c // 判断用户是否有某一(id)命令的权限 // 与操作:取出标志位 int ACLGetUserCommandBit(user *u, unsigned long id) { // word: command 在 allowed_commands 数组中的下标 // bit: 对1进行移位后的操作,定位command位置 uint64_t word, bit; if (ACLGetCommandBitCoordinates(id,&word,&bit) == C_ERR) return 0; // 与操作,判断权限 return (u->allowed_commands[word] & bit) != 0; } // 给用户添加某一(id)命令的权限 // 或操作:添加标志位 void ACLSetUserCommandBit(user *u, unsigned long id, int value) { uint64_t word, bit; if (ACLGetCommandBitCoordinates(id,&word,&bit) == C_ERR) return; if (value) { u->allowed_commands[word] |= bit; } else { u->allowed_commands[word] &= ~bit; u->flags &= ~USER_FLAG_ALLCOMMANDS; } } // 作用:找到command_id在位图中的位置,并生成位掩码mask bit int ACLGetCommandBitCoordinates(uint64_t id, uint64_t *word, uint64_t *bit) { if (id >= USER_COMMAND_BITS_COUNT) return C_ERR; *word = id / sizeof(uint64_t) / 8; *bit = 1ULL << (id % (sizeof(uint64_t) * 8)); return C_OK; }
权限判断
在命令真正执行前,调用 CLCheckAllPerm() 函数,检查所有权限,包括命令权限和发布/订阅的频道权限。如果权限判定不通过,记录日志并返回给客户端对应信息。
// server.c int processCommand(client *c) { .... /* Check if the user can run this command according to the current * ACLs. */ int acl_errpos; int acl_retval = ACLCheckAllPerm(c,&acl_errpos); if (acl_retval != ACL_OK) { // 记录日志 addACLLogEntry(c,acl_retval,acl_errpos,NULL); // 根据acl判定的类型,返回给客户端消息 switch (acl_retval) { case ACL_DENIED_CMD: // 命令无权限 .... case ACL_DENIED_KEY: // key 无权限 .... case ACL_DENIED_CHANNEL: // 通道无权限 .... default: rejectCommandFormat(c, "no permission"); break; } return C_OK; } .... } /* Check whether the command is ready to be exceuted by ACLCheckCommandPerm. * If check passes, then check whether pub/sub channels of the command is * ready to be executed by ACLCheckPubsubPerm */ int ACLCheckAllPerm(client *c, int *idxptr) { int acl_retval = ACLCheckCommandPerm(c,idxptr); if (acl_retval != ACL_OK) return acl_retval; if (c->cmd->proc == publishCommand) acl_retval = ACLCheckPubsubPerm(c,1,1,0,idxptr); else if (c->cmd->proc == subscribeCommand) acl_retval = ACLCheckPubsubPerm(c,1,c->argc-1,0,idxptr); else if (c->cmd->proc == psubscribeCommand) acl_retval = ACLCheckPubsubPerm(c,1,c->argc-1,1,idxptr); return acl_retval; }
命令权限(CommandPerm)
- 命令本身的权限
- 命令所操作的 key 的权限
// acl.c int ACLCheckCommandPerm(client *c, int *keyidxptr) { user *u = c->user; // 获取用户 uint64_t id = c->cmd->id; // 获取到命令 id /* If there is no associated user, the connection can run anything. */ if (u == NULL) return ACL_OK; // 位运算:判断是否用户有全部权限 or 此命令不需要权限 if (!(u->flags & USER_FLAG_ALLCOMMANDS) && !(c->cmd->flags & CMD_NO_AUTH)) { // 如果标志位为0,会再去判断子命令的权限 if (ACLGetUserCommandBit(u,id) == 0) { /* Check if the subcommand matches. */ if (c->argc < 2 || u->allowed_subcommands == NULL || u->allowed_subcommands[id] == NULL) { return ACL_DENIED_CMD; } // 遍历命令下的子命令 long subid = 0; while (1) { if (u->allowed_subcommands[id][subid] == NULL) return ACL_DENIED_CMD; if (!strcasecmp(c->argv[1]->ptr, u->allowed_subcommands[id][subid])) break; /* Subcommand match found. Stop here. */ subid++; } } } // key 权限判断 if (!(c->user->flags & USER_FLAG_ALLKEYS) && (c->cmd->getkeys_proc || c->cmd->firstkey)) { getKeysResult result = GETKEYS_RESULT_INIT; // 获取命令中的 key int numkeys = getKeysFromCommand(c->cmd,c->argv,c->argc,&result); int *keyidx = result.keys; // 遍历 key for (int j = 0; j < numkeys; j++) { listIter li; listNode *ln; // 将 li 置于 patterns 链表的头部 listRewind(u->patterns,&li); // 遍历 patterns 链表 int match = 0; while((ln = listNext(&li))) { sds pattern = listNodeValue(ln); size_t plen = sdslen(pattern); int idx = keyidx[j]; // pattern 匹配 if (stringmatchlen(pattern,plen,c->argv[idx]->ptr, sdslen(c->argv[idx]->ptr),0)) { match = 1; break; } } // 匹配失败 if (!match) { if (keyidxptr) *keyidxptr = keyidx[j]; getKeysFreeResult(&result); return ACL_DENIED_KEY; } } getKeysFreeResult(&result); } /* If we survived all the above checks, the user can execute the * command. */ return ACL_OK; }