消息消费过程
我们先回顾一下消息拉取的过程:PullMessageService 负责对消息队列进行消息拉取,从远端服务器拉取消息后存入 ProcessQueue 消息处理队列中,然后调用 ConsumeMessageService#submitConsumeRequest 方法进行消息消费。使用线程池消费消息,确保了消息拉取与消息消费的解耦。RocketMQ 使用 ConsumeMessageService 来实现消息消费的处理逻辑。RocketMQ 支持顺序消费与并发消费,本节将重点关注并发消费的流程,顺序消费将在 5.9 节详细分析。ConsumeMessageService 核心类图如图 5-14 所示。

核心方法描述如下。
-
ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg,String brokerName):直接消费消息,主要用于通过管理命令接收消费消息。
-
MessageExt msg:消息。
-
brokerName:Broker名称。
-
-
void submitConsumeRequest(Listmsgs, ProcessQueue processQueue,MessageQueue messageQueue, boolean dispathToConsume):提交消息消费。
-
Listmsgs:消息列表,默认一次从服务器最多拉取32条消息。
-
ProcessQueue processQueue:消息处理队列。
-
MessageQueue messageQueue:消息所属消费队列。
-
boolean dispathToConsume:是否转发到消费线程池,并发消费时忽略该参数。
-
ConsumeMessageConcurrentlyService 并发消息消费核心参数解释如下。
-
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl:消息推模式实现类。
-
DefaultMQPushConsumer defaultMQPushConsumer:消费者对象。
-
MessageListenerConcurrently messageListener:并发消息业务事件类。
-
BlockingQueue consumeRequestQueue:消息消费任务队列。
-
ThreadPoolExecutor consumeExecutor:消息消费线程池。
-
String consumerGroup:消费组。
-
ScheduledExecutorService scheduledExecutorService:添加消费任务到 consumeExecutor 延迟调度器。
-
ScheduledExecutorService cleanExpireMsgExecutors:定时删除过期消息线程池。为了揭示消息消费的完整过程,从服务器拉取到消息后,回调 PullCallBack 方法,先将消息放入 ProcessQueue 中,然后把消息提交到消费线程池中执行,也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入消息消费的世界。
消息消费
消费者消息消费服务 ConsumeMessageConcurrentlyService 的主要方法是 submitConsumeRequest 提交消费请求,具体逻辑如代码清单 5-50 所示。
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
}
第一步:consumeMessageBatchMaxSize 表示消息批次,也就是一次消息消费任务 ConsumeRequest 中包含的消息条数,默认为 1。msgs.size() 默认最多为 32 条消息,受 DefaultMQPushConsumer.pullBatchSize 属性控制,如果 msgs.size() 小于 consumeMessage BatchMaxSize,则直接将拉取到的消息放入 ConsumeRequest,然后将 consumeRequest 提交到消息消费者线程池中。如果提交过程中出现拒绝提交异常,则延迟 5s 再提交,如代码清单 5-51 所示。这里其实是给出一种标准的拒绝提交实现方式,实际上,由于消费者线程池使用的任务队列 LinkedBlockingQueue 为无界队列,故不会出现拒绝提交异常。
if (msgs.size() > consumeBatchSize) {
for (int total = 0; total < msgs.
size(); ) {
List<MessageExt> msgThis = ne
w ArrayList<MessageExt > (consumeBatchSize);
for (int i = 0; i < consumeBa
tchSize;
i++, total++){
if (total < msgs.size()) {
msgThis.add(msgs.get(
total));
} else {
break;
}
}
ConsumeRequest consumeRequest
= new ConsumeRequest(msgThis, processQueue, messageQueue
);
try {
this.consumeExecutor.subm
it(consumeRequest);
} catch (RejectedExecutionExc
eption e){
for (total < msgs.size();
total++)
; {
msgThis.add(msgs.get(
total));
}
this.submitConsumeRequest
Later(consumeRequest);
}
}
}
第二步:如果拉取的消息条数大于 consumeMessageBatchMaxSize,则对拉取消息进行分页,每页 consumeMessageBatchMaxSize 条消息,创建多个 ConsumeRequest 任务并提交到消费线程池。ConsumeRequest 的 run() 方法封装了消息消费的具体逻辑,如代码清单 5-52 所示。
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
第三步:进入具体的消息消费队列时,会先检查 processQueue 的 dropped,如果设置为 true,则停止该队列的消费。在进行消息重新负载时,如果该消息队列被分配给消费组内的其他消费者,需要将 droped 设置为 true,阻止消费者继续消费不属于自己的消息队列。
第四步:执行消息消费钩子函数 ConsumeMessageHook#consumeMessageBefore。通过 consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(hook) 方法消息消费执行钩子函数,如代码清单 5-53 所示。
public void resetRetryTopic(final List<MessageExt> msgs) {
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
msg.setTopic(retryTopic);
}
}
}
第五步:恢复重试消息主题名。这是为什么呢?这是由消息重试机制决定的,RocketMQ 将消息存入 CommitLog 文件时,如果发现消息的延时级别 delayTimeLevel 大于 0,会先将重试主题存入消息的属性,然后将主题名称设置为 SCHEDULE_TOPIC_XXXX,以便之后重新参与消息消费,如代码清单 5-54 所示。
try {
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
hasException = true;
}
第六步:执行具体的消息消费,调用应用程序消息监听器的 consumeMessage 方法,进入具体的消息消费业务逻辑,返回该批消息的消费结果,即 CONSUME_SUCCESS(消费成功)或 RECONSUME_LATER(需要重新消费)。
第七步:执行消息消费钩子函数 ConsumeMessageHook#consumeMessageAfter,如代码清单 5-55 所示。
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status,context, this);
}
第八步:执行业务消息消费后,在处理结果前再次验证一次 ProcessQueue 的 isDroped 状态值。如果状态值为 true,将不对结果进行任何处理。也就是说,在消息消费进入第四步时,如果因新的消费者加入或原先的消费者出现宕机,导致原先分配给消费者的队列在负载之后分配给了别的消费者,那么消息会被重复消费,如代码清单 5-56 所示。
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
break;
case RECONSUME_LATER:
ackIndex = -1;
break;
default:
break;
}
第九步:根据消息监听器返回的结果计算 ackIndex,如果返回 CONSUME_SUCCESS,则将 ackIndex 设置为 msgs.size()-1,如果返回 RECONSUME_LATER,则将 ackIndex 设置为 -1,这是为下文发送 msg back(ACK)消息做的准备,如代码清单 5-57 所示。
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
第十步:如果是广播模式,业务方会返回 RECONSUME_LATER,消息并不会被重新消费,而是以警告级别输出到日志文件中。如果是集群模式,消息消费成功,因为 ackIndex=consumeRequest.getMsgs().size()-1,所以 i=ackIndex+1 等于 consumeRequest.getMsgs().size(),并不会执行 sendMessageBack。只有在业务方返回 RECONSUME_LATER 时,该批消息都需要发送 ACK 消息,如果消息发送失败,则直接将本批 ACK 消费发送失败的消息再次封装为 ConsumeRequest,然后延迟 5s 重新消费。如果 ACK 消息发送成功,则该消息会延迟消费,如代码清单 5-58 所示。
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore()
.updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
第十一步:从 ProcessQueue 中移除这批消息,这里返回的偏移量是移除该批消息后最小的偏移量。然后用该偏移量更新消息消费进度,以便消费者重启后能从上一次的消费进度开始消费,避免消息重复消费。值得注意的是,当消息监听器返回 RECONSUME_LATER 时,消息消费进度也会向前推进,并用 ProcessQueue 中最小的队列偏移量调用消息消费进度存储器 OffsetStore 更新消费进度。这是因为当返回 RECONSUME_LATER 时,RocketMQ 会创建一条与原消息属性相同的消息,拥有一个唯一的新 msgId,并存储原消息 ID,该消息会存入 CommitLog 文件,与原消息没有任何关联,所以该消息也会进入 ConsuemeQueue,并拥有一个全新的队列偏移量。
并发消息消费的整体流程就介绍到这里,下文会对消息消费的其中两个重要步骤进行详细分析。
消息确认
如果消息监听器返回的消费结果为 RECONSUME_LATER,则需要将这些消息发送给 Broker 以延迟消息。如果发送 ACK 消息失败,将延迟 5s 后提交线程池进行消费。ACK 消息发送的网络客户端入口为 MQClientAPIImpl#consumerSendMessageBack,命令编码为 RequestCode.CONSUMER_SEND_MSG_BACK,协议头部如图 5-15 所示。

