ACL实现原理
开启 ACL 功能需要 Broker 端与客户端同时配合,故本节将从 Broker 端与客户端两个维度深入探讨 ACL 的实现原理。
Broker端ACL核心入口
Broker 端的源码入口为 BrokerController 的 initialAcl() 方法,如代码清单 6-4 所示。
if (!this.brokerConfig.isAclEnable()) {
log.info("The broker dose not enable acl");
return;
}
第一步:如果没有开启 ACL 功能,则返回。如果要启用 ACL,则必须在 Broker 配置文件中将属性 aclEnable 设置为 true,该值默认为 false,如代码清单 6-5 所示。
public static void main(String[] args) {
List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()){
log.info("The broker dose not load the AccessValidator");
return;
}
}
第二步:使用 SPI 机制加载配置的 AccessValidator 实现类,该方法返回一个列表,具体实现逻辑是读取 METAINF/service/org.apache.rocketmq.acl.AccessValidator 文件中配置的访问验证器,如代码清单 6-6 所示。默认配置内容如图 6-1 所示。

for (AccessValidator accessValidator : accessValidators) {
final AccessValidator validator = accessValidator;
accessValidatorMap.put(validator.getClass(), validator);
this.registerServerRPCHook(new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) { //不要捕获异常
validator.validate(validator.parse(request, remoteAddr));
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
}
});
}
第三步:遍历配置的访问验证器(AccessValidator)后,向 Broker 处理服务器注册钩子函数,RPCHook 的 doBeforeRequest() 方法将在服务端接收到请求并解码后、执行处理请求前被调用。RPCHook 的 doAfterResponse() 方法会在处理完请求后,将结果返回,其调用关系如图 6-2 所示。

接下来重点介绍 PlainAccessValidator。
PlainAccessValidator详解
我们先来看一下 PlainAccessValidator 的核心类图,如图 6-3 所示。

AccessValidator 是访问验证器接口,主要定义如下接口。
-
AccessResource parse(RemotingCommand request, String remoteAddr):从请求头中解析本次请求对应的访问资源,即本次请求需要的访问权限。
-
void validate(AccessResource accessResource):根据本次需要访问的权限,与请求用户拥有的权限进行对比验证,判断请求用户是否拥有权限。如果请求用户没有访问该操作的权限,则抛出异常,否则放行。
-
boolean updateAccessConfig(PlainAccessConfig plainAccessConfig):更新ACL访问控制列表的配置。
-
boolean deleteAccessConfig(String accesskey):根据账户名称删除访问授权规则。
-
String getAclConfigVersion():获取ACL配置当前的版本号。
-
boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList):更新全局白名单IP列表。
-
AclConfig getAllAclConfig():获取ACL相关的配置信息。
PlainAccessValidator 是 RocketMQ 默认提供基于 yml 配置格式的访问验证器。
接下来我们重点探讨 PlainAccessValidator 的 parse() 方法和 validate() 方法的实现细节。在讲解该方法之前,我们先认识一下 RocketMQ 封装访问资源的 PlainAccessResource,如图 6-4 所示。

