批量消息发送

批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多,性能就越好,判断依据是单条消息的长度,如果单条消息内容比较长,则打包发送多条消息会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过Default MQProducer#maxMessageSize。批量发送消息要解决的是如何将这些消息编码,以便服务端能够正确解码每条消息的内容。

那么RocketMQ如何编码多条消息呢?我们首先梳理一下RocketMQ网络请求命令的设计,其类图如图3-11所示。下面我们逐一介绍RemotingCommand的属性。

image 2025 01 17 17 06 40 141
Figure 1. 图3-11 RocketMQ请求命令类图
  1. code:请求命令编码,请求命令类型。

  2. version:版本号。

  3. opaque:客户端请求序号。

  4. flag:标记。倒数第一位表示请求类型,0表示请求;1表示返回。倒数第二位,1表示单向发送。

  5. remark:描述。

  6. extFields:扩展属性。

  7. customeHeader:每个请求对应的请求头信息。

  8. byte[] body:消息体内容。

发送单条消息时,消息体的内容将保存在body中。发送批量消息时,需要将多条消息体的内容存储在body中。如何存储更便于服务端正确解析每条消息呢?RocketMQ采取的方式是,对单条消息内容使用固定格式进行存储,如图3-12所示。

image 2025 01 17 17 08 12 533
Figure 2. 图3-12 RocetMQ消息封装格式

接下来梳理一下批量消息发送的核心流程,如代码清单3-29所示。

代码清单3-29 DefaultMQProducer#send消息批量发送
public SendResult send(Collection<Message> msgs) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}

首先在消息发送端,调用batch()方法,将一批消息封装成MessageBatch对象。Message-Batch继承自Message对象,内部持有List<Message> messages。这样一来,批量消息发送与单条消息发送的处理流程就完全一样了。MessageBatch只需要将该集合中每条消息的消息体聚合成一个byte[]数组,在消息服务端能够从该byte[]数组中正确解析出消息,如代码清单3-30所示。

代码清单3-30 Message’Batch#encode
public byte[] encode() {
    return MessageDecoder.encodeMessages(messages);
}

在创建RemotingCommand对象时,调用messageBatch#encode方法填充到RemotingCommand的body域中。多条消息编码格式可参考图3-12,如代码清单3-31所示。

代码清单3-31 MessageDecoder#encodeMessage
public static byte[] encodeMessage(Message message) {
    byte[] body = message.getBody();
    int bodyLen = body.length;
    String properties = messageProperties2String(message.getProperties());
    byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
    short propertiesLength = (short) propertiesBytes.length;
    int sysFlag = message.getFlag();

    int storeSize = 4  // 1 TOTALSIZE
                   + 4  // 2 MAGICCODE
                   + 4  // 3 BODYCRC
                   + 4  // 4 FLAG
                   + 4 + bodyLen  // 5 BODY
                   + 2 + propertiesLength;  // 6 properties

    ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);

    // 1 TOTALSIZE
    byteBuffer.putInt(storeSize);

    // 2 MAGICCODE
    byteBuffer.putInt(0);

    // 3 BODYCRC
    byteBuffer.putInt(0);

    // 4 FLAG
    byteBuffer.putInt(sysFlag);

    // 5 BODY
    byteBuffer.putInt(bodyLen);
    byteBuffer.put(body);

    // 6 properties
    byteBuffer.putShort(propertiesLength);
    byteBuffer.put(propertiesBytes);

    return byteBuffer.array();
}

在消息发送端将会按照上述结构进行解码,整个发送流程与单个消息发送没有差异,就不一一介绍了。