获取和调试 RocketMQ 的源码
RocketMQ 最早是阿里巴巴内部使用的消息中间件,于 2016 年提交到 Apache 基金会,成为 Apache 基金会的顶级开源项目。在 GitHub 网站上搜索 RocketMQ,主页如图1-1 所示。

Eclipse 获取 RocketMQ 源码
下面介绍 Eclipse 获取 RocketMQ 源码的方法。
第一步:单击右键,从菜单中选择 Import Git,弹出如图1-2 所示的界面。

第二步:单击 Next 按钮,弹出 Import Projects from Git 界面,如图1-3 所示。

第三步:单击 Next 按钮,选择 Clone URI,得到的界面如图1-4 所示。

第四步:继续单击 Next 按钮进入下一步,选择代码分支,如图1-5 所示。

第五步:选择需要的分支后单击 Next 按钮,进入代码存放目录,如图1-6 所示。

第六步:单击 Next 按钮,Eclipse 将从远程仓库下载代码,如图1-7 所示。

第七步:将代码下载到指定目录后,默认选择 Import existing Eclipse projects(单分支),这里手动选择 Import as general project(多分支),单击 Finish 按钮,导入成功,如图1-8 所示。

第八步:代码导入成功后,需要将项目转换成 Maven 项目。导入成功后的效果如图1-9 所示。

第九步:单击鼠标右键,从菜单中选择 rocketmq_new(文件下载目录名)→Configure→Configure and Detect Nested Projects…,将项目转换成 Maven 项目,如图1-10 所示。

第十步:单击 Finish 按钮,执行 Maven 项目转换,完成 RocketMQ 的导入,如图1-11 所示。

转换过程中可能会弹出如图1-12 所示提示框。

解决办法有 3 种。
-
修改根 pom.xml 文件,找到如代码清单1-1 所示的条目,加上注释。
代码清单1-1 修改根 pom.xml 文件<!-- <plugin> <artifactId>maven-help-plugin</artifactId> <version>2.2</version> <executions> <execution> <id>generate-effective-dependencies-pom</id> <phase>generate-resources</phase> <goals> <goal>effective-pom</goal> </goals> <configuration> <output>${project.build.directory}/effective-pom/effective-dependencies.xml</output> </configuration> </execution> </executions> </plugin> --> <!-- <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.19.1</version> <configuration> <forkCount>1</forkCount> <reuseForks>true</reuseForks> </configuration> </plugin> -->
-
注释 remoting 模块下 pom.xml 文件中的部分代码,如代码清单1-2 所示。
代码清单1-2 注释pom.xml文件的部分代码<!-- <dependency> <groupId>io.netty</groupId> <artifactId>netty-tcnative</artifactId> <version>1.1.33.Fork22</version> <classifier>${os.detected.classifier}</classifier> </dependency> -->
-
单击右键,选中一个项目,Maven→Update Project…,如图1-13 所示。
Figure 13. 图1-13 更新Maven项目
Eclipse 调试 RocketMQ 源码
本节将展示如何在 Eclipse 中启动 NameServer、Broker,并运行消息发送与消息消费示例程序。
-
启动NameServer
第一步:展开 namesrv 模块,右键选中 NamesrvStartup.java,将其拖曳到 Debug As,选中 Debug Configurations,这时会弹出 Debug Configurations 界面,如图1-14 所示。
Figure 14. 图1-14 选择Debug Configurations界面第二步:选中 Java Application 条目并单击右键,选择 New,弹出 Debug Configurations 界面,如图1-15 所示。
Figure 15. 图1-15 设置环境变量第三步:设置 RocketMQ 运行主目录。选择 Environment 选项卡,添加环境变量ROCKETMQ_HOME。
第四步:在 RocketMQ 运行主目录中创建 conf、store、logs 三个文件夹,如图1-16 所示。
Figure 16. 图1-16 RocketMQ主目录第五步:从 RocketMQ distribution 部署目录中将 broker.conf、logback_broker.xml 文件复制到 conf 目录中,logback_namesrv.xml 文件只须修改日志文件的目录,broker.conf 文件内容如代码清单1-3 所示。
代码清单1-3 broker.conf文件brokerClusterName=DefaultCluster brokerName=broker-abrokerId=0 # nameServer 地址,分号分割 namesrvAddr=127.0.0.1:9876 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH # 存储路径 storePathRootDir=D:\\rocketmq\\store # CommitLog 存储路径 storePathCommitLog=D:\\rocketmq\\store\\commitlog # 消费队列存储路径 storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue # 消息索引存储路径 storePathIndex=D:\\rocketmq\\store\\index # Checkpoint 文件存储路径 storeCheckpoint=D:\\rocketmq\\store\\checkpoint # abort 文件存储路径 abortFile=D:\\rocketmq\\store\\abort
第六步:在 Eclipse Debug 中运行 NamesrvStartup,输出 “The Name Server bootsuccess. Serializetype=JSON”。
-
启动Broker
第一步:展开 Broker 模块,右键选中 BrokerStartup.java,将其拖曳到 Debug As,选中 Debug Configurations,弹出如图1-17 所示的界面,选择 Arguments 选项卡,配置
-c
属性,指定 broker 配置文件路径。Figure 17. 图1-17 Arguments选项卡配置第二步:切换选项卡 Environment,配置 RocketMQ 主目录,如图1-18 所示。
Figure 18. 图1-18 Environment选项卡配置第三步:以 Debug 模式运行 BrokerStartup.java,查看
${ROCKET_HOME}/logs/broker.log
文件。未报错则表示启动成功,如代码清单1-4 所示。代码清单1-4 Broker启动日志截图2021-05-01 20:47:29 INFO main - register broker to name server 127.0.0.1:9876 OK 2021-05-01 20:47:29 INFO main - The broker[broker-a, 192.168.1.3:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876 2021-05-01 20:47:38 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes 2021-05-01 20:47:38 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes 2021-05-01 20:47:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK 2021-05-01 20:48:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK 2021-05-01 20:48:37 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes 2021-05-01 20:48:37 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes 2021-05-01 20:48:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK 2021-05-01 20:49:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
-
使用 RocketMQ 提供的实例验证消息发送与消息消费
第一步:修改 org.apache.rocketmq.example.quickstart.Producer 示例程序,设置消息生产者 NameServer 的地址,如代码清单1-5 所示。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message(
"TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
第二步:运行该示例程序,查看运行结果。如果输出代码清单1-6 所示的结果,则表示消息发送成功。
SendResult [sendStatus=SEND_OK, msgId=C0A8010325B46D06D69C70A211400000,
offsetMsgId=C0A8010300002A9F0000000000000000, messageQueue=MessageQueue
[topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
第三步:修改 org.apache.rocketmq.example.quickstart.Consumer 示例程序,设置消息消费者 NameServer 的地址,如代码清单1-7 所示。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
第四步:运行消息消费者程序,如果输出如代码清单1-8 所示的结果,则表示消息消费成功。
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0,storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1521723269443,bornHost=/192.168.1.3:57034, storeTimestamp=1521723269510,storeHost=/192.168.1.3:10911, msgId=C0A8010300002A9F0000000000000000,commitLogOffset=0
, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0,toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1,CONSUME_START_TIME=1521723841419, UNIQ_KEY=C0A8010325B46D06D69C70A211400000,WAIT=true, TAGS=T
agA}, body=16]]]
消息发送与消息消费都成功,说明 RocketMQ 调试环境已经搭建成功,可以直接调试源码,探知 RocketMQ 的奥秘了。
IntelliJ IDEA 获取 RocketMQ 源码
第一步:在 IntelliJ IDEA VCS 菜单中选择 Get from Version Control…,如图1-19 所示。