下面逐一介绍 ConsumerSendMsgBackRequestHeader 的核心属性。
-
offset:消息物理偏移量。
-
group:消费组名。
-
delayLevel:延迟级别。RocketMQ 不支持精确的定时消息调度,而是提供几个延时级别,MessageStoreConfig#messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m9m 10m 20m 30m 1h 2h",delayLevel=1,表示延迟5s,delayLevel=2,表示延迟10s。
-
originMsgId:消息ID。
-
originTopic:消息主题。
-
maxReconsumeTimes:最大重新消费次数,默认16次。
客户端以同步方式发送 RequestCode.CONSUMER_SEND 到服务端。服务端命令处理器为 org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack。
第一步:获取消费组的订阅配置信息,如果配置信息为空,返回配置组信息不存在错误,如果重试队列数量小于 1,则直接返回成功,说明该消费组不支持重试。消费组核心类图如图 5-16 所示。

下面逐一介绍 SubscriptionGroupConfig 的核心属性。
-
String groupName:消费组名。
-
consumeEnable:是否可以消费,默认为 true,如果 consumeEnable=false,该消费组无法拉取消息,因而无法消费消息。
-
consumeFromMinEnable:是否允许从队列最小偏移量开始消费,默认为 true,目前未使用该参数。
-
consumeBroadcastEnable:设置该消费组是否能以广播模式消费,默认为 true,如果设置为 false,表示只能以集群模式消费。
-
retryQueueNums:重试队列个数,默认为 1,每一个 Broker 上有一个重试队列。
-
retryMaxTimes:消息最大重试次数,默认 16 次。
-
brokerId:主节点 ID。
-
whichBrokerWhenConsumeSlowly:如果消息堵塞(主节点),将转向该 brokerId 的服务器上拉取消息,默认为 1。
-
notifyConsumerIdsChangedEnable:当消费发生变化时,是否立即进行消息队列重新负载。消费组订阅信息配置信息存储在 Broker 的
${ROCKET_HOME}/store/config/subscriptionGroup.json
中。BrokerConfig.autoCreateSubscriptionGroup 默认为 true,表示在第一次使用消费组配置信息时如果不存在消费组,则使用上述默认值自动创建一个,如果为 false,则只能通过客户端命令 mqadmin updateSubGroup 创建消费组后再修改相关参数,如代码清单 5-59 所示。
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
第二步:创建重试主题,重试主题名称为 %RETRY%+消费组名称,从重试队列中随机选择一个队列,并构建 TopicConfig 主题配置信息,如代码清单 5-60 所示。
MessageExt msgExt = this.brokerController.getMessageStore()
.lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " +
requestHeader.getOffset());
return response;
}
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
第三步:根据消息物理偏移量从 CommitLog 文件中获取消息,同时将消息的主题存入属性。
第四步:设置消息重试次数,如果消息重试次数已超过 maxReconsumeTimes,再次改变 newTopic 主题为 DLQ("%DLQ%"),该主题的权限为只写,说明消息一旦进入 DLQ 队列,RocketMQ 将不负责再次调度消费了,需要人工干预,如代码清单 5-61 所示。
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.g etProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
第五步:根据原先的消息创建一个新的消息对象,重试消息会拥有一个唯一消息 ID(msgId)并存入 CommitLog 文件。这里不会更新原先的消息,而是会将原先的主题、消息 ID 存入消息属性,主题名称为重试主题,其他属性与原消息保持一致。
第六步:将消息存入 CommitLog 文件。这里想再重点突出消息重试机制,该机制的实现依托于定时任务,如代码清单 5-62 所示。
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMess
ageService.delayLevel2QueueId(msg.getDelayTimeLevel());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getPro perties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
在存入 CommitLog 文件之前,如果消息的延迟级别 delayTimeLevel 大于 0,将消息的主题与队列替换为定时任务主题 “SCHEDULE_TOPIC_XXXX”,队列 ID 为延迟级别减 1。再次将消息主题、队列存入消息属性,键分别为 PROPERTY_REAL_TOPIC、PROPERTY_REAL_QUEUE_ID。
ACK 消息存入 CommitLog 文件后,将依托 RocketMQ 定时消息机制在延迟时间到期后,再次拉取消息,提交至消费线程池,定时任务机制的细节将在 5.7 节进行分析。ACK 消息是同步发送的,如果在发送过程中出现错误,将记录所有发送 ACK 消息失败的消息,然后再次封装成 ConsumeRequest,延迟 5s 执行。
消费进度管理
消息消费者在消费一批消息后,需要记录该批消息已经消费完毕,否则当消费者重新启动时,又要从消息消费队列最开始消费。从 5.6.1 节也可以看到,一次消息消费后会从 ProcessQueue 处理队列中移除该批消息,返回 ProcessQueue 的最小偏移量,并存入消息进度表。那么消息进度文件存储在哪里合适呢?
-
广播模式:同一个消费组的所有消息消费者都需要消费主题下的所有消息,也就是同组内消费者的消息消费行为是对立的,互相不影响,故消息进度需要独立存储,最理想的存储地方应该是与消费者绑定。
-
集群模式:同一个消费组内的所有消息消费者共享消息主题下的所有消息,同一条消息(同一个消息消费队列)在同一时间只会被消费组内的一个消费者消费,并且随着消费队列的动态变化而重新负载,因此消费进度需要保存在每个消费者都能访问到的地方。
RocketMQ 消息消费进度接口如图 5-17 所示。

