如何使用 ACL

客户端(项目组)要使用ACL功能,首先需要开启Broker服务器端的ACL功能。本章将分别从Broker端、客户端介绍如何使用ACL功能。

Broker端开启ACL

第一步:在 Broker 端的配置文件中添加如下参数,表示开启 ACL 功能。

aclEnable=true

第二步:将 RocketMQ 中的 distribution/conf/plain_acl.yml 文件复制到 ${ROCKETMQ_HOME}/conf 目录下,其配置示例如代码清单 6-1 所示。

globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - TopicTest=PUB
  groupPerms:
  - oms_consumer_group=DENY
- accessKey: admin
  secretKey: 12345678
  whiteRemoteAddress:
  admin: true

我们先详细介绍一下 plain_acl.yml 文件中各个配置项的含义。

globalWhiteRemoteAddresses

全局白名单,其类型为数组,支持多种配置组合,配置规则如下。

  • 空:表示不设置白名单,该条规则默认返回false。

  • "*":表示全部匹配,该条规则直接返回 true,将阻断其他规则的判断,请慎重使用。

  • 192.168.0.{100,101}:多地址配置模式,IP地址的最后一组使用{},大括号中多个IP地址用英文逗号隔开。

  • 192.168.1.100,192.168.2.100:直接使用英文逗号分隔多个IP地址。

  • 192.168.*.或 192.168.100-200.10-20:每个 IP 段使用 “*” 或 “-” 表示范围。

accounts

配置账户信息,类型为数组。拥有 accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms 等子元素。

  1. accessKey:登录用户名,长度必须大于 6 个字符。

  2. secretKey:登录密码。长度必须大于 6 个字符。

  3. whiteRemoteAddress:账户级别的 IP 地址白名单,类型为一个字符串,配置规则与 globalWhiteRemoteAddresses 相同,但只能配置一条规则。

  4. admin:boolean 类型,设置是否是超级管理员。如下权限只有 admin 为 true 时才有权限执行。

    • UPDATE_AND_CREATE_TOPIC:更新或创建主题。

    • UPDATE_BROKER_CONFIG:更新Broker配置。

    • DELETE_TOPIC_IN_BROKER:删除主题。

    • UPDATE_AND_CREATE_SUBSCRIPTIONGROUP:更新或创建订阅组信息。

    • DELETE_SUBSCRIPTIONGROUP:删除订阅组信息。

  5. defaultTopicPerm:默认topic权限,该值默认为DENY(拒绝)。

  6. defaultGroupPerm:默认消费组权限,该值默认为DENY(拒绝),建议值为SUB。

  7. topicPerms:设置topic的权限,其类型为数组。

  8. groupPerms:设置消费组的权限,其类型为数组,可以为每一消费组配置不一样的权限。

RocketMQ ACL 中的可选权限编码如下。

  • DENY:拒绝。

  • PUB:拥有发送权限。

  • SUB:拥有订阅权限。

经过上面两个步骤,RocketMQ Broker 端就启用了 ACL 功能。需要特别注意的是,因为要修改 Broker 端的配置文件,所以需要重启 Broker 服务器才能生效。那么如果修改 plain_acl.yml 中的配置,是否需要重启 Broker 服务器呢?

客户端使用ACL

消息发送端示例代码

消息发送端的示例代码如代码清单 6-2 所示。

代码清单6-2 AclProducer示例代码
public class AclProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook()); //@/
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("TopicTest3", "TagA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e){
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("rocketmq", "12345678")); // @2
    }
}

上述示例代码有两个关键点。

  • 代码 @1:在创建 DefaultMQProducer 对象时传入一个 AclClientRPCHook 钩子函数。

  • 代码 @2:在构建 AclClientRPCHook 时传入对应的用户名与密码。

消息消费端示例代码

消息消费端的示例代码如代码清单 6-3 所示。

代码清单6-3 AclConsumer示例代码
public class AclConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pGroup", getAclRPCHook(), new AllocateMessageQueueAveragely());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("rocketmq", "pwd"));
    }
}

消费端的代码与发送端类似,在构建 DefaultMQPushConsumer 时传入一个 RPC 钩子函数,RocketMQ 已经提供了默认实现的 AclClientRPCHook,只需要传入对应的用户名和密码。

关于如何使用 ACL 就介绍到这里了,接下来详细分析 RocketMQ ACL 的实现原理。