获取和调试 RocketMQ 的源码

RocketMQ 最早是阿里巴巴内部使用的消息中间件,于 2016 年提交到 Apache 基金会,成为 Apache 基金会的顶级开源项目。在 GitHub 网站上搜索 RocketMQ,主页如图1-1 所示。

image 2025 01 17 12 50 04 133
Figure 1. 图1-1 GitHub RocketMQ搜索界面

Eclipse 获取 RocketMQ 源码

下面介绍 Eclipse 获取 RocketMQ 源码的方法。

第一步:单击右键,从菜单中选择 Import Git,弹出如图1-2 所示的界面。

image 2025 01 17 12 51 44 028
Figure 2. 图1-2 Import界面

第二步:单击 Next 按钮,弹出 Import Projects from Git 界面,如图1-3 所示。

image 2025 01 17 12 52 46 642
Figure 3. 图1-3 Import Projects from Git界面

第三步:单击 Next 按钮,选择 Clone URI,得到的界面如图1-4 所示。

image 2025 01 17 12 53 33 357
Figure 4. 图1-4 选择Clone URI后得到的界面

第四步:继续单击 Next 按钮进入下一步,选择代码分支,如图1-5 所示。

image 2025 01 17 12 54 02 545
Figure 5. 图1-5 Import Projects from Git选择分支

第五步:选择需要的分支后单击 Next 按钮,进入代码存放目录,如图1-6 所示。

image 2025 01 17 12 54 35 651
Figure 6. 图1-6 选择源码存放路径

第六步:单击 Next 按钮,Eclipse 将从远程仓库下载代码,如图1-7 所示。

image 2025 01 17 12 55 03 139
Figure 7. 图1-7 Cloning from git界面

第七步:将代码下载到指定目录后,默认选择 Import existing Eclipse projects(单分支),这里手动选择 Import as general project(多分支),单击 Finish 按钮,导入成功,如图1-8 所示。

image 2025 01 17 12 55 39 749
Figure 8. 图1-8 Cloning from git界面

第八步:代码导入成功后,需要将项目转换成 Maven 项目。导入成功后的效果如图1-9 所示。

image 2025 01 17 12 56 05 328
Figure 9. 图1-9 导入项目初始状态

第九步:单击鼠标右键,从菜单中选择 rocketmq_new(文件下载目录名)→Configure→Configure and Detect Nested Projects…​,将项目转换成 Maven 项目,如图1-10 所示。

image 2025 01 17 12 56 36 310
Figure 10. 图1-10 转换Maven项目

第十步:单击 Finish 按钮,执行 Maven 项目转换,完成 RocketMQ 的导入,如图1-11 所示。

image 2025 01 17 12 57 05 027
Figure 11. 图1-11 完成RocketMQ的导入

转换过程中可能会弹出如图1-12 所示提示框。

image 2025 01 17 12 57 30 561
Figure 12. 图1-12 转换Maven项目过程中弹出的提示框

解决办法有 3 种。

  1. 修改根 pom.xml 文件,找到如代码清单1-1 所示的条目,加上注释。

    代码清单1-1 修改根 pom.xml 文件
    <!--
    <plugin>
        <artifactId>maven-help-plugin</artifactId>
        <version>2.2</version>
        <executions>
            <execution>
                <id>generate-effective-dependencies-pom</id>
                <phase>generate-resources</phase>
                <goals>
                    <goal>effective-pom</goal>
                </goals>
                <configuration>
                    <output>${project.build.directory}/effective-pom/effective-dependencies.xml</output>
                </configuration>
            </execution>
        </executions>
    </plugin>
    -->
    
    <!--
    <plugin>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.19.1</version>
        <configuration>
            <forkCount>1</forkCount>
            <reuseForks>true</reuseForks>
        </configuration>
    </plugin>
    -->
  2. 注释 remoting 模块下 pom.xml 文件中的部分代码,如代码清单1-2 所示。

    代码清单1-2 注释pom.xml文件的部分代码
    <!--
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-tcnative</artifactId>
        <version>1.1.33.Fork22</version>
        <classifier>${os.detected.classifier}</classifier>
    </dependency>
    -->
  3. 单击右键,选中一个项目,Maven→Update Project…​,如图1-13 所示。

    image 2025 01 17 13 01 42 533
    Figure 13. 图1-13 更新Maven项目

Eclipse 调试 RocketMQ 源码