-
void load():从消息进度存储文件加载消息进度到内存。
-
void updateOffset(MessageQueue mq, long offset, boolean increaseOnly):更新内存中的消息消费进度。
-
MessageQueue mq:消息消费队列。
-
long offset:消息消费偏移量。
-
increaseOnly:true表示offset必须大于内存中当前的消费偏移量才更新。
-
-
long readOffset(final MessageQueue mq, final ReadOffsetType type):读取消息消费进度。
-
mq:消息消费队列。
-
ReadOffsetType type:读取方式,可选值包括 READ_FROM_MEMORY,即从内存中读取,READ_FROM_STORE,即从磁盘中读取,MEMORY_FIRST_THEN_STORE,即先从内存中读取,再从磁盘中读取。
-
-
void persistAll(final Set messageQueue)持久化指定消息队列进度到磁盘。
Set messageQueue:消息队列集合。
-
void removeOffset(messageQueue mq):将消息队列的消息消费进度从内存中移除。
-
Map cloneOffsetTable(String topic):复制该主题下所有消息队列的消息消费进度。
-
void updateConsumeOffsetToBroker(MessageQueue mq,long offset,booleanisOneway):使用集群模式更新存储在 Broker 端的消息消费进度。
广播模式消费进度存储
广播模式消息消费进度存储在消费者本地,其实现类为 org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore,如代码清单 5-63 所示。
public final static String LOCAL_OFFSET_STORE_DIR =
System.getProperty("rocketmq.client.localOffset StoreDir", System.getProperty(" user.home") + File.separator + ".rocketmq_offsets");
private final MQClientInstance mQClientFactory;
private final String groupName;
private final String storePath;
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();
-
LOCAL_OFFSET_STORE_DIR:消息进度存储目录,可以通过 -Drocketmq.client.localOffsetStoreDir 指定,如果未指定,则默认为用户主目录 /.rocketmq_offsets。
-
MQClientInstance mQClientFactory:消息客户端。
-
groupName:消息消费组。
-
storePath:消息进度存储文件 LOCAL_OFFSET_STORE_DIR/.rocketmq_offsets/{mQClientFactory.getClientId()}/groupName/offsets.json。
-
ConcurrentMap<MessageQueue, AtomicLong> offsetTable:消息消费进度(内存)。
下面对 LocalFileOffsetStore 核心方法进行简单介绍。
load() 方法如代码清单 5-64 所示。
public void load() throws MQClientException {
OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null){
offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()){
AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
log.info("load consumer's offset, {} {} {}", this.groupName, mq, offset.get());
}
}
}
OffsetSerializeWrapper 内部就是 ConcurrentMap<MessageQueue,AtomicLong>offsetTable 数据结构的封装,readLocalOffset 方法首先从 storePath 中尝试加载内容,如果读取的内容为空,尝试从 storePath + ".bak" 中加载,如果还是未找到内容,则返回 null。消息进度文件存储内容如图 5-18 所示。

