RocketMQ 主从切换实战

经过前文的理论分析,相信大家对RocketMQ的主从切换实现原理已经有了非常清晰的认识,也能很容易地得出如下结论:RocketMQ主从切换机制能够从原先的主从同步部署模式,平滑升级到主从切换,即无须对数据进行任何处理,即可兼容旧数据。本节从运维使用的角度来详细介绍其升级过程。

主从切换核心配置属性

主从切换的核心配置参数如下所示。

1)enableDLegerCommitLog:是否开启主从切换机制,默认为false。如果需要开启主从切换,将该值设置为true。 2)dLegerGroup:节点所属的raft复制组,建议与brokerName保持一致,例如broker-a。 3)dLegerPeers:集群节点信息,示例配置为n0-127.0.0.1:40911;n1-127.0.0.1:40912; n2-127.0.0.1:40913,多个节点用英文冒号隔开,单个条目遵循节点ID、IP地址、端口,这里的端口用于Raft集群内部通信。 4)dLegerSelfId:当前节点ID。取自legerPeers中条目的开头,即上述示例中的n0,并且只能第一个字符为英文,其他字符需要配置成数字。 5)storePathRootDir:日志文件的存储根目录,为了支持平滑升级,该值应该与storePath-CommitLog设置为不同的目录。

搭建主从同步环境

先搭建一个传统意义上的主从同步架构,向集群中存入一定量的数据,然后升级到主从切换模式。在Linux服务器上搭建一个RocketMQ主从同步集群不是一件很难的事情,本节不详细介绍其安装过程,只给出相关配置。实验环境的部署结构采取一主一从,部署图如图9-28所示。

image 2025 02 06 20 03 15 400
Figure 1. 图9-28 RocketMQ一主一从部署架构图

192.168.0.220上的broker配置文件如下。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.0.220
brokerIP2=192.168.0.220
namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=false
ini

192.168.0.221上的 broker 配置文件如下。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.0.221
brokerIP2=192.168.0.221
namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin
-release/store
storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=false
ini

相关的启动命令如下。

nohup bin/mqnamesrv  /dev/null  2>&1 &
nohup bin/mqbroker -c conf/broker.conf /dev/null  2>&1 &
bash

安装后的集群信息如图9-29所示。

image 2025 02 06 20 12 17 439
Figure 2. 图9-29 RocketMQ集群部署图

主从同步集群升级到主从切换

部署架构

主从切换集群至少需要3台机器,故搭建Raft集群还需要再引入一台机器,其部署结构如图9-30所示。

image 2025 02 06 20 12 55 661
Figure 3. 图9-30 RocketMQ主从切换部署图

从主从同步集群升级到主从切换集群,用户关心的问题之一就是升级后的集群是否能够兼容原先的数据,即原先存储的消息能否被消费者继续消费。为了方便后续验证,我使用代码清单9-113所示的程序向主从同步mq集群添加了一批方便查询的消息(设置消息的key)。

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test");
        producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876");
        producer.start();
        for (int i = 600000; i < 600100; i++) {
            try {
                Messagemsg = new Message("topic_dw_test_by_order_01", null, "m" + i, ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
        System.out.println("end");
    }
}
java

消息的查询结果如图9-31所示。

image 2025 02 06 20 14 49 046
Figure 4. 图9-31 消息查询结果

升级步骤

第一步:使用如下命令将192.168.0.220的rocketmq复制到192.168.0.222。

scp -r rocketmq-all-4.5.2-bin-release/ root@192.168.0.222:/opt/application/rocketmq-all-4.5.2-bin-release
bash

第二步:依次在三台服务器的 broker.conf 配置文件中添加与主从切换相关的配置属性。

192.168.0.220 broker配置文件如下。

brokerClusterName = DefaultCluster
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.0.220
brokerIP2=192.168.0.220
namesrvAddr=192.168.0.221:9876;192
.168.0.220:9876
storePathRootDir=/opt/application/rocketmq-all-
4.5.2-bin-release/store
storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=false
# 与dledger相关的属性
enableDLegerCommitLog=true
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
dLegerGroup=broker-a
dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911
dLegerSelfId=n0
ini

192.168.0.221 broker配置文件如下。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.0.221
brokerIP2=192.168.0.221
namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin
-release/store
storePathCommitLog=/opt/application/rocketmq-all-4.5.2-b
in-release/store/commitlog
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=false
# 与dledger相关的配置属性
enableDLegerCommitLog=true
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
dLegerGroup=broker-a
dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:4091
1;n2-192.168.0.222:40911
dLegerSelfId=n1
ini

192.168.0.222 broker配置文件如下。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.0.222
brokerIP2=192.168.0.222
namesrvAddr=192.168.0.221:9876;192
.168.0.220:9876
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=false
# 与dledger相关的配置
enableDLegerCommitLog=true
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
dLegerGroup=broker-adLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911
dLegerSelfId=n2
ini

legerSelfId分别为n0、n1、n2。在真实的生产环境中,broker配置文件中的storePath-RootDir、storePathCommitLog尽量使用单独的根目录,这样判断其磁盘使用率时才不会相互影响。

第三步:将store/config目录下的所有文件复制到dledger store的congfig目录下。

cd /opt/application/rocketmq-all-4.5.2-bin-release/store/
cp config/* dledger_store/config/
bash

第四步:依次启动3台Broker。

nohup bin/mqbroker -c conf/broker.conf  /dev/null  2>&1 &
bash

如果启动成功,则在rocketmq-console中看到如图9-32所示的集群信息。

image 2025 02 06 20 25 20 099
Figure 5. 图9-32 RocketMQ主从切换集群信息

接下来介绍验证消息发送与消息查找的过程。

首先我们验证是否能查询到升级之前的消息。以查找key为m600000的消息为例,查找结果如图9-33所示。

image 2025 02 06 20 25 50 825
Figure 6. 图9-33 消息查找界面

图9-33表明能兼容升级之前的数据。

然后我们测试消息发送,使用代码清单9-112所示的代码发送消息,发送结果如图9-34所示

image 2025 02 06 20 26 21 099
Figure 7. 图9-34 消息发送结果界面

再去控制台查询一下消息,其结果表明能查询到新的消息,如图9-35所示。

image 2025 02 06 20 26 46 617
Figure 8. 图9-35 消息查找结果界面

最后我们验证一下主节点宕机,消息发送是否会受到影响。在消息发送的过程中,关闭主节点,集群恢复后的状态如图9-36所示。

image 2025 02 06 20 27 08 207
Figure 9. 图9-36 RocketMQ集群状态