如何使用 ACL
客户端(项目组)要使用ACL功能,首先需要开启Broker服务器端的ACL功能。本章将分别从Broker端、客户端介绍如何使用ACL功能。
Broker端开启ACL
第一步:在 Broker 端的配置文件中添加如下参数,表示开启 ACL 功能。
aclEnable=true
第二步:将 RocketMQ 中的 distribution/conf/plain_acl.yml 文件复制到 ${ROCKETMQ_HOME}/conf
目录下,其配置示例如代码清单 6-1 所示。
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- TopicTest=PUB
groupPerms:
- oms_consumer_group=DENY
- accessKey: admin
secretKey: 12345678
whiteRemoteAddress:
admin: true
我们先详细介绍一下 plain_acl.yml 文件中各个配置项的含义。
globalWhiteRemoteAddresses
全局白名单,其类型为数组,支持多种配置组合,配置规则如下。
-
空:表示不设置白名单,该条规则默认返回false。
-
"*":表示全部匹配,该条规则直接返回 true,将阻断其他规则的判断,请慎重使用。
-
192.168.0.{100,101}:多地址配置模式,IP地址的最后一组使用{},大括号中多个IP地址用英文逗号隔开。
-
192.168.1.100,192.168.2.100:直接使用英文逗号分隔多个IP地址。
-
192.168.*.或 192.168.100-200.10-20:每个 IP 段使用 “*” 或 “-” 表示范围。
accounts
配置账户信息,类型为数组。拥有 accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms 等子元素。
-
accessKey:登录用户名,长度必须大于 6 个字符。
-
secretKey:登录密码。长度必须大于 6 个字符。
-
whiteRemoteAddress:账户级别的 IP 地址白名单,类型为一个字符串,配置规则与 globalWhiteRemoteAddresses 相同,但只能配置一条规则。
-
admin:boolean 类型,设置是否是超级管理员。如下权限只有 admin 为 true 时才有权限执行。
-
UPDATE_AND_CREATE_TOPIC:更新或创建主题。
-
UPDATE_BROKER_CONFIG:更新Broker配置。
-
DELETE_TOPIC_IN_BROKER:删除主题。
-
UPDATE_AND_CREATE_SUBSCRIPTIONGROUP:更新或创建订阅组信息。
-
DELETE_SUBSCRIPTIONGROUP:删除订阅组信息。
-
-
defaultTopicPerm:默认topic权限,该值默认为DENY(拒绝)。
-
defaultGroupPerm:默认消费组权限,该值默认为DENY(拒绝),建议值为SUB。
-
topicPerms:设置topic的权限,其类型为数组。
-
groupPerms:设置消费组的权限,其类型为数组,可以为每一消费组配置不一样的权限。
RocketMQ ACL 中的可选权限编码如下。
-
DENY:拒绝。
-
PUB:拥有发送权限。
-
SUB:拥有订阅权限。
经过上面两个步骤,RocketMQ Broker 端就启用了 ACL 功能。需要特别注意的是,因为要修改 Broker 端的配置文件,所以需要重启 Broker 服务器才能生效。那么如果修改 plain_acl.yml 中的配置,是否需要重启 Broker 服务器呢? |
客户端使用ACL
消息发送端示例代码
消息发送端的示例代码如代码清单 6-2 所示。
public class AclProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook()); //@/
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message("TopicTest3", "TagA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e){
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("rocketmq", "12345678")); // @2
}
}
上述示例代码有两个关键点。
-
代码 @1:在创建 DefaultMQProducer 对象时传入一个 AclClientRPCHook 钩子函数。
-
代码 @2:在构建 AclClientRPCHook 时传入对应的用户名与密码。
消息消费端示例代码
消息消费端的示例代码如代码清单 6-3 所示。
public class AclConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pGroup", getAclRPCHook(), new AllocateMessageQueueAveragely());
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("rocketmq", "pwd"));
}
}
消费端的代码与发送端类似,在构建 DefaultMQPushConsumer 时传入一个 RPC 钩子函数,RocketMQ 已经提供了默认实现的 AclClientRPCHook,只需要传入对应的用户名和密码。
关于如何使用 ACL 就介绍到这里了,接下来详细分析 RocketMQ ACL 的实现原理。