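Consumer Startup Process
This section explains how a message consumer starts. Let's walk through the start() method of DefaultMQPushConsumerImpl, beginning with the copySubscription() method that it calls, shown in Listing 5-1.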
Listing 5-1 DefaultMQPushConsumerImpl#copySubscription
private void copySubscription() throws MQClientException {
    try {
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(
                    this.defaultMQPushConsumer.getConsumerGroup(),
                    topic,
                    subString
                );
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                break;
            case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(
                    this.defaultMQPushConsumer.getConsumerGroup(),
                    retryTopic,
                    SubscriptionData.SUB_ALL
                );
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
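Step 1 (Listing 5-1): build the topic subscription data (SubscriptionData) and add it to the subscription info held by RebalanceImpl. Subscriptions come from two main sources:
1) Those registered by calling DefaultMQPushConsumerImpl#subscribe(String topic, String subExpression).
2) The retry topic. RocketMQ retries messages per consumer group rather than per topic, and the retry topic is named %RETRY% + the consumer group name. In clustering mode, the consumer subscribes to this topic automatically at startup and takes part in load balancing of its message queues.
The next step is shown in Listing 5-2.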
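The two subscription sources from step 1 can be sketched without any RocketMQ dependency. This is a simplified stand-in, not the library's implementation: the class SubscriptionSketch, its nested MessageModel enum, and the methods retryTopicOf and buildSubscriptions are hypothetical names, and a plain Map of topic to expression replaces SubscriptionData. It assumes that "*" is the subscribe-all expression.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, dependency-free stand-in for the logic in Listing 5-1.
final class SubscriptionSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    // Prefix taken from the text: retry topic = %RETRY% + consumer group name.
    static final String RETRY_PREFIX = "%RETRY%";

    // The retry topic depends only on the consumer group, never on a business topic.
    static String retryTopicOf(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    // Copy the user's subscriptions, then add the retry topic in clustering mode only.
    static Map<String, String> buildSubscriptions(String consumerGroup,
                                                  MessageModel model,
                                                  Map<String, String> userSubscriptions) {
        Map<String, String> inner = new LinkedHashMap<>(userSubscriptions);
        if (model == MessageModel.CLUSTERING) {
            inner.put(retryTopicOf(consumerGroup), "*"); // assumed subscribe-all expression
        }
        return inner;
    }

    public static void main(String[] args) {
        Map<String, String> user = new LinkedHashMap<>();
        user.put("TopicTest", "TagA || TagB");
        System.out.println(buildSubscriptions("order_group", MessageModel.CLUSTERING, user));
        System.out.println(buildSubscriptions("order_group", MessageModel.BROADCASTING, user));
    }
}
```

In this sketch a broadcasting consumer ends up with only its own topics, which mirrors the empty BROADCASTING branch in Listing 5-1.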
Listing 5-2 DefaultMQPushConsumerImpl#start
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance()
    .getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
    mQClientFactory,
    this.defaultMQPushConsumer.getConsumerGroup(),
    isUnitMode()
);
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
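Step 2 (Listing 5-2): initialize MQClientInstance, RebalanceImpl (the message queue rebalancing implementation), and related components. The next step is shown in Listing 5-3.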
Listing 5-3 DefaultMQPushConsumerImpl#start
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(
                this.mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup()
            );
            break;
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(
                this.mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup()
            );
            break;
        default:
            break;
    }
}
this.offsetStore.load();
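Step 3 (Listing 5-3): initialize the message consumption progress (offset) store. A store supplied by the user takes precedence. Otherwise, in clustering mode the progress is stored on the Broker, and in broadcasting mode it is stored on the consumer side. The implementation details are examined closely later. The next step is shown in Listing 5-4.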
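The selection rule in step 3 can be sketched as a small function. Everything here is hypothetical and only mirrors the shape of Listing 5-3: OffsetStoreSketch, its OffsetStore interface, and the two store classes are illustrative names that hold no real offsets.

```java
// Hypothetical sketch of the offset-store selection rule in Listing 5-3.
final class OffsetStoreSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    // Stand-in interface; it only reports where progress would be kept.
    interface OffsetStore {
        String location();
    }

    static final class LocalFileStore implements OffsetStore {
        public String location() { return "consumer-local file"; }
    }

    static final class RemoteBrokerStore implements OffsetStore {
        public String location() { return "broker"; }
    }

    // A user-supplied store wins; otherwise the message model decides.
    static OffsetStore choose(OffsetStore userProvided, MessageModel model) {
        if (userProvided != null) {
            return userProvided;
        }
        switch (model) {
            case BROADCASTING:
                return new LocalFileStore();
            case CLUSTERING:
                return new RemoteBrokerStore();
            default:
                // Design choice of this sketch: fail fast instead of leaving the store unset.
                throw new IllegalStateException("unknown message model: " + model);
        }
    }

    public static void main(String[] args) {
        System.out.println(choose(null, MessageModel.CLUSTERING).location());
        System.out.println(choose(null, MessageModel.BROADCASTING).location());
    }
}
```

One difference from the listing is deliberate: the sketch throws in the default branch, whereas Listing 5-3 breaks out of the switch and then calls load() on whatever offsetStore holds.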
Listing 5-4 DefaultMQPushConsumerImpl#start
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService = new ConsumeMessageOrderlyService(
        this,
        (MessageListenerOrderly) this.getMessageListenerInner()
    );
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService = new ConsumeMessageConcurrentlyService(
        this,
        (MessageListenerConcurrently) this.getMessageListenerInner()
    );
}
this.consumeMessageService.start();
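Step 4 (Listing 5-4): create the consumer-side consumption thread service, choosing the orderly or the concurrent implementation according to the type of the registered listener. ConsumeMessageService is mainly responsible for consuming messages and maintains a thread pool internally. The next step is shown in Listing 5-5.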
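To make "maintains a thread pool internally" concrete, here is a minimal sketch of a service that hands message batches to a listener on pool threads. It is an illustration under assumptions, not RocketMQ's ConsumeMessageService: the class ConsumeServiceSketch, its Listener interface, and the use of String as the message type are all hypothetical, and nothing here models the ordering guarantees of orderly consumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a consume service that owns a thread pool.
final class ConsumeServiceSketch {
    // Stand-in for a message listener; messages are plain strings here.
    interface Listener {
        void onMessages(List<String> batch);
    }

    private final ExecutorService pool;
    private final Listener listener;

    ConsumeServiceSketch(int threads, Listener listener) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.listener = listener;
    }

    // Each batch becomes one task, so batches may be consumed in parallel.
    void submit(List<String> batch) {
        pool.execute(() -> listener.onMessages(batch));
    }

    // Stop accepting work and wait for queued batches; true if all finished in time.
    boolean shutdown(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger consumed = new AtomicInteger();
        ConsumeServiceSketch service =
            new ConsumeServiceSketch(4, batch -> consumed.addAndGet(batch.size()));
        service.submit(Arrays.asList("m1", "m2"));
        service.submit(Arrays.asList("m3"));
        service.shutdown(1000);
        System.out.println("consumed=" + consumed.get());
    }
}
```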
Listing 5-5 DefaultMQPushConsumerImpl#start
boolean registerOK = mQClientFactory.registerConsumer(
    this.defaultMQPushConsumer.getConsumerGroup(),
    this
);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    throw new MQClientException(
        "The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
            + "] has been created before, specify another name please."
            + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null
    );
}
mQClientFactory.start();
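Step 5 (Listing 5-5): register the consumer with MQClientInstance and start the MQClientInstance. If the consumer group has already been registered, the consume service is shut down and startup fails with an MQClientException. All consumers and producers in the same JVM hold the same MQClientInstance, and the MQClientInstance is started only once.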
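The three behaviours in step 5 (one shared instance, rejection of a duplicate group, and start-once) can be sketched as follows. This is a simplified model rather than the RocketMQ source: ClientRegistrySketch, ClientInstance, and getOrCreate are hypothetical names, and keying the cache by a clientId string is an assumption made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a shared, start-once client instance.
final class ClientRegistrySketch {
    static final class ClientInstance {
        private final ConcurrentMap<String, Object> consumers = new ConcurrentHashMap<>();
        private final AtomicInteger startCount = new AtomicInteger();
        private boolean started;

        // Returns false when the group is already registered on this instance.
        boolean registerConsumer(String group, Object consumer) {
            if (group == null || consumer == null) {
                return false;
            }
            return consumers.putIfAbsent(group, consumer) == null;
        }

        // Only the first call does startup work; later calls are no-ops.
        synchronized void start() {
            if (!started) {
                started = true;
                startCount.incrementAndGet();
            }
        }

        int startCount() {
            return startCount.get();
        }
    }

    // Assumed cache key: one instance per clientId within the JVM.
    private static final ConcurrentMap<String, ClientInstance> INSTANCES =
        new ConcurrentHashMap<>();

    static ClientInstance getOrCreate(String clientId) {
        return INSTANCES.computeIfAbsent(clientId, id -> new ClientInstance());
    }

    public static void main(String[] args) {
        ClientInstance forConsumer = getOrCreate("demo-client");
        ClientInstance forProducer = getOrCreate("demo-client");
        System.out.println("shared: " + (forConsumer == forProducer));
        System.out.println("first register: " + forConsumer.registerConsumer("group_a", new Object()));
        System.out.println("duplicate register: " + forProducer.registerConsumer("group_a", new Object()));
        forConsumer.start();
        forProducer.start();
        System.out.println("start count: " + forConsumer.startCount());
    }
}
```

A false return from registerConsumer in this sketch corresponds to the !registerOK branch in Listing 5-5, where the consumer reports that the group has been created before.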