RocketMQ ACL 权限控制

RocketMQ ACL 权限控制

基于 RocketMQ 4.8.0 版本进行的源码分析。

RocketMQ 从 4.4.0 版本引入了 ACL 权限控制功能。可以给话题指定权限,只有拥有权限的消费者才可以进行消费。其余 ACL 特性请查看权限控制

如何使用

首先定义一个 RPCHook

private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "1234567";

static RPCHook getAclRPCHook() {
    return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}

然后发送消息的时候指定 RPCHook

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());

接受消息的时候也需要指定具有同样 ACCESS_KEYSECRET_KEYRPCHook

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6", getAclRPCHook());

Producer 指定 RPCHook

从示例代码中,我们可以看出可以为 Producer 指定一个 RPCHook,随后此 RPCHook 会被注册进来:

// org.apache.rocketmq.client.impl.MQClientAPIImpl.class
this.remotingClient.registerRPCHook(rpcHook);

注册的实质就是将其放入了 rpcHooks 列表中:

// org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.java
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();

在 Producer 端调用底层 API 发送命令的前后,调用 RPCHook 上面的 doBeforeRequestdoAfterRequest 方法,便于在发送命令的前后拦截

@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) {
    doBeforeRpcHooks(addr, request);
    // ...
    doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);   
}

protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
    if (rpcHooks.size() > 0) {
        for (RPCHook rpcHook: rpcHooks) {
            rpcHook.doBeforeRequest(addr, request);
        }
    }
}

protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
    if (rpcHooks.size() > 0) {
        for (RPCHook rpcHook: rpcHooks) {
            rpcHook.doAfterResponse(addr, request, response);
        }
    }
}

AclClientRPCHook

下面来看 AclClientRPCHook 在发送命令的前后做了什么事情:

@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
    byte[] total = AclUtils.combineRequestContent(request,
        parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
    String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
    request.addExtField(SIGNATURE, signature);
    request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
    
    // The SecurityToken value is unneccessary,user can choose this one.
    if (sessionCredentials.getSecurityToken() != null) {
        request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
    }
}

@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
    // 空实现
}

生成签名

parseRequestContent 方法内部将 request自定义头部上面的所有字段的 namevalue 放入到了一个 SortedMap 中,同时将 ACCESS_KEYSECURITY_TOKEN (如果有)也放入了进去:

map.put(ACCESS_KEY, ak);
if (securityToken != null) {
    map.put(SECURITY_TOKEN, securityToken);
}

当然获取类上面的所有字段是通过反射实现的,为了提高性能,也是使用了 Map 进行了缓存。缓存的只是 Field 字段,而非 value

protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
        new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();

之所以获取到所有字段的值,是为了计算签名。计算签名的方法如下:

  • 首先将上述所有字段的值拼接城字符串,然后获取字节数组,再与请求本身的 body 的字节数组拼接在一起,获取到最终的 byte[] 数组。
// AclUtils.java
public static byte[] combineRequestContent(RemotingCommand request, SortedMap<String, String> fieldsMap) {
    StringBuilder sb = new StringBuilder("");
    for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
        if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
            sb.append(entry.getValue());
        }
    }

    return AclUtils.combineBytes(sb.toString().getBytes(CHARSET), request.getBody());
}
  • 然后通过 calSignature 方法计算签名,在内部默认采用 SigningAlgorithm.HmacSHA1 算法获取到签名后的 byte[] 数组,再通过 Base64.encodeBase64 将其转为字符串,返回最终的签名。
// AclClientRPCHook.java
String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());

添加扩展字段

生成签名以后,将签名、ACCESS_KEYSECURITY_TOKEN (如果有) 添加到请求的扩展字段中。

request.addExtField(SIGNATURE, signature);
request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());

// The SecurityToken value is unneccessary,user can choose this one.
if (sessionCredentials.getSecurityToken() != null) {
    request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
}

Broker 权限验证

初始化 AccessValidator

Broker 在初始化 ACL 的时候会判断用户是否启用了 ACL:

// BrokerController.java
if (!this.brokerConfig.isAclEnable()) {
    return;
}

如果用户开启了 ACL,那么会从 META-INF 路径下去加载所有实现了 AccessValidator 接口的实现类:

// ServiceProvider.java
public static final String ACL_VALIDATOR_ID = "META-INF/service/org.apache.rocketmq.acl.AccessValidator";

// BrokerController.java
List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) {
    return;
}

如果有 AccessValidator 的实现,那么会注册到 Server 端的 rpcHooks 列表中:

for (AccessValidator accessValidator: accessValidators) {
    final AccessValidator validator = accessValidator;
    accessValidatorMap.put(validator.getClass(),validator);
    this.registerServerRPCHook(new RPCHook() {

        @Override
        public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
            //Do not catch the exception
            validator.validate(validator.parse(request, remoteAddr));
        }

        @Override
        public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
        }
    });
}

