Consumer Startup Flow

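This section explains how a message consumer starts by walking through the start() method of DefaultMQPushConsumerImpl. The method first calls copySubscription(), shown in Listing 5-1.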

Listing 5-1 DefaultMQPushConsumerImpl#copySubscription
private void copySubscription() throws MQClientException {
    try {
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(
                        this.defaultMQPushConsumer.getConsumerGroup(),
                        topic,
                        subString
                );
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                break;
            case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(
                        this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic,
                        SubscriptionData.SUB_ALL
                );
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

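Step 1: build a SubscriptionData for each subscribed topic and add it to RebalanceImpl's subscription table (Listing 5-1). Subscriptions come from two sources.
1) Explicit subscriptions, made by calling DefaultMQPushConsumerImpl#subscribe(String topic, String subExpression).
2) The retry topic. RocketMQ retries messages per consumer group rather than per topic, and the retry topic is named %RETRY% plus the consumer group name. In clustering mode the consumer subscribes to this topic automatically at startup and takes part in load balancing its message queues.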
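The two subscription sources can be modelled with a short standalone sketch. This is not RocketMQ code: the class SubscriptionSketch and its method names are hypothetical, and a plain string map stands in for the SubscriptionData table. Only the %RETRY% prefix, the "*" (subscribe-all) expression, and the clustering-only rule are taken from the listing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the subscription table built in step 1.
class SubscriptionSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    static final String RETRY_PREFIX = "%RETRY%";
    static final String SUB_ALL = "*";

    // Retry topics are per consumer group, not per business topic.
    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    // Copies the user's topic -> expression pairs, then adds the retry topic
    // when the consumer runs in clustering mode.
    static Map<String, String> buildSubscriptions(String consumerGroup,
                                                  MessageModel model,
                                                  Map<String, String> userSubscriptions) {
        Map<String, String> inner = new LinkedHashMap<>(userSubscriptions);
        if (model == MessageModel.CLUSTERING) {
            inner.put(retryTopic(consumerGroup), SUB_ALL);
        }
        return inner;
    }
}
```

With one user subscription, a clustering consumer in group "groupA" ends up with two entries, the second keyed by %RETRY%groupA; a broadcasting consumer keeps only the user's entry.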

Listing 5-2 DefaultMQPushConsumerImpl#start
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();
}

this.mQClientFactory = MQClientManager.getInstance()
        .getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

this.pullAPIWrapper = new PullAPIWrapper(
        mQClientFactory,
        this.defaultMQPushConsumer.getConsumerGroup(),
        isUnitMode()
);

this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

Step 2: initialize MQClientInstance, RebalanceImpl (the class that implements message queue rebalancing), and related components (Listing 5-2).

Listing 5-3 DefaultMQPushConsumerImpl#start
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(
                    this.mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup()
            );
            break;
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(
                    this.mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup()
            );
            break;
        default:
            break;
    }
}

this.offsetStore.load();

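Step 3: initialize the consume-offset store (Listing 5-3). Unless the application supplies its own OffsetStore, the message model decides where progress is kept: in clustering mode consume progress is stored on the Broker, and in broadcasting mode it is stored on the consumer side. The implementation details are examined later.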
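The selection rule in step 3 can be sketched in isolation. The types below are hypothetical stand-ins for the real OffsetStore hierarchy and only describe where progress would be kept. Unlike the listing, whose default branch leaves the store unset, this sketch throws on an unknown model; that is an assumption made to keep the example total.

```java
// Hypothetical stand-ins; not the RocketMQ OffsetStore classes.
class OffsetStoreSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    interface OffsetStore {
        String location();
    }

    static final class LocalFileStore implements OffsetStore {
        public String location() { return "consumer-local file"; }
    }

    static final class RemoteBrokerStore implements OffsetStore {
        public String location() { return "broker"; }
    }

    // A user-supplied store wins; otherwise the message model decides.
    static OffsetStore choose(OffsetStore custom, MessageModel model) {
        if (custom != null) {
            return custom;
        }
        switch (model) {
            case BROADCASTING:
                return new LocalFileStore();   // each consumer tracks its own progress
            case CLUSTERING:
                return new RemoteBrokerStore(); // the group shares progress via the broker
            default:
                throw new IllegalStateException("unknown message model: " + model);
        }
    }
}
```

The design reason is visible in the two branches: in broadcasting mode every consumer receives every message, so progress is private to each consumer, while in clustering mode the group shares one position per queue and needs a common place to record it.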

Listing 5-4 DefaultMQPushConsumerImpl#start
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService = new ConsumeMessageOrderlyService(
            this,
            (MessageListenerOrderly) this.getMessageListenerInner()
    );
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService = new ConsumeMessageConcurrentlyService(
            this,
            (MessageListenerConcurrently) this.getMessageListenerInner()
    );
}

this.consumeMessageService.start();

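Step 4: create and start the consumer-side message consumption service, choosing the orderly or the concurrent implementation according to the type of the registered listener (Listing 5-4). ConsumeMessageService is responsible for consuming messages and maintains a thread pool internally.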
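The dispatch on listener type in step 4 amounts to an instanceof check. The sketch below uses hypothetical marker interfaces in place of MessageListenerOrderly and MessageListenerConcurrently and returns a label instead of constructing a service. Rejecting an unrecognised listener with an exception is an assumption of this sketch; the listing itself has no such branch.

```java
// Hypothetical marker interfaces standing in for the real listener types.
class ConsumeServiceSketch {
    interface MessageListener {}
    interface OrderlyListener extends MessageListener {}
    interface ConcurrentListener extends MessageListener {}

    // Returns which kind of consume service would be created for the listener.
    static String serviceFor(MessageListener listener) {
        if (listener instanceof OrderlyListener) {
            return "orderly";
        } else if (listener instanceof ConcurrentListener) {
            return "concurrent";
        }
        // Assumption of this sketch: fail fast on any other listener type.
        throw new IllegalArgumentException("unsupported listener type");
    }
}
```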

Listing 5-5 DefaultMQPushConsumerImpl#start
boolean registerOK = mQClientFactory.registerConsumer(
        this.defaultMQPushConsumer.getConsumerGroup(),
        this
);

if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    throw new MQClientException(
            "The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
            + "] has been created before, specify another name please."
            + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
            null
    );
}

mQClientFactory.start();

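Step 5: register the consumer with MQClientInstance and start MQClientInstance (Listing 5-5). Registration fails if a consumer with the same group name is already registered on that instance. All consumers and producers in a JVM that share the same client ID hold the same MQClientInstance, and that instance is started only once.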
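The sharing and register-once behaviour in step 5 can be illustrated with a small registry. This is a sketch under assumptions, not the MQClientInstance implementation: the class ClientInstanceSketch, its fields, and the start counter are hypothetical, and the client ID is treated as an opaque string key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a per-client-ID shared instance.
class ClientInstanceSketch {
    // One shared instance per client ID within the JVM.
    private static final ConcurrentMap<String, ClientInstanceSketch> INSTANCES =
            new ConcurrentHashMap<>();

    private final ConcurrentMap<String, Object> consumerTable = new ConcurrentHashMap<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger startCount = new AtomicInteger();

    static ClientInstanceSketch getOrCreate(String clientId) {
        return INSTANCES.computeIfAbsent(clientId, id -> new ClientInstanceSketch());
    }

    // Returns false when the group name is already taken on this instance.
    boolean registerConsumer(String group, Object consumer) {
        if (group == null || consumer == null) {
            return false;
        }
        return consumerTable.putIfAbsent(group, consumer) == null;
    }

    // Safe to call from every producer and consumer; only the first call does work.
    void start() {
        if (started.compareAndSet(false, true)) {
            startCount.incrementAndGet();
        }
    }

    int startCount() {
        return startCount.get();
    }
}
```

Two lookups with the same client ID return the same object, a second registration under the same group name is refused, and repeated start() calls leave the start counter at one.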