消息发送存储流程

本节将以消息发送存储为突破点,层层揭开RocketMQ存储设计的神秘面纱。消息存储入口为org.apache.rocketmq.store.DefaultMessageStore#putMessage。

第一步:如果当前broker停止工作或当前不支持写入,则拒绝消息写入。如果消息主题长度超过127个字符、消息属性长度超过32767个字符,同样拒绝该消息写入。如果日志中出现“message store is not writeable, so putMessage is forbidden”提示,最有可能是因为磁盘空间不足,在写入ConsumeQueue、Index文件出现错误时会拒绝消息再次写入。

第二步:如果消息的延迟级别大于0,将消息的原主题名称与原消息队列ID存入消息属性中,用延迟消息主题SCHEDULE_TOPIC_XXXX、消息队列ID更新原先消息的主题与队列,如代码清单4-1所示。这是并发消息消费重试关键的一步,第5章会重点探讨消息重试机制与定时消息的实现原理。

代码清单4-1 CommitLog#putMessage
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

第三步:获取当前可以写入的CommitLog文件,RocketMQ物理文件的组织方式如图4-8所示。

image 2025 01 18 12 18 29 567
Figure 1. 图4-8 CommitLog文件组织方式

CommitLog文件的存储目录为${ROCKET_HOME}/store/commitlog,每个文件默认1GB,一个文件写满后再创建另一个,以该文件中第一个偏移量为文件名,如果偏移量少于20位则用0补齐。图4-8所示的第一个文件初始偏移量为0,第二个文件名中的“1073741824”代表该文件第一条消息的物理偏移量为1073741824,这样根据物理偏移量可以快速定位到消息。MappedFileQueue可以看作${ROCKET_HOME}/store/commitlog文件夹,而MappedFile则对应该文件夹下的文件。

第四步:在将消息写入CommitLog之前,先申请putMessageLock,如代码清单4-2所示。

代码清单4-2 CommitLog#putMessage
messageExtBatch.setStoreTimestamp(beginLockTimestamp);

if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}

if (null == mappedFile) {
    log.error("Create mapped file1 error, topic: {} clientAddr: {}",
              messageExtBatch.getTopic(),
              messageExtBatch.getBornHostString());
    beginTimeInLock = 0;
    return new PutMessageResult(PutMessageStatus.CREATE_MAPPEDFILE_FAILED, null);
}

第五步:设置消息的存储时间,如果mappedFile为空,表明${ROCKET_HOME}/store/commitlog目录下不存在任何文件,说明本次消息是第一次发送,用偏移量0创建第一个CommitLog文件,文件名为00000000000000000000,如果文件创建失败,抛出CREATE_MAPEDFILE_FAILED,这很有可能是磁盘空间不足或权限不够导致的,如代码清单4-3所示。

代码清单4-3 MappedFile#appendMessagesInner
int currentPos = this.wrotePosition.get();

if (currentPos < this.fileSize) {
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result = null;

    if (messageExt instanceof MessageExtBrokerInner) {
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
    } else if (messageExt instanceof MessageExtBatch) {
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
    } else {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    this.wrotePosition.addAndGet(result.getWroteBytes());
    this.storeTimestamp = result.getStoreTimestamp();

    return result;
}

第六步:将消息追加到MappedFile中。首先获取MappedFile当前的写指针,如果currentPos大于或等于文件大小,表明文件已写满,抛出AppendMessageStatus.UNKNOWN_ERROR。如果currentPos小于文件大小,通过slice()方法创建一个与原ByteBuffer共享的内存区,且拥有独立的position、limit、capacity等指针,并设置position为当前指针,如代码清单4-4所示。

代码清单4-4 CommitLog$DefaultAppendMessageCallback#doAppend
long wroteOffset = fileFromOffset + byteBuffer.position();
this.resetByteBuffer(hostHolder, 8);

String msgId = MessageDecoder.createMessageId(
    this.msgIdMemory,
    msgInner.getStoreHostBytes(hostHolder),
    wroteOffset
);

第七步:创建全局唯一消息ID,消息ID有16字节,其组成结构如图4-9所示。

image 2025 01 18 12 23 32 406
Figure 2. 图4-9 消息ID组成结构

为了消息ID具备可读性,返回给应用程序的msgId为字符类型,可以通过UtilAll.bytes2string方法将msgId字节数组转换成字符串,通过UtilAll.string2bytes方法将msgId字符串还原成16字节的数组,根据提取的消息物理偏移量,可以快速通过msgId找到消息内容,如代码清单4-5所示。

代码清单4-5 CommitLog$DefaultAppendMessageCallback#doAppend
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());

String key = keyBuilder.toString();

Long queueOffset = CommitLog.this.topicQueueTable.get(key);

if (null == queueOffset) {
    queueOffset = 0L;
    CommitLog.this.topicQueueTable.put(key, queueOffset);
}

第八步:获取该消息在消息队列的物理偏移量。CommitLog中保存了当前所有消息队列的待写入物理偏移量,如代码清单4-6所示。