下面逐一介绍 PlainAccessResource 的核心属性。
1)String accessKey:访问Key,用户名。 2)String secretKey:用户密钥。 3)String whiteRemoteAddress:远程IP地址白名单。 4)boolean admin:是否是管理员角色。 5)byte defaultTopicPerm=1:默认topic的访问权限,如果没有配置topic的权限,则topic默认的访问权限为1,表示DENY。 6)byte defaultGroupPerm=1:消费组默认访问权限,默认为DENY。 7)Map<String,Byte> resourcePermMap:资源需要的访问权限映射表。 8)RemoteAddressStrategy remoteAddressStrategy:远程IP地址验证策略。 9)int requestCode:当前请求的requestCode。 10)byte[] content:请求头与请求体的内容。 11)String signature:签名字符串,这是通常的套路,在客户端,首先将请求参数排序,然后使用secretKey生成签名字符串,在服务端重复这个步骤,然后对比签名字符串,如果相同,则认为登录成功,否则失败。 12)String secretToken:密钥令牌。 13)String recognition:保留字段,目前未被使用。
接下来介绍 PlainAccessValidator 的构造方法,如代码清单 6-7 所示。
public PlainAccessValidator() {
aclPlugEngine = new PlainPermissionManager();
}
在构造函数中直接创建 PlainPermissionManager 对象,该对象的主要作用是解析 plain_acl.yml 文件,加载配置文件中的访问控制规则,本节稍候会重点介绍。
parse() 方法的作用是从请求命令中解析出本次访问需要的权限,然后构建 AccessResource 对象,为后续的校验权限做准备,如代码清单 6-8 所示。
PlainAccessResource accessResource = new PlainAccessResource();
if (remoteAddr != null && remoteAddr.contains(":")) {
accessResource.setWhiteRemoteAddress(remoteAddr.substring(0, remoteAddr.lastIndexOf(':')));
} else {
accessResource.setWhiteRemoteAddress(remoteAddr);
}
第一步:首先创建 PlainAccessResource 对象,该对象用来表示一次请求需要访问的权限,从远程地址中提取远程访问的 IP 地址,如代码清单 6-9 所示。
accessResource.setRequestCode(request.getCode());
if (request.getExtFields() == null) {
return accessResource;
}
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
第二步:如果请求中的扩展字段为空,则直接返回该资源,即后续的访问控制只针对IP地址,否则从请求体中提取客户端的访问用户名、签名字符串、安全令牌,如代码清单6-10所示。
try {
switch (request.getCode()) {
case RequestCode.SEND_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
break;
case RequestCode.SEND_MESSAGE_V2:
//省略类似代码
break;
case RequestCode.CONSUMER_SEND_MSG_BACK:
accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
break;
case RequestCode.PULL_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
break;
case RequestCode.QUERY_MESSAGE:
//省略类似代码
break;
case RequestCode.HEART_BEAT:
//省略类似代码
break;
case RequestCode.UNREGISTER_CLIENT:
//省略类似代码
break;
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
//省略类似代码
break;
case RequestCode.UPDATE_CONSUMER_OFFSET:
//省略类似代码
break;
default:
break;
}
} catch (Throwable t) {
throw new AclException(t.getMessage(), t);
}
第三步:根据请求命令设置本次请求需要获得的权限,上述代码比较简单,就是从请求中得出本次操作的topic、消息组名称。为了方便区分topic与消费组,消费组将消费者对应的重试主题当作资源的Key,如代码清单 6-11 所示。从这里也可以看出当前版本需要进行 ACL 权限验证的请求命令如下。
1)SEND_MESSAGE:发送消息。 2)SEND_MESSAGE_V2:发送第2版消息。 3)CONSUMER_SEND_MSG_BACK:消费组重试发送消息。 4)PULL_MESSAGE:消息拉取。 5)QUERY_MESSAGE:消息查询。 6)HEART_BEAT:心跳请求。 7)UNREGISTER_CLIENT:取消注册。 8)GET_CONSUMER_LIST_BY_GROUP:根据消费组获取消费者列表。 9)UPDATE_CONSUMER_OFFSET:更新消息消费进度。
// Content字段
SortedMap<String, String> map = new TreeMap<String, String>();
for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
map.put(entry.getKey(), entry.getValue());
}
}
accessResource.setContent(AclUtils.combineRequestContent(request, map));
第四步:对扩展字段进行排序,以便于生成签名字符串,然后将扩展字段与请求体(body)写入content字段,完成从请求头中解析出请求需要验证的权限。
validate() 方法如代码清单 6-12 所示。
public void validate(AccessResource accessResource) {
aclPlugEngine.validate((PlainAccessResource) accessResource);
}
验证权限是根据本次请求需要的权限与当前用户所拥有的权限进行对比,如果符合,则正常执行;否则抛出 AclException。为了了解权限的验证流程,我们继续学习 PlainPermissionManager。
PlainPermissionManager详解
PlainPermissionManager 的主要职责是管理 plain_acl.yml 权限配置文件,该文件用于验证访问权限。PlainPermissionManager 类图如图 6-5 所示。