本节将展示如何在 Eclipse 中启动 NameServer、Broker,并运行消息发送与消息消费示例程序。

  1. 启动NameServer

    第一步:展开 namesrv 模块,右键选中 NamesrvStartup.java,将其拖曳到 Debug As,选中 Debug Configurations,这时会弹出 Debug Configurations 界面,如图1-14 所示。

    image 2025 01 17 13 02 46 663
    Figure 14. 图1-14 选择Debug Configurations界面

    第二步:选中 Java Application 条目并单击右键,选择 New,弹出 Debug Configurations 界面,如图1-15 所示。

    image 2025 01 17 13 03 23 619
    Figure 15. 图1-15 设置环境变量

    第三步:设置 RocketMQ 运行主目录。选择 Environment 选项卡,添加环境变量ROCKETMQ_HOME。

    第四步:在 RocketMQ 运行主目录中创建 conf、store、logs 三个文件夹,如图1-16 所示。

    image 2025 01 17 13 04 16 697
    Figure 16. 图1-16 RocketMQ主目录

    第五步:从 RocketMQ distribution 部署目录中将 broker.conf、logback_broker.xml 文件复制到 conf 目录中,logback_namesrv.xml 文件只须修改日志文件的目录,broker.conf 文件内容如代码清单1-3 所示。

    代码清单1-3 broker.conf文件
    brokerClusterName=DefaultCluster
    brokerName=broker-abrokerId=0
    
    # nameServer 地址,分号分割
    namesrvAddr=127.0.0.1:9876
    
    deleteWhen=04
    fileReservedTime=48
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    
    # 存储路径
    storePathRootDir=D:\\rocketmq\\store
    
    # CommitLog 存储路径
    storePathCommitLog=D:\\rocketmq\\store\\commitlog
    
    # 消费队列存储路径
    storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue
    
    # 消息索引存储路径
    storePathIndex=D:\\rocketmq\\store\\index
    
    # Checkpoint 文件存储路径
    storeCheckpoint=D:\\rocketmq\\store\\checkpoint
    
    # abort 文件存储路径
    abortFile=D:\\rocketmq\\store\\abort

    第六步:在 Eclipse Debug 中运行 NamesrvStartup,输出 “The Name Server bootsuccess. Serializetype=JSON”。

  2. 启动Broker

    第一步:展开 Broker 模块,右键选中 BrokerStartup.java,将其拖曳到 Debug As,选中 Debug Configurations,弹出如图1-17 所示的界面,选择 Arguments 选项卡,配置 -c 属性,指定 broker 配置文件路径。

    image 2025 01 17 13 06 49 719
    Figure 17. 图1-17 Arguments选项卡配置

    第二步:切换选项卡 Environment,配置 RocketMQ 主目录,如图1-18 所示。

    image 2025 01 17 13 07 29 324
    Figure 18. 图1-18 Environment选项卡配置

    第三步:以 Debug 模式运行 BrokerStartup.java,查看 ${ROCKET_HOME}/logs/broker.log 文件。未报错则表示启动成功,如代码清单1-4 所示。

    代码清单1-4 Broker启动日志截图
    2021-05-01 20:47:29 INFO main - register broker to name server 127.0.0.1:9876 OK
    2021-05-01 20:47:29 INFO main - The broker[broker-a, 192.168.1.3:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
    2021-05-01 20:47:38 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
    2021-05-01 20:47:38 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
    2021-05-01 20:47:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
    2021-05-01 20:48:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
    2021-05-01 20:48:37 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
    2021-05-01 20:48:37 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
    2021-05-01 20:48:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
    2021-05-01 20:49:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
  3. 使用 RocketMQ 提供的实例验证消息发送与消息消费

第一步:修改 org.apache.rocketmq.example.quickstart.Producer 示例程序,设置消息生产者 NameServer 的地址,如代码清单1-5 所示。

代码清单1-5 消息发送示例程序
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message(
                    "TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

第二步:运行该示例程序,查看运行结果。如果输出代码清单1-6 所示的结果,则表示消息发送成功。

代码清单1-6 消息发送结果
SendResult [sendStatus=SEND_OK, msgId=C0A8010325B46D06D69C70A211400000,
offsetMsgId=C0A8010300002A9F0000000000000000, messageQueue=MessageQueue
[topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]

第三步:修改 org.apache.rocketmq.example.quickstart.Consumer 示例程序,设置消息消费者 NameServer 的地址,如代码清单1-7 所示。

代码清单1-7 消息消费示例程序
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

第四步:运行消息消费者程序,如果输出如代码清单1-8 所示的结果,则表示消息消费成功。

代码清单1-8 消息消费结果
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0,storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1521723269443,bornHost=/192.168.1.3:57034, storeTimestamp=1521723269510,storeHost=/192.168.1.3:10911, msgId=C0A8010300002A9F0000000000000000,commitLogOffset=0
, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0,toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1,CONSUME_START_TIME=1521723841419, UNIQ_KEY=C0A8010325B46D06D69C70A211400000,WAIT=true, TAGS=T
agA}, body=16]]]