代码清单4-6 CommitLog#calMsgLength
private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
    final int msgLen = 4                // TOTALSIZE
                     + 4                // MAGICCODE
                     + 4                // BODYCRC
                     + 4                // QUEUEID
                     + 4                // FLAG
                     + 8                // QUEUEOFFSET
                     + 8                // PHYSICALOFFSET
                     + 4                // SYSFLAG
                     + 8                // BORNTIMESTAMP
                     + 8                // BORNHOST
                     + 8                // STORETIMESTAMP
                     + 8                // STOREHOSTADDRESS
                     + 4                // RECONSUMETIMES
                     + 8                // Prepared Transaction Offset
                     + 4
                     + (bodyLength > 0 ? bodyLength : 0)  // BODY
                     + 1
                     + topicLength     // TOPIC
                     + 2
                     + (propertiesLength > 0 ? propertiesLength : 0);  // propertiesLength

    return msgLen;
}

第九步:根据消息体、主题和属性的长度,结合消息存储格式,计算消息的总长度,如代码清单4-7所示。RocketMQ消息存储格式如下。 1)TOTALSIZE:消息条目总长度,4字节。 2)MAGICCODE:魔数,4字节。固定值0xdaa320a7。 3)BODYCRC:消息体的crc校验码,4字节。 4)QUEUEID:消息消费队列ID,4字节。 5)FLAG:消息标记,RocketMQ对其不做处理,供应用程序使用,默认4字节。 6)QUEUEOFFSET:消息在ConsumeQuene文件中的物理偏移量,8字节。 7)PHYSICALOFFSET:消息在CommitLog文件中的物理偏移量,8字节。 8)SYSFLAG:消息系统标记,例如是否压缩、是否是事务消息等,4字节。 9)BORNTIMESTAMP:消息生产者调用消息发送API的时间戳,8字节。 10)BORNHOST:消息发送者IP、端口号,8字节。 11)STORETIMESTAMP:消息存储时间戳,8字节。 12)STOREHOSTADDRESS:Broker服务器IP+端口号,8字节。 13)RECONSUMETIMES:消息重试次数,4字节。 14)Prepared Transaction Offset:事务消息的物理偏移量,8字节。 15)BodyLength:消息体长度,4字节。 16)Body:消息体内容,长度为bodyLenth中存储的值。 17)TopicLength:主题存储长度,1字节,表示主题名称不能超过255个字符。 18)Topic:主题,长度为TopicLength中存储的值。 19)PropertiesLength:消息属性长度,2字节,表示消息属性长度不能超过65536个字符。 20)Properties:消息属性,长度为PropertiesLength中存储的值。 CommitLog条目是不定长的,每一个条目的长度存储在前4个字节中。

代码清单4-7 CommitLog$DefaultAppendMessageCallback#doAppend
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
    this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
    this.msgStoreItemMemory.putInt(maxBlank);
    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();

    byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);

    return new AppendMessageResult(
        AppendMessageStatus.END_OF_FILE,
        wroteOffset,
        maxBlank,
        msgId,
        msgInner.getStoreTimestamp(),
        queueOffset,
        CommitLog.this.defaultMessageStore.now() - beginTimeMills
    );
}

第十步:如果消息长度+END_FILE_MIN_BLANK_LENGTH大于CommitLog文件的空闲空间,则返回AppendMessageStatus.END_OF_FILE,Broker会创建一个新的CommitLog文件来存储该消息。从这里可以看出,每个CommitLog文件最少空闲8字节,高4字节存储当前文件的剩余空间,低4字节存储魔数CommitLog.BLANK_MAGIC_CODE,如代码清单4-8所示。

代码清单4-8 CommitLog$DefaultAppendMessageCallback#doAppend
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

AppendMessageResult result = new AppendMessageResult(
    AppendMessageStatus.PUT_OK,
    wroteOffset,
    msgLen,
    msgId,
    msgInner.getStoreTimestamp(),
    queueOffset,
    CommitLog.this.defaultMessageStore.now() - beginTimeMills
);

第十一步:将消息内容存储到ByteBuffer中,然后创建AppendMessageResult。这里只是将消息存储在MappedFile对应的内存映射Buffer中,并没有写入磁盘,追加结果如图4-10所示。

image 2025 01 18 12 29 21 853
Figure 3. 图4-10 AppendMessageResult类图

下面逐一介绍AppendMessageResult的属性。 1)AppendMessageStatus status:消息追加结果,取值为PUT_OK则代表追加成功、END_OF_FILE则代表超过文件大小、MESSAGE_SIZE_EXCEEDED则代表消息长度超过最大允许长度、PROPERTIES_SIZE_EXCEEDED则代表消息属性超过最大允许长度、UNKNOWN_ERROR则代表未知异常。 2)long wroteOffset:消息的物理偏移量。 3)String msgId:消息ID。 4)long storeTimestamp:消息存储时间戳。 5)long logicsOffset:消息消费队列的逻辑偏移量,类似于数组下标。 6)long pagecacheRT = 0:写入页缓存的响应时间。 7)int msgNum = 1:批量发送消息时的消息条数。

代码清单4-9 CommitLog$DefaultAppendMessageCallback#doAppend
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
    break;

第十二步:更新消息队列的逻辑偏移量。 第十三步:处理完消息追加逻辑后将释放putMessageLock,如代码清单4-10所示。

代码清单4-10 CommitLog#putMessage
handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);
return putMessageResult;

第十四步:DefaultAppendMessageCallback#doAppend只是将消息追加到内存中,需要根据采取的是同步刷盘方式还是异步刷盘方式,将内存中的数据持久化到磁盘中,4.8节会详细介绍刷盘操作。然后执行HA主从同步复制,主从同步将在第7章详细介绍。

消息发送的基本流程就介绍到这里,4.4节开始会详细剖析RocketMQ消息存储机制的各个方面。