广播模式下消费进度与消费组没什么关系,直接保存 MessageQueue:Offset。
persistAll(Set<MessageQueue> mqs) 持久化消息进度如代码清单 5-65 所示。
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty()) return;
OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = entry.getValue();
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
}
}
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null) {
try {
MixAll.string2File(jsonString, this.storePath);
} catch (IOException e) {
log.error("persistAll consumer offset Exception, " + this.storePath, e);
}
}
}
持久化消息进度就是将 ConcurrentMap<MessageQueue, AtomicLong> offsetTable 序列化到磁盘文件中。代码不容易理解,我们只需要知道是什么时候持久化消息消费进度的。原来在 MQClientInstance 中会启动一个定时任务,默认每 5s 持久化消息消费进度一次,可通过 persistConsumerOffsetInterval 进行设置,如代码清单 5-66 所示。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
对广播模式的消息消费进度进行存储、更新、持久化还是比较容易的,本书就简单介绍到这里,接下来重点分析集群模式下的消息进度管理。
集群模式消费进度存储
集群模式消息进度存储文件存放在消息服务端。消息消费进度集群模式实现类 org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore 的实现原理如图 5-19 所示。

集群模式下消息消费进度的读取、持久化与广播模式的实现细节差不多,集群模式下如果从内存中读取消费进度,则是从 RemoteBrokerOffsetStore 的 ConcurrentMap<MessageQueue,Atomic-Long> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>() 中根据消息消费队列获取其消息消费进度。如果从磁盘读取,则发送网络请求,请求命令为 QUERY_CONSUMER_OFFSET 。持久化消息进度的请求命令为 UPDATE_CONSUMER_OFFSET,更新 ConsumerOffsetManager 的 ConcurrentMap<String/* topic@group /,ConcurrentMap< Integer/*消息队列ID/, Long/* 消息消费进度 */>> offsetTable,Broker 端默认 10s 持久化一次消息进度,存储文件名为 ${RocketMQ_HOME}/store/config/consumerOffset.json
。存储内容如图 5-20 所示。