消息发送与消息消费都成功,说明 RocketMQ 调试环境已经搭建成功,可以直接调试源码,探知 RocketMQ 的奥秘了。

IntelliJ IDEA 获取 RocketMQ 源码

第一步:在 IntelliJ IDEA VCS 菜单中选择 Get from Version Control…​,如图1-19 所示。

image 2025 01 17 13 20 10 776
Figure 19. 图1-19 VCS菜单

第二步:在弹出的对话框中输入 RocketMQ 源码地址,选择保存的本地路径,单击 Clone 按钮,如图1-20 所示。

image 2025 01 17 13 20 43 628
Figure 20. 图1-20 Version Control界面

状态栏有代码下载的进度,如图1-21 所示。

image 2025 01 17 13 21 09 567
Figure 21. 图1-21 RocketMQ Cloning进度条

第三步:源码导入成功后,效果如图1-22 所示。

image 2025 01 17 13 21 38 004
Figure 22. 图1-22 RocketMQ项目结构

第四步:执行 Maven 命令 clean install,下载并编译依赖,可以看到控制台显示 BUILDSUCCESS 的提示信息,如图1-23 所示。

image 2025 01 17 13 22 23 272
Figure 23. 图1-23 提示信息

IntelliJ IDEA 调试 RocketMQ 源码