下面逐一介绍 PlainPermissionManager 的核心属性。
1)DEFAULT_PLAIN_ACL_FILE:ACL默认配置文件名称,默认值为conf/plain_acl.yml。 2)String fileHome:配置文件所在的目录,可以通过系统参数Rocketmq.home.dir指定,默认为RocketMQ的主目录。 3)String fileName:ACL配置文件名称,默认为DEFAULT_PLAIN_ACL_FILE,可以通过系统参数Rocketmq.acl.plain.file=fileName指定。 4)Map<String/AccessKey/,PlainAccessResource> plainAccessResourceMap:权图 下面逐一介绍PlainPermissionManager的核心属性。 1)DEFAULT_PLAIN_ACL_FILE:ACL默认配置文件名称,默认值为conf/plain_acl.yml。 2)String fileHome:配置文件所在的目录,可以通过系统参数Rocketmq.home.dir指定,默认为RocketMQ的主目录。 3)String fileName:ACL配置文件名称,默认为DEFAULT_PLAIN_ACL_FILE,可以通过系统参数Rocketmq.acl.plain.file=fileName指定。 4)Map<String/AccessKey/,PlainAccessResource> plainAccessResourceMap:权限配置映射表,用户名为键。 5)RemoteAddressStrategyFactory remoteAddressStrategyFactory:远程IP解析策略工厂,用于解析白名单IP地址。 6)boolean isWatchStart:是否监听plain_acl.yml文件,一旦该文件的内容改变,可以在不重启服务器的情况下自动生效。 7)DataVersion dataVersion = new DataVersion():配置文件版本号。
PlainPermissionManager 构造函数如代码清单 6-13 所示。
public PlainPermissionManager() {
load();
watch();
}
构造函数比较简单,就是调用 load() 方法解析 YAML 格式的文件,并将对应的配置规则加载到内存中。接下来我们分别介绍 load() 与 watch() 方法的细节。
load() 方法主要的职责是解析 ACL 配置文件,将 ACL 配置规则加载到内存,方便执行权限校验,如代码清单 6-14 所示。
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome +
File.separator + fileName, JSONObject.class);
if (plainAclConfData == null || plainAclConfData.isEmpty()) {
throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
}
第一步:通过 YAML 相关的类库将配置文件解析成一个 JSONObject,如代码清单 6-15 所示。
JSONArray globalWhiteRemoteAddressesList =
plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
if (globalWhiteRemoteAddressesList != null
&& !globalWhiteRemoteAddressesList
.isEmpty()) {
for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
}
}
第二步:根据 ACL 配置文件中配置的全局白名单列表,构建对应规则的校验器,如代码清单 6-16 所示。RocketMQ ACL 支持多种全局 IP 白名单的配置规则,具体的类体系如图 6-6 所示。

1)BlankRemoteAddressStrategy:空,表示不设置白名单,该条规则默认返回false。 2)NullRemoteAddressStrategy:表示全部匹配,该条规则直接返回true,将会阻断其他规则的判断,请慎重使用,其配置风格类似于"" "..." ":*:*:*:*:*:*:*"。 3)MultipleRemoteAddressStrategy:多地址配置模式,IP地址的最后一组使用{},大括号中可以包含多个IP地址用英文逗号隔开,例如192.168.0.{100,101}。 4)RangeRemoteAddressStrategy:范围类地址配置模式,例如192.168.*.或192.168.100-200.10-20。 5)OneRemoteAddressStrategy:单个IP地址配置模式,例如192.168.1.1。
JSONArray accounts = plainAclConfData.getJSONArray(
AclConstants.CONFIG_ACCOUNTS);
if (accounts != null && !accounts.isEmpty()) {
List<PlainAccessConfig> plainAccessConfigList =
accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
plainAccessResourceMap.put(plainAccessResource.getAccessKey(), plainAccessResource);
}
}
第三步:接收 account 的标签,按用户名将其配置规则存入 plainAccessResourceMap。
watch() 方法如代码清单 6-17 所示。
private void watch() {
try {
String watchFilePath = fileHome + fileName;
FileWatchService fileWatchService = new FileWatchService(new String[]{watchFilePath},
new FileWatchService.Listener() {
public void onChanged(String path) {
load();
}
});
fileWatchService.start();
log.info("Succeed to start AclWatcherService");
this.isWatchStart = true;
} catch (Exception e) {
log.error("Failed to start AclWatcherService", e);
}
}
watch() 方法实现的思路是创建一个 FileWatchService 线程,用于监听 ACL 的规则配置文件,一旦配置文件的内容发生变化,则执行 FileWatchService.Listener 的回调函数 onChange(),执行配置规则的重新加载。
接下来简单介绍一下 RocketMQ 提供的动态监听配置文件变化的实现机制。
FileWatch Service 类图如图 6-7 所示。
从图 6-7 可知,FileWatchService 是一个线程对象,调用其 start() 方法创建一个独立的线程。下面逐一介绍 FileWatchService 的核心属性。 1)List<String> watchFiles:需要监听的文件列表。 2)List<String> fileCurrentHash:当前监听一次文件对应的哈希值。 3)Listener listener:配置变更监听器,即配置发生变化后需要执行的处理逻辑。 4)int WATCH_INTERVAL = 500:检查配置是否频率变更。 5)MessageDigest md = MessageDigest.getInstance("MD5"):对文件的内容进行md5加密,计算文件的哈希值。
FileWatchService 构造方法如代码清单 6-18 所示。

