RocketMQ ACL 权限控制
基于 RocketMQ 4.8.0 版本进行的源码分析。
RocketMQ 从 4.4.0 版本引入了 ACL 权限控制功能。可以给话题指定权限,只有拥有权限的消费者才可以进行消费。其余 ACL 特性请查看权限控制。
如何使用
首先定义一个 RPCHook
:
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "1234567";
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}
然后发送消息的时候指定 RPCHook
:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
接受消息的时候也需要指定具有同样 ACCESS_KEY
和 SECRET_KEY
的 RPCHook
:
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6", getAclRPCHook());
Producer 指定 RPCHook
从示例代码中,我们可以看出可以为 Producer 指定一个 RPCHook,随后此 RPCHook
会被注册进来:
// org.apache.rocketmq.client.impl.MQClientAPIImpl.class
this.remotingClient.registerRPCHook(rpcHook);
注册的实质就是将其放入了 rpcHooks
列表中:
// org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.java
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
在 Producer 端调用底层 API 发送命令的前后,调用 RPCHook
上面的 doBeforeRequest
和 doAfterRequest
方法,便于在发送命令的前后拦截:
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) {
doBeforeRpcHooks(addr, request);
// ...
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
if (rpcHooks.size() > 0) {
for (RPCHook rpcHook: rpcHooks) {
rpcHook.doBeforeRequest(addr, request);
}
}
}
protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
if (rpcHooks.size() > 0) {
for (RPCHook rpcHook: rpcHooks) {
rpcHook.doAfterResponse(addr, request, response);
}
}
}
AclClientRPCHook
下面来看 AclClientRPCHook
在发送命令的前后做了什么事情:
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
byte[] total = AclUtils.combineRequestContent(request,
parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
request.addExtField(SIGNATURE, signature);
request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
// The SecurityToken value is unneccessary,user can choose this one.
if (sessionCredentials.getSecurityToken() != null) {
request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
}
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
// 空实现
}
生成签名
parseRequestContent
方法内部将 request
的自定义头部上面的所有字段的 name
和 value
放入到了一个 SortedMap
中,同时将 ACCESS_KEY
和 SECURITY_TOKEN
(如果有)也放入了进去:
map.put(ACCESS_KEY, ak);
if (securityToken != null) {
map.put(SECURITY_TOKEN, securityToken);
}
当然获取类上面的所有字段是通过反射实现的,为了提高性能,也是使用了 Map
进行了缓存。缓存的只是 Field
字段,而非 value
。
protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
之所以获取到所有字段的值,是为了计算签名。计算签名的方法如下:
- 首先将上述所有字段的值拼接城字符串,然后获取字节数组,再与请求本身的
body
的字节数组拼接在一起,获取到最终的byte[]
数组。
// AclUtils.java
public static byte[] combineRequestContent(RemotingCommand request, SortedMap<String, String> fieldsMap) {
StringBuilder sb = new StringBuilder("");
for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
sb.append(entry.getValue());
}
}
return AclUtils.combineBytes(sb.toString().getBytes(CHARSET), request.getBody());
}
- 然后通过
calSignature
方法计算签名,在内部默认采用SigningAlgorithm.HmacSHA1
算法获取到签名后的byte[]
数组,再通过Base64.encodeBase64
将其转为字符串,返回最终的签名。
// AclClientRPCHook.java
String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
添加扩展字段
生成签名以后,将签名、ACCESS_KEY
、SECURITY_TOKEN
(如果有) 添加到请求的扩展字段中。
request.addExtField(SIGNATURE, signature);
request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
// The SecurityToken value is unneccessary,user can choose this one.
if (sessionCredentials.getSecurityToken() != null) {
request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
}
Broker 权限验证
初始化 AccessValidator
Broker 在初始化 ACL 的时候会判断用户是否启用了 ACL:
// BrokerController.java
if (!this.brokerConfig.isAclEnable()) {
return;
}
如果用户开启了 ACL,那么会从 META-INF
路径下去加载所有实现了 AccessValidator
接口的实现类:
// ServiceProvider.java
public static final String ACL_VALIDATOR_ID = "META-INF/service/org.apache.rocketmq.acl.AccessValidator";
// BrokerController.java
List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) {
return;
}
如果有 AccessValidator
的实现,那么会注册到 Server 端的 rpcHooks
列表中:
for (AccessValidator accessValidator: accessValidators) {
final AccessValidator validator = accessValidator;
accessValidatorMap.put(validator.getClass(),validator);
this.registerServerRPCHook(new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
//Do not catch the exception
validator.validate(validator.parse(request, remoteAddr));
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
}
});
}
而 Broker 中的 META-INF/service/org.apache.rocketmq.acl.AccessValidator
文件存储的内容如下,即采用 PlainAccessValidator
作为默认的权限访问校验器。
org.apache.rocketmq.acl.plain.PlainAccessValidator
接下来就介绍 PlainAccessValidator
内部是如何进行权限校验的。
生成 AccessResource
Broker 端收到请求后,会将请求解析为 AccessResource
。解析的过程就是将 RemotingCommand
中附带的 IP 地址、ACCESS_KEY
、签名、SECRET_TOKEN
等添加到 PlainAccessResource
中。并根据不同的命令,给资源添加上不同的访问权限。
@Override
public AccessResource parse(RemotingCommand request, String remoteAddr) {
PlainAccessResource accessResource = new PlainAccessResource();
// 将远程的地址放到白名单里面
accessResource.setWhiteRemoteAddress(remoteAddr.substring(0, remoteAddr.lastIndexOf(':')));
// 将 ACCESS_KEY、SIGNATURE、SECRET_TOKEN 解析出来
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
// 根据不同的请求,添加不同的资源访问权限
switch (request.getCode()) {
case RequestCode.SEND_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
break;
case RequestCode.QUERY_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
break;
// ...
}
}
其中 RocketMQ 的 Topic 资源访问权限控制定义主要如下所示,分为以下四种:
public class Permission {
public static final byte DENY = 1;
public static final byte ANY = 1 << 1;
public static final byte PUB = 1 << 2;
public static final byte SUB = 1 << 3;
}
DENY
: 拒绝ANY
:PUB
或者SUB
权限PUB
: 发送权限。即从 Producer 端发送出来的命令,所具有的权限。SUB
: 订阅权限。即从消费端发送出来的命令,所具有的权限。
比如上述示例代码,SEND_MESSAGE
命令只能是 Producer 端发送,因此它的权限 PUB
;而 QUERY_MESSAGE
命令只能是 Consumer 端查询,因此它的权限是 SUB
。
校验权限
生成 AccessResource 后,便需要对这个资源进行权限校验,校验的具体规则如下:
- (1)检查是否命中全局 IP 白名单;如果是,则认为校验通过;否则走 2;
// PlainPermissionManager.java
for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
if (remoteAddressStrategy.match(plainAccessResource)) {
return;
}
}
- (2)检查是否命中用户 IP 白名单;如果是,则认为校验通过;否则走 3;
// PlainPermissionManager.java
PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
return;
}
- (3)校验签名,校验不通过,抛出异常;校验通过,则走 4;
// PlainPermissionManager.java
String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
if (!signature.equals(plainAccessResource.getSignature())) {
throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
}
- (4)对用户请求所需的权限 和 用户所拥有的权限进行校验;不通过,抛出异常;
// 如果是 Admin 那么直接通过
if (ownedPermMap == null && ownedAccess.isAdmin()) {
// If the ownedPermMap is null and it is an admin user, then return
return;
}
// Permission.java
public static boolean checkPermission(byte neededPerm, byte ownedPerm) {
if ((ownedPerm & DENY) > 0) {
return false;
}
if ((neededPerm & ANY) > 0) {
return ((ownedPerm & PUB) > 0) || ((ownedPerm & SUB) > 0);
}
return (neededPerm & ownedPerm) > 0;
}
用户所需权限的校验需要注意已下内容:
- (1)特殊的请求例如
UPDATE_AND_CREATE_TOPIC
等,只能由 admin 账户进行操作;
// PlainPermissionManager.java
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
}
- (2)对于某个资源,如果有显性配置权限,则采用配置的权限;如果没有显性配置权限,则采用默认的权限;
ACL 权限管理器
上述校验规则的 validate
方法是放在权限管理器 PlainPermissionManager
上面的,在新建该类实例的时候,其内部会首先加载 YML 格式的权限配置文件,然后在监听这个文件的变化,做到运行时动态的更新权限。
private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";
private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE);
public void load() {
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject.class);
// ...
}
RocketMQ 在 distribution/conf
目录下,给出了一个默认的权限配置文件 plain_acl.yml
:
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: 192.168.0.*
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=SUB
- groupC=SUB
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
对于此文件的监听,是通过 FileWatchService
进行的:
FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
@Override
public void onChanged(String path) {
log.info("The plain acl yml changed, reload the context");
load();
}
});
fileWatchService.start();
在 FileWatchService
内部,其每隔 WATCH_INTERVAL = 500
毫秒,扫描一次指定的所有文件列表。如果某个文件的 MD5
哈希值有变化,就会调用 listener.onChanged
方法来通知这个文件发生了变化。