消费进度设计思考
广播模式下,消息消费进度的存储与消费组无关,集群模式下则以主题与消费组为键,保存该主题所有队列的消费进度。我们结合并发消息消费的整个流程,思考一下并发消息消费关于消息进度更新的问题,顺序消息消费将在 5.9 节重点讨论。
-
消费者线程池每处理完一个消息消费任务(ConsumeRequest),会从 ProcessQueue 中移除本批消费的消息,并返回 ProcessQueue 中最小的偏移量,用该偏移量更新消息队列消费进度,也就是说更新消费进度与消费任务中的消息没有关系。例如现在有两个消费任务 task1(queueOffset 分别为 20、40)和 task2(queueOffset 分别为 50、70),并且 ProcessQueue 中当前包含最小消息偏移量为 10 的消息,则 task2 消费结束后,将使用 10 更新消费进度,而不是 70。
当 task1 消费结束后,还是以 10 更新消息队列消费进度,消息消费进度的推进取决于 ProcessQueue 中偏移量最小的消息消费速度。如果偏移量为 10 的消息消费成功,且 ProcessQueue 中包含消息偏移量为 100 的消息,则消息偏移量为 10 的消息消费成功后,将直接用 100 更新消息消费进度。如果在消费消息偏移量为 10 的消息时发生了死锁,会导致消息一直无法被消费,岂不是消息进度无法向前推进了?是的,为了避免这种情况,RocketMQ 引入了一种消息拉取流控措施:DefaultMQPushConsumer#consumeConcurrentlyMaxSpan=2000,消息处理队列 ProcessQueue 中最大消息偏移与最小偏移量不能超过该值,如果超过该值,将触发流控,延迟该消息队列的消息拉取。
-
在进行消息负载时,如果消息消费队列被分配给其他消费者,会将该 ProcessQueue 状态设置为 droped,持久化该消息队列的消费进度,并从内存中将其移除。