第二步:在弹出的对话框中输入 RocketMQ 源码地址,选择保存的本地路径,单击 Clone 按钮,如图1-20 所示。

状态栏有代码下载的进度,如图1-21 所示。

第三步:源码导入成功后,效果如图1-22 所示。

第四步:执行 Maven 命令 clean install,下载并编译依赖,可以看到控制台显示 BUILDSUCCESS 的提示信息,如图1-23 所示。

IntelliJ IDEA 调试 RocketMQ 源码
本节将展示如何在 IntelliJ IDEA 中启动 NameServer、Broker,并编写一个消息发送与消息消费示例程序。
-
启动 NameServer
第一步:展开 namesrv 模块,鼠标右键选中 NamesrvStartup.java,将其拖曳到 Debug As,选中 Debug 'NamesrvStartup.java.main()',弹出如图1-24、图1-25 所示的界面。
Figure 24. 图1-24 NamesrvStartup Debug界面Figure 25. 图1-25 NamesrvStartup Debug Configuration界面第二步:单击 Environment variables 后面的按钮,弹出 Environment Variables 界面,如图1-26 所示。
Figure 26. 图1-26 Environment Variables列表第三步:单击 “+”,在 Name 输入框中输入 ROCKETMQ_HOME,在 Value 输入框中输入源码的保存路径。单击 OK 按钮,回到 Debug Configuration 界面。再单击 OK 按钮,如图1-27 所示。
Figure 27. 图1-27 增加ROCKETMQ_HOME环境变量第四步:在 RocketMQ 运行主目录中创建 conf、logs、store 文件夹。
第五步:从 RocketMQ distribution 部署目录中将 broker.conf、logback_broker.xml、logback_namesrv.xml 等文件复制到 conf 目录中,按需修改 logback_broker.xml、logback_namesrv.xml 文件中日志文件的目录,broker.conf 文件目录内容如代码清单1-9 所示。
代码清单1-9 broker.conf文件brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 # nameServer 地址,分号分割 namesrvAddr=127.0.0.1:9876 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH # 存储路径 storePathRootDir=D:\\rocketmq\\store # CommitLog 存储路径 storePathCommitLog=D:\\rocketmq\\store\\commitlog # 消费队列存储路径 storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue # 消息索引存储路径 storePathIndex=D:\\rocketmq\\store\\index # checkpoint 文件存储路径 storeCheckpoint=D:\\rocketmq\\store\\checkpoint # abort 文件存储路径 abortFile=D:\\rocketmq\\store\\abort
第六步:在 IntelliJ IDEA Debug 中运行 NamesrvStartup,并输出 “The Name Server bootsuccess.Serializetype=JSON”。
-
启动 Broker
第一步:展开 Broker 模块,鼠标右键执行 BrokerStartup.java,会提示需要配置 ROCKETMQ_HOME。在 idea 右上角选中 Debug Configurations,在弹出的界面中选择 arguments 选项卡,配置
-c
属性,指定 broker 配置文件的路径,如图1-28 所示。Figure 28. 图1-28 设置环境变量第二步:切换选项卡 Environment,配置 RocketMQ 主目录和 broker 配置文件,如图1-29 所示。
Figure 29. 图1-29 运行或调试运行时的环境设置第三步:以 Debug 模式运行 BrokerStartup.java,查看
${ROCKET_HOME}/logs/broker.log
文件,未报错则表示 Broker 启动成功,如代码清单1-10 所示。代码清单1-10 Broker启动日志截图2021-05-01 17:14:27 INFO PullRequestHoldService - PullRequestHoldService service started 2021-05-01 17:14:28 INFO main - register broker to name server 127.0.0.1:9876 OK 2021-05-01 17:14:28 INFO main - The broker[broker-a, 192.168.41.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876 2021-05-01 17:14:37 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes 2021-05-01 17:14:37 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 534 bytes 2021-05-01 17:14:38 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK 2021-05-01 17:14:41 INFO ClientManageThread_1 - new consumer connected, group: please_rename_unique_group_name_4 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x5babb0b1, L:/192.168.41.1:10911 - R:/192.168.41.1:50635], clientId=192.168.41.1@15140, language=JAVA, version=253, lastUpdateTimestamp=1529054081078] 2021-05-01 17:14:41 INFO ClientManageThread_1 - subscription changed, add new topic, group: please_rename_unique_group_name_4 SubscriptionData [classFilterMode=false, topic=%RETRY%please_rename_unique_group_name_4, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720311, expressionType=null] 2021-05-01 17:14:41 INFO ClientManageThread_1 - subscription changed, add new topic, group: please_rename_unique_group_name_4 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720326, expressionType=null] 2021-05-01 17:14:41 INFO ClientManageThread_1 - registerConsumer info changed ConsumerData [groupName=please_rename_unique_group_name_4, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=%RETRY%please_rename_unique_group_name_4, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720311, expressionType=null], SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720326, expressionType=null]]] 192.168.41.1:50635 2021-05-01 17:14:41 INFO ClientManageThread_1 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x5babb0b1, L:/192.168.41.1:10911 - R:/192.168.41.1:50635], clientId=192.168.41.1@15140, language=JAVA, version=253, lastUpdateTimestamp=1529054081079]
-
使用 RocketMQ 提供的实例验证消息发送与消息消费
第一步:修改 org.apache.rocketmq.example.quickstart.Producer 示例程序,设置消息生产者的 NameServer 地址,如代码清单1-11 所示。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message(
"TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
第二步:运行示例程序,查看运行结果,如果输出结果如代码清单1-12 所示,则表示消息发送成功。
SendResult [sendStatus=SEND_OK, msgId=C0A8006606EC18B4AAC24BC584450000,
offsetMsgId=C0A8290100002A9F00000000000000B2, messageQueue=MessageQueue
[topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
第三步:修改 org.apache.rocketmq.example.quickstart.Consumer 示例程序,设置消息消费者的 NameServer 地址,如代码清单1-13 所示。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
第四步:运行消息消费者程序,如果输出如代码清单1-14 所示消息,则表示消息消费成功。
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0,storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1529053736201,bornHost=/192.168.41.1:50331, storeTimestamp=1529053736210,storeHost=/192.168.41.1:10911, msgId=C0A8290100002A9F0000000000000164,commitLogOffset=356, bodyCRC=613185359, reconsumeTimes=0,preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0,properties={MIN_O
FFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1529053736226,UNIQ_KEY=C0A800662C8C18B4AAC24BC70D080000, WAIT=true, TAGS=TagA}, body=16]]]
消息发送与消息消费都成功,说明 RocketMQ 调试环境已搭建成功。