本节将展示如何在 IntelliJ IDEA 中启动 NameServer、Broker,并编写一个消息发送与消息消费示例程序。

  1. 启动 NameServer

    第一步:展开 namesrv 模块,鼠标右键选中 NamesrvStartup.java,将其拖曳到 Debug As,选中 Debug 'NamesrvStartup.java.main()',弹出如图1-24、图1-25 所示的界面。

    image 2025 01 17 13 23 27 507
    Figure 24. 图1-24 NamesrvStartup Debug界面
    image 2025 01 17 13 23 43 049
    Figure 25. 图1-25 NamesrvStartup Debug Configuration界面

    第二步:单击 Environment variables 后面的按钮,弹出 Environment Variables 界面,如图1-26 所示。

    image 2025 01 17 13 24 15 880
    Figure 26. 图1-26 Environment Variables列表

    第三步:单击 “+”,在 Name 输入框中输入 ROCKETMQ_HOME,在 Value 输入框中输入源码的保存路径。单击 OK 按钮,回到 Debug Configuration 界面。再单击 OK 按钮,如图1-27 所示。

    image 2025 01 17 13 25 09 052
    Figure 27. 图1-27 增加ROCKETMQ_HOME环境变量

    第四步:在 RocketMQ 运行主目录中创建 conf、logs、store 文件夹。

    第五步:从 RocketMQ distribution 部署目录中将 broker.conf、logback_broker.xml、logback_namesrv.xml 等文件复制到 conf 目录中,按需修改 logback_broker.xml、logback_namesrv.xml 文件中日志文件的目录,broker.conf 文件目录内容如代码清单1-9 所示。

    代码清单1-9 broker.conf文件
    brokerClusterName=DefaultCluster
    brokerName=broker-a
    brokerId=0
    
    # nameServer 地址,分号分割
    namesrvAddr=127.0.0.1:9876
    
    deleteWhen=04
    fileReservedTime=48
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    
    # 存储路径
    storePathRootDir=D:\\rocketmq\\store
    
    # CommitLog 存储路径
    storePathCommitLog=D:\\rocketmq\\store\\commitlog
    
    # 消费队列存储路径
    storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue
    
    # 消息索引存储路径
    storePathIndex=D:\\rocketmq\\store\\index
    
    # checkpoint 文件存储路径
    storeCheckpoint=D:\\rocketmq\\store\\checkpoint
    
    # abort 文件存储路径
    abortFile=D:\\rocketmq\\store\\abort

    第六步:在 IntelliJ IDEA Debug 中运行 NamesrvStartup,并输出 “The Name Server bootsuccess.Serializetype=JSON”。

  2. 启动 Broker

    第一步:展开 Broker 模块,鼠标右键执行 BrokerStartup.java,会提示需要配置 ROCKETMQ_HOME。在 idea 右上角选中 Debug Configurations,在弹出的界面中选择 arguments 选项卡,配置 -c 属性,指定 broker 配置文件的路径,如图1-28 所示。

    image 2025 01 17 13 28 47 208
    Figure 28. 图1-28 设置环境变量

    第二步:切换选项卡 Environment,配置 RocketMQ 主目录和 broker 配置文件,如图1-29 所示。

    image 2025 01 17 13 29 27 781
    Figure 29. 图1-29 运行或调试运行时的环境设置

    第三步:以 Debug 模式运行 BrokerStartup.java,查看 ${ROCKET_HOME}/logs/broker.log 文件,未报错则表示 Broker 启动成功,如代码清单1-10 所示。

    代码清单1-10 Broker启动日志截图
    2021-05-01 17:14:27 INFO PullRequestHoldService - PullRequestHoldService service started
    2021-05-01 17:14:28 INFO main - register broker to name server 127.0.0.1:9876 OK
    2021-05-01 17:14:28 INFO main - The broker[broker-a, 192.168.41.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
    2021-05-01 17:14:37 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
    2021-05-01 17:14:37 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 534 bytes
    2021-05-01 17:14:38 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
    2021-05-01 17:14:41 INFO ClientManageThread_1 - new consumer connected, group: please_rename_unique_group_name_4 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x5babb0b1, L:/192.168.41.1:10911 - R:/192.168.41.1:50635], clientId=192.168.41.1@15140, language=JAVA, version=253, lastUpdateTimestamp=1529054081078]
    2021-05-01 17:14:41 INFO ClientManageThread_1 - subscription changed, add new topic, group: please_rename_unique_group_name_4 SubscriptionData [classFilterMode=false, topic=%RETRY%please_rename_unique_group_name_4, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720311, expressionType=null]
    2021-05-01 17:14:41 INFO ClientManageThread_1 - subscription changed, add new topic, group: please_rename_unique_group_name_4 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720326, expressionType=null]
    2021-05-01 17:14:41 INFO ClientManageThread_1 - registerConsumer info changed ConsumerData [groupName=please_rename_unique_group_name_4, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=%RETRY%please_rename_unique_group_name_4, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720311, expressionType=null], SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720326, expressionType=null]]] 192.168.41.1:50635
    2021-05-01 17:14:41 INFO ClientManageThread_1 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x5babb0b1, L:/192.168.41.1:10911 - R:/192.168.41.1:50635], clientId=192.168.41.1@15140, language=JAVA, version=253, lastUpdateTimestamp=1529054081079]
  3. 使用 RocketMQ 提供的实例验证消息发送与消息消费

第一步:修改 org.apache.rocketmq.example.quickstart.Producer 示例程序,设置消息生产者的 NameServer 地址,如代码清单1-11 所示。

代码清单1-11 消息发送示例程序
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message(
                    "TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

第二步:运行示例程序,查看运行结果,如果输出结果如代码清单1-12 所示,则表示消息发送成功。

SendResult [sendStatus=SEND_OK, msgId=C0A8006606EC18B4AAC24BC584450000,
offsetMsgId=C0A8290100002A9F00000000000000B2, messageQueue=MessageQueue
[topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]

第三步:修改 org.apache.rocketmq.example.quickstart.Consumer 示例程序,设置消息消费者的 NameServer 地址,如代码清单1-13 所示。

代码清单1-13 消息消费示例程序
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

第四步:运行消息消费者程序,如果输出如代码清单1-14 所示消息,则表示消息消费成功。

代码清单1-14 消息消费结果
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0,storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1529053736201,bornHost=/192.168.41.1:50331, storeTimestamp=1529053736210,storeHost=/192.168.41.1:10911, msgId=C0A8290100002A9F0000000000000164,commitLogOffset=356, bodyCRC=613185359, reconsumeTimes=0,preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0,properties={MIN_O
FFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1529053736226,UNIQ_KEY=C0A800662C8C18B4AAC24BC70D080000, WAIT=true, TAGS=TagA}, body=16]]]

消息发送与消息消费都成功,说明 RocketMQ 调试环境已搭建成功。