消息发送队列自选择

消息发送默认根据主题的路由信息(主题消息队列)进行负载均衡,负载均衡机制为轮询策略。假设这样一个场景,订单的状态变更消息发送到特定主题,为了避免消息消费者同时消费同一订单不同状态的变更消息,在开发过程中我们应该使用顺序消息。为了提高消息消费的并发度,如果我们能根据某种负载算法,将相同订单不同的消息统一发送到同一个消息消费队列上,则可以避免引入分布式锁,RocketMQ在消息发送时提供了消息队列选择器MessageQueueSelector,如代码清单11-2所示。

代码清单11-2 消息发送自定义分片算法
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
    int orderId = i % 10;
    Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
        public MessageQueue select(List<MessageQueue> mqs, Messagemsg, Object arg) {
            Integer id = (Integer) arg;
            int index = id % mqs.size();
            return mqs.get(index);
        }
    }, orderId);
    System.out.printf("%s%n", sendResult);
}