而 Broker 中的 META-INF/service/org.apache.rocketmq.acl.AccessValidator 文件存储的内容如下,即采用 PlainAccessValidator 作为默认的权限访问校验器。

org.apache.rocketmq.acl.plain.PlainAccessValidator

接下来就介绍 PlainAccessValidator 内部是如何进行权限校验的。

生成 AccessResource

Broker 端收到请求后,会将请求解析为 AccessResource。解析的过程就是将 RemotingCommand 中附带的 IP 地址、ACCESS_KEY、签名、SECRET_TOKEN 等添加到 PlainAccessResource 中。并根据不同的命令,给资源添加上不同的访问权限

@Override
public AccessResource parse(RemotingCommand request, String remoteAddr) {
    PlainAccessResource accessResource = new PlainAccessResource();

    // 将远程的地址放到白名单里面
    accessResource.setWhiteRemoteAddress(remoteAddr.substring(0, remoteAddr.lastIndexOf(':')));

    // 将 ACCESS_KEY、SIGNATURE、SECRET_TOKEN 解析出来
    accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
    accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
    accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));

    // 根据不同的请求,添加不同的资源访问权限
    switch (request.getCode()) {
        case RequestCode.SEND_MESSAGE:
            accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
            break;

        case RequestCode.QUERY_MESSAGE:
            accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
            break;

        // ...
    }
}

其中 RocketMQ 的 Topic 资源访问权限控制定义主要如下所示,分为以下四种:

public class Permission {

    public static final byte DENY = 1;
    public static final byte ANY = 1 << 1;
    public static final byte PUB = 1 << 2;
    public static final byte SUB = 1 << 3;

}
  • DENY: 拒绝
  • ANY: PUB 或者 SUB 权限
  • PUB: 发送权限。即从 Producer 端发送出来的命令,所具有的权限。
  • SUB: 订阅权限。即从消费端发送出来的命令,所具有的权限。

比如上述示例代码,SEND_MESSAGE 命令只能是 Producer 端发送,因此它的权限 PUB;而 QUERY_MESSAGE 命令只能是 Consumer 端查询,因此它的权限是 SUB

校验权限

生成 AccessResource 后,便需要对这个资源进行权限校验,校验的具体规则如下:

  • (1)检查是否命中全局 IP 白名单;如果是,则认为校验通过;否则走 2;
// PlainPermissionManager.java
for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
    if (remoteAddressStrategy.match(plainAccessResource)) {
        return;
    }
}
  • (2)检查是否命中用户 IP 白名单;如果是,则认为校验通过;否则走 3;
// PlainPermissionManager.java
PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
    return;
}
  • (3)校验签名,校验不通过,抛出异常;校验通过,则走 4;
// PlainPermissionManager.java
String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
if (!signature.equals(plainAccessResource.getSignature())) {
    throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
}
  • (4)对用户请求所需的权限 和 用户所拥有的权限进行校验;不通过,抛出异常;
// 如果是 Admin 那么直接通过
if (ownedPermMap == null && ownedAccess.isAdmin()) {
    // If the ownedPermMap is null and it is an admin user, then return
    return;
}

// Permission.java
public static boolean checkPermission(byte neededPerm, byte ownedPerm) {
    if ((ownedPerm & DENY) > 0) {
        return false;
    }
    if ((neededPerm & ANY) > 0) {
        return ((ownedPerm & PUB) > 0) || ((ownedPerm & SUB) > 0);
    }
    return (neededPerm & ownedPerm) > 0;
}

用户所需权限的校验需要注意已下内容:

  • (1)特殊的请求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 账户进行操作;
// PlainPermissionManager.java
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
    throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
}
  • (2)对于某个资源,如果有显性配置权限,则采用配置的权限;如果没有显性配置权限,则采用默认的权限;

ACL 权限管理器

上述校验规则的 validate 方法是放在权限管理器 PlainPermissionManager 上面的,在新建该类实例的时候,其内部会首先加载 YML 格式的权限配置文件,然后在监听这个文件的变化,做到运行时动态的更新权限。

private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";
private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE);

public void load() {
    JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject.class);
    // ...
}

RocketMQ 在 distribution/conf 目录下,给出了一个默认的权限配置文件 plain_acl.yml

globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*

accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress: 192.168.0.*
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - topicA=DENY
  - topicB=PUB|SUB
  - topicC=SUB
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=SUB
  - groupC=SUB

- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true

对于此文件的监听,是通过 FileWatchService 进行的:

FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
    @Override
    public void onChanged(String path) {
        log.info("The plain acl yml changed, reload the context");
        load();
    }
});
fileWatchService.start();

FileWatchService 内部,其每隔 WATCH_INTERVAL = 500 毫秒,扫描一次指定的所有文件列表。如果某个文件的 MD5 哈希值有变化,就会调用 listener.onChanged 方法来通知这个文件发生了变化。