生产者启动流程

消息生产者的代码都在 client 模块中,对于 RocketMQ 来说,它既是客户端,也是消息的提供者,我们在应用系统中初始化生产者的一个实例即可使用它来发消息。

初始 DefaultMQProducer

DefaultMQProducer 是默认的消息生产者实现类,实现了 MQAdmin 的接口,其主要接口如图3-6、图3-7所示。

image 2025 01 17 15 29 07 594
Figure 1. 图3-6 MQAdmin接口

下面介绍 DefaultMQProducer 的主要方法,核心属性如代码清单3-2 所示。

  1. void createTopic(String key, String newTopic, int queueNum, int topicSysFlag):创建主题。

    • key:目前无实际作用,可以与 newTopic 相同。

    • newTopic:主题名称。

    • queueNum:队列数量。

    • topicSysFlag:主题系统标签,默认为 0。

    image 2025 01 17 15 32 01 788
    Figure 2. 图3-7 MQProducer接口
  2. long searchOffset(final MessageQueue mq, final long timestamp):根据时间戳从队列中查找其偏移量。

  3. long maxOffset(final MessageQueue mq):查找该消息队列中最大的物理偏移量。

  4. long minOffset(final MessageQueue mq):查找该消息队列中的最小物理偏移量。

  5. MessageExt viewMessage(final String offsetMsgId):根据消息偏移量查找消息。

  6. QueryResult queryMessage(String topic, String key, int maxNum, long begin, longend):根据条件查询消息。

    • topic:消息主题。

    • key:消息索引字段。

    • maxNum:本次最多取出的消息条数。

    • begin:开始时间。

    • end:结束时间。

  7. MessageExt viewMessage(String topic,String msgId):根据主题与消息 ID 查找消息。

  8. List fetchPublishMessageQueues(final String topic):查找该主题下所有的消息队列。

  9. SendResult send(Message msg):同步发送消息,具体发送到主题中的哪个消息队列由负载算法决定。

  10. SendResult send(Message msg, final long timeout):同步发送消息,如果发送超过 timeout 则抛出超时异常。

  11. void send(Message msg, SendCallback sendCallback):异步发送消息,sendCallback 参数是消息发送成功后的回调方法。

  12. void send(Message msg, SendCallback sendCallback, long timeout):异步发送消息,如果发送超过 timeout 则抛出超时异常。

  13. void sendOneway(Message msg):单向消息发送,即不在乎发送结果,消息发送出去后该方法立即返回。

  14. SendResult send(Message msg, MessageQueue mq, final long timeout):同步方式发送消息,且发送到指定的消息队列。

  15. void send(final Message msg, final MessageQueue mq, final SendCallbacksendCallback, long timeout):异步方式发送消息,且发送到指定的消息队列。

  16. void sendOneway(Message msg, MessageQueue Selector selector, Object arg):单向方式发送消息,且发送到指定的消息队列。

  17. SendResult send(final Message msg, final MessageQueueSelector selector, finalObject arg):消息发送,指定消息选择算法,覆盖消息生产者默认的消息队列负载。

  18. SendResult send(final Collection msgs):同步批量消息发送。

代码清单3-2 DefaultMQProducer的核心属性
private String producerGroup;
private String createTopicKey = MixAll.DEFAULT_TOPIC;
private volatile int defaultTopicQueueNums = 4;
private int sendMsgTimeout = 3000;
private int compressMsgBodyOverHowmuch = 1024 * 4;
private int retryTimesWhenSendFailed = 2;
private int retryTimesWhenSendAsyncFailed = 2;
private boolean retryAnotherBrokerWhenNotStoreOK = false;
private int maxMessageSize = 1024 * 1024 * 4; // 4M
  1. producerGroup:生产者所属组,消息服务器在回查事务状态时会随机选择该组中任何一个生产者发起的事务回查请求。

  2. createTopicKey:默认 topicKey。

  3. defaultTopicQueueNums:默认主题在每一个 Broker 队列的数量。

  4. sendMsgTimeout:发送消息的超时时间,默认为 3s。

  5. compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认为 4KB。

  6. retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为 2,总共执行 3 次。

  7. retryTimesWhenSendAsyncFailed:异步方式发送消息的重试次数,默认为2。

  8. retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个 Broker,是否不等待存储结果就返回,默认为 false。

  9. maxMessageSize:允许发送的最大消息长度,默认为 4MB,最大值为 232-1。

消息生产者启动流程

消息生产者是如何一步一步启动的呢?我们可以从 DefaultMQProducerImpl 的 start 方法来跟踪,具体细节如代码清单3-3 所示。

代码清单3-3 DefaultMQProducerImpl#start
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
    this.defaultMQProducer.changeInstanceNameToPID();
}

第一步:检查 producerGroup 是否符合要求,改变生产者的 instanceName 为进程 ID,如代码清单3-4和代码清单3-5所示。

代码清单3-4 DefaultMQProducerImpl#start
this.mQClientFactory = MQClientManager.getInstance().
        getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
代码清单3-5 MQClientManager#getAndCreateMQClientInstance
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);

    if (null == instance) {
        instance = new MQClientInstance(clientConfig.cloneClientConfig(),
            this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);

        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}

第二步:创建 MQClientInstance 实例。整个 JVM 实例中只存在一个 MQClientManager 实例,维护一个 MQClientInstance 缓存表 ConcurrentMap<String/* clientId */,MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>(),即同一个 clientId 只会创建一个 MQClientInstance 实例。创建 clientId 的方法如代码清单3-6 所示。

代码清单3-6 ClientConfig#buildMQClientId
public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());
    sb.append("@");
    sb.append(this.getInstanceName());

    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }

    return sb.toString();
}

clientId 为客户端 IP+instance+unitname(可选),如果在同一台物理服务器部署两个应用程序,应用程序的 clientId 岂不是相同,这样是不是会造成混乱?

为了避免出现这个问题,如果 instance 为默认值 DEFAULT,RocketMQ 会自动将 instance 设置为进程ID,这样就避免了不同进程相互影响,但同一个 JVM 中相同 clientId 的消费者和生产者在启动时获取的 MQClientInstane 实例都是同一个,如代码清单3-7 所示。MQClientInstance 封装了 RocketMQ 的网络处理 API,是消息生产者、消息消费者与 NameServer、Broker 打交道的网络通道。

代码清单3-7 DefaultMQProducerImpl#start
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    throw new MQClientException(
        "The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." +
        FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null
    );
}

第三步:向 MQClientInstance 注册服务,将当前生产者加入 MQClientInstance 管理,方便后续调用网络请求、进行心跳检测等。

第四步:启动 MQClientInstance,如果 MQClientInstance 已经启动,则本次启动不会真正执行。MQClientInstance 启动过程将在第 5 章讲解消息消费时详细介绍。