RocketMQ 主从切换实战
经过前文的理论分析,相信大家对RocketMQ的主从切换实现原理已经有了非常清晰的认识,也能很容易地得出如下结论:RocketMQ主从切换机制能够从原先的主从同步部署模式,平滑升级到主从切换,即无须对数据进行任何处理,即可兼容旧数据。本节从运维使用的角度来详细介绍其升级过程。
主从切换核心配置属性
主从切换的核心配置参数如下所示。
1)enableDLegerCommitLog:是否开启主从切换机制,默认为false。如果需要开启主从切换,将该值设置为true。 2)dLegerGroup:节点所属的raft复制组,建议与brokerName保持一致,例如broker-a。 3)dLegerPeers:集群节点信息,示例配置为n0-127.0.0.1:40911;n1-127.0.0.1:40912; n2-127.0.0.1:40913,多个节点用英文冒号隔开,单个条目遵循节点ID、IP地址、端口,这里的端口用于Raft集群内部通信。 4)dLegerSelfId:当前节点ID。取自legerPeers中条目的开头,即上述示例中的n0,并且只能第一个字符为英文,其他字符需要配置成数字。 5)storePathRootDir:日志文件的存储根目录,为了支持平滑升级,该值应该与storePath-CommitLog设置为不同的目录。
搭建主从同步环境
先搭建一个传统意义上的主从同步架构,向集群中存入一定量的数据,然后升级到主从切换模式。在Linux服务器上搭建一个RocketMQ主从同步集群不是一件很难的事情,本节不详细介绍其安装过程,只给出相关配置。实验环境的部署结构采取一主一从,部署图如图9-28所示。

192.168.0.220上的broker配置文件如下。
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1=192.168.0.220 brokerIP2=192.168.0.220 namesrvAddr=192.168.0.221:9876;192.168.0.220:9876 storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog autoCreateTopicEnable=false autoCreateSubscriptionGroup=false
ini
192.168.0.221上的 broker 配置文件如下。
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 1 deleteWhen = 04 fileReservedTime = 48 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH brokerIP1=192.168.0.221 brokerIP2=192.168.0.221 namesrvAddr=192.168.0.221:9876;192.168.0.220:9876 storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin -release/store storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog autoCreateTopicEnable=false autoCreateSubscriptionGroup=false
ini
相关的启动命令如下。
nohup bin/mqnamesrv /dev/null 2>&1 & nohup bin/mqbroker -c conf/broker.conf /dev/null 2>&1 &
bash
安装后的集群信息如图9-29所示。

主从同步集群升级到主从切换
部署架构
主从切换集群至少需要3台机器,故搭建Raft集群还需要再引入一台机器,其部署结构如图9-30所示。

从主从同步集群升级到主从切换集群,用户关心的问题之一就是升级后的集群是否能够兼容原先的数据,即原先存储的消息能否被消费者继续消费。为了方便后续验证,我使用代码清单9-113所示的程序向主从同步mq集群添加了一批方便查询的消息(设置消息的key)。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test");
producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876");
producer.start();
for (int i = 600000; i < 600100; i++) {
try {
Messagemsg = new Message("topic_dw_test_by_order_01", null, "m" + i, ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
System.out.println("end");
}
}
java
消息的查询结果如图9-31所示。

升级步骤
第一步:使用如下命令将192.168.0.220的rocketmq复制到192.168.0.222。
scp -r rocketmq-all-4.5.2-bin-release/ root@192.168.0.222:/opt/application/rocketmq-all-4.5.2-bin-release
bash
第二步:依次在三台服务器的 broker.conf 配置文件中添加与主从切换相关的配置属性。
192.168.0.220 broker配置文件如下。
brokerClusterName = DefaultCluster brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1=192.168.0.220 brokerIP2=192.168.0.220 namesrvAddr=192.168.0.221:9876;192 .168.0.220:9876 storePathRootDir=/opt/application/rocketmq-all- 4.5.2-bin-release/store storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog autoCreateTopicEnable=false autoCreateSubscriptionGroup=false # 与dledger相关的属性 enableDLegerCommitLog=true storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store dLegerGroup=broker-a dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911 dLegerSelfId=n0
ini
192.168.0.221 broker配置文件如下。
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 1 deleteWhen = 04 fileReservedTime = 48 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH brokerIP1=192.168.0.221 brokerIP2=192.168.0.221 namesrvAddr=192.168.0.221:9876;192.168.0.220:9876 storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin -release/store storePathCommitLog=/opt/application/rocketmq-all-4.5.2-b in-release/store/commitlog autoCreateTopicEnable=false autoCreateSubscriptionGroup=false # 与dledger相关的配置属性 enableDLegerCommitLog=true storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store dLegerGroup=broker-a dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:4091 1;n2-192.168.0.222:40911 dLegerSelfId=n1
ini
192.168.0.222 broker配置文件如下。
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1=192.168.0.222 brokerIP2=192.168.0.222 namesrvAddr=192.168.0.221:9876;192 .168.0.220:9876 storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog autoCreateTopicEnable=false autoCreateSubscriptionGroup=false # 与dledger相关的配置 enableDLegerCommitLog=true storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store dLegerGroup=broker-adLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911 dLegerSelfId=n2
ini
legerSelfId分别为n0、n1、n2。在真实的生产环境中,broker配置文件中的storePath-RootDir、storePathCommitLog尽量使用单独的根目录,这样判断其磁盘使用率时才不会相互影响。 |
第三步:将store/config目录下的所有文件复制到dledger store的congfig目录下。
cd /opt/application/rocketmq-all-4.5.2-bin-release/store/
cp config/* dledger_store/config/
bash
第四步:依次启动3台Broker。
nohup bin/mqbroker -c conf/broker.conf /dev/null 2>&1 &
bash
如果启动成功,则在rocketmq-console中看到如图9-32所示的集群信息。

接下来介绍验证消息发送与消息查找的过程。
首先我们验证是否能查询到升级之前的消息。以查找key为m600000的消息为例,查找结果如图9-33所示。

图9-33表明能兼容升级之前的数据。
然后我们测试消息发送,使用代码清单9-112所示的代码发送消息,发送结果如图9-34所示

再去控制台查询一下消息,其结果表明能查询到新的消息,如图9-35所示。

最后我们验证一下主节点宕机,消息发送是否会受到影响。在消息发送的过程中,关闭主节点,集群恢复后的状态如图9-36所示。