public FileWatchService(final String[] watchFiles,
final Listener listener) throws Exception {
this.listener = listener;
this.watchFiles = new ArrayList<>();
this.fileCurrentHash = new ArrayList<>();
for (int i = 0; i < watchFiles.length; i++) {
if (StringUtils.isNotEmpty(watchFiles[i]) && new File(watchFiles[i]).exists()) {
this.watchFiles.add(watchFiles[i]);
this.fileCurrentHash.add(hash(watchFiles[i]));
}
}
}
上述实现关键点很简单,如果文件存在,则先将文件路径加入 watchFiles,然后在 fileCurrentHash 对应的位置调用 hash() 方法,计算文件的 md5 值,如代码清单 6-19 所示。
private String hash(String filePath) throws IOException,
NoSuchAlgorithmException {
Path path = Paths.get(filePath);
md.update(Files.readAllBytes(path));
byte[] hash = md.digest();
return UtilAll.bytes2string(hash);
}
代码清单6-19调用了MessageDigest相关的API,将配置文件的所有内容进行md5加密,得到其md5值。文件的内容一旦发生了变化,其生成的md5值就会不同,我们可以以此来判断配置文件的内容是否发生变化。
FileWatchService#run 方法如代码清单6-20所示。
public void run() {
while (!this.isStopped()) {
try {
this.waitForRunning(WATCH_INTERVAL); // @1
for (int i = 0; i < watchFiles.size(); i++) { // @2
String newHash;
try {
newHash = hash(watchFiles.get(i)); // @3
} catch (Exception ignored) {
continue;
}
if
(!newHash.equals(fileCurrentHash.get(i))) { //@4
fileCurrentHash.set(i, newHash);
listener.onChanged(watchFiles.get(i));
}
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
}
该方法的实现比较简单,关键点如下。
-
代码@1:每隔500ms校验一次文件内容。
-
代码@2:遍历该线程监听的文件。
-
代码@3:计算当前配置文件的md5值。
-
代码@4:如果两次计算的md5值不同,意味着文件内容发生变化,则调用对应的事件处理方法。
目前在 RocketMQ 中,哪些配置支持动态生效呢?
|
validate() 方法是实现访问控制的核心实现类,如代码清单 6-21 所示。
for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
if (remoteAddressStrategy.match(plainAccessResource)) {
return;
}
}
第一步:使用全局白名单对客户端 IP 进行验证,只需要白名单规则中任意一个规则匹配即通过验证,如代码清单 6-22 所示。
if (plainAccessResource.getAccessKey() == null) {
throw new AclException(String.format("No accessKey is configured"));
}
if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
throw new AclException(String.format("No acl config for%s", ...));
}
第二步:如果在 Broker 端的 ACL 配置文件中没有包含请求用户的访问控制规则,则直接抛出ACL异常,禁止本次操作,如代码清单6-23所示。
PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
return;
}
第三步:验证用户级别配置的IP白名单规则,如果客户端IP与用户级的IP白名单匹配,则直接返回校验通过,如代码清单 6-24 所示。
String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
if (!signature.equals(plainAccessResource.getSignature())) {
throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
}
第四步:验证签名是否一致,思路就是客户端在发送请求时,将请求中的参数进行排序,然后使用特定的密钥对其进行签名。服务端接收到请求后也按照与客户端同样的规则计算签名。如果签名相同,则表示验证签名通过,否则返回ACL校验异常,禁止本次操作,如代码清单6-25所示。
checkPerm(plainAccessResource, ownedAccess);
第五步:调用 checkPerm 校验资源的访问权限。我们继续探讨checkPerm的实现细节,如代码清单6-26所示。
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
throw new AclException(String.format("Need admin permission for request code =%d, but accessKey = % s is not ", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
}
第六步:如果当前操作要求必须拥有管理员权限,但当前用户不是管理员角色,则抛出ACL访问异常,如代码清单6-27所示。当前版本规定图6-8所示操作必须拥有管理员权限方可执行。

Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();
if (needCheckedPermMap == null) {
return;
}
if (ownedPermMap == null && ownedAccess.isAdmin()){
return;
}
第七步:如果本次操作的资源无需权限验证,则直接通过。如果当前操作的用户是超级管理员角色,但并未设置任何访问规则,则通过验证,如代码清单6-28所示。
for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
String resource = needCheckedEntry.getKey();
Byte neededPerm = needCheckedEntry.getValue();
boolean isGroup = PlainAccessResource.isRetryTopic(resource);
if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {
byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, ownedPerm)) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
continue;
}
if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
}
第八步:遍历需要的权限与拥有的权限并进行对比。如果配置对应的权限,则判断是否匹配;如果未配置权限,则判断默认权限下是否允许操作,如果不允许,则抛出 AclException。
图 6-9 清晰地展示了 ACL 访问权限验证的流程。

