消息消费者初探

下面我们介绍推模式消费者MQPushConsume的主要API,如图5-4所示。

image 2025 01 18 14 52 46 686
Figure 1. 图5-4 MQPushConsumer类图

下面介绍MQPushConsumer的核心属性。

1)void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)消息消费失败,将消息重新发送到Broker服务器。

  • msg:消息。

  • delayLevel:消息延迟级别。

  • broderName:消息服务器名称。

2)Set fetchSubscribeMessageQueues(String topic):获取消费者对topic分配了哪些消息队列。

  • topic:主题名称。

3)void registerMessageListener(MessageListenerConcurrently messageListener):注册并发消息事件监听器。

4)void registerMessageListener(MessageListenerOrderly messageListener):注册顺序消费事件监听器。

5)void subscribe(String topic, String subExpression):基于主题订阅消息。

  • topic:消息主题。

  • subExpression:消息过滤表达式,TAG或SQL92表达式。

6)void subscribe(String topic, String fullClassName, String filterClassSource):基于主题订阅消息,消息过滤方式使用类模式。

  • topic:消息主题。

  • fullClassName:过滤类全路径名。

  • filterClassSource:过滤类代码。

7)void unsubscribe(final String topic):取消消息订阅。

推模式消息消费者DefaultMQPushConsumer主要属性如图5-5所示。

image 2025 01 18 14 54 16 252
Figure 2. 图5-5 DefaultMQPushConsumer类图

1)consumerGroup:消费者所属组。 2)messageModel:消息消费模式,分为集群模式、广播模式,默认为集群模式。 3)ConsumeFromWhere consumeFromWhere:第一次消费时指定消费策略。

  • CONSUME_FROM_LAST_OFFSET:此处分为两种情况,如果磁盘消息未过期且未被删除,则从最小偏移量开始消费。如果磁盘已过期并被删除,则从最大偏移量开始消费。

  • CONSUME_FROM_FIRST_OFFSET:从队列当前最小偏移量开始消费。

  • CONSUME_FROM_TIMESTAMP:从消费者指定时间戳开始消费。

注意:如果从消息进度服务OffsetStore读取到MessageQueue中的偏移量不小于0,则使用读取到的偏移量拉取消息,只有在读到的偏移量小于0时,上述策略才会生效。

4)allocateMessageQueueStrategy:集群模式下消息队列的负载策略。 5)Map subscription:订阅信息。 6)MessageListener messageListener:消息业务监听器。 7)OffsetStore offsetStore:消息消费进度存储器。 8)int consumeThreadMin = 20:消费者最小线程数。 9)int consumeThreadMax = 64:消费者最大线程数,因为消费者线程池使用无界队列,所以此参数不生效。 10)consumeConcurrentlyMaxSpan=2000:并发消息消费时处理队列最大跨度,默认2000,表示如果消息处理队列中偏移量最大的消息与偏移量最小的消息的跨度超过2000,则延迟50ms后再拉取消息。 11)int pullThresholdForQueue=1000:默认1000,表示每1000次流控后打印流控日志。 12)long pullInterval = 0:推模式下拉取任务的间隔时间,默认一次拉取任务完成后继续拉取。 13)int pullBatchSize=32:每次消息拉取的条数,默认32条。 14)int consumeMessageBatchMaxSize=1:消息并发消费时一次消费消息的条数,通俗点说,就是每次传入MessageListener#consumeMessage中的消息条数。 15)postSubscriptionWhenPull=false:是否每次拉取消息都更新订阅信息,默认为false。 16)maxReconsumeTimes=-1:最大消费重试次数。如果消息消费次数超过maxReconsume Times还未成功,则将该消息转移到一个失败队列,等待被删除。 17)suspendCurrentQueueTimeMillis=1000:延迟将该队列的消息提交到消费者线程的等待时间,默认延迟1s。 18)long consumeTimeout=15:消息消费超时时间,默认为15,单位为分钟。

ConsumeFromWhere只是在没有位点时决定从何处消费端策略,并且策略模式为CONSUME_FROM_LAST_OFFSET,在当偏移量为0的CommiTlog文件存在时,会从最小位点开始消费,详情可以参考笔者的公众号文章: https://mp.weixin.qq.com/s/N_ttVjBpqVUA0CGrOybNLA