PlainPermissionManager 还提供了诸如 updateGlobalWhiteAddrsConfig 等修改 ACL配置的API,其实现原理就是修改内存中的配置并持久化到ACL的配置文件中,从而实现无须重启Broker的动态配置即可生效机制。 |
前面重点探讨服务端ACL的实现原理,接下来我们研究消息发送者、消息消费者如何实现ACL机制。
AclClientRPCHook详解
在前面介绍如何使用ACL的时候,大家应该注意到了AclClientRPCHook这个类。ACL客户端的RPCHook俗称钩子函数,在执行具体RPC请求的前后嵌入对应的逻辑。
AclClientRPCHook的类体系如图6-10所示。

-
RPCHook:RPC钩子接口,声明在RPC调用之前和调用之后分别需要执行的扩展逻辑。
-
AclClientRPCHook:用于实现ACL功能的钩子实现类,用于无缝嵌入与ACL访问控制相关的逻辑。
-
SessionCredentials:会话凭证,即用来标识一个用户的信息,其关键字段如下。
-
String ACCESS_KEY:用户访问key,对应配置在Broker端的用户名。
-
String SECRET_KEY:用户访问密钥,客户端首先会对请求参数进行排序,然后使用该密钥对请求参数生成签名验证参数,并随着请求传递到服务端。注意该密钥是不会在网络上进行传输的。
-
String SIGNATURE:签名字符串。
-
String SECURITY_TOKEN:安全会话令牌,通常只需使用 accessKey 和 secretKey。
-
接下来我们重点看AclClientRPCHook#doBeforeRequest方法的实现细节,如代码清单6-29所示。
public void doBeforeRequest (String remoteAddr, RemotingCommand request){
byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
request.addExtField(SIGNATURE, signature);
request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
if (sessionCredentials.getSecurityToken() != null) {
request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
}
}
该方法实现比较简单,其关键点如下。
-
将当前访问的用户名加入请求参数,然后对请求参数进行排序。
-
排序的实现比较简单,就是遍历所有的请求参数,然后存储到SortedMap中,即利用SortedMap的排序特性来实现请求参数的排序。
-
遍历SortedMap,将其参数追加到StringBuffer中,然后与secretKey一起生成签名字符串,并使用MD5算法生成验证签名。值得注意的是,secretKey不会通过网络传输。
-
将生成的验证参数传递到服务端。
这里是一种经典的编程技巧,即实现验证签名。首先客户端会将请求参数进行排序,然后组装成字符串,并与密钥一起使用md5算法生成签名字符串,服务端在收到请求参数后同样对请求参数进行排序,以同样的方式生成签名字符串。如果客户端与服务端生成的签名字符串相同,则认为验证签名通过,数据未被篡改。 |