存储文件组织与内存映射
RocketMQ通过使用内存映射文件来提高I/O访问性能,无论是CommitLog、Consume-Queue还是Index,单个文件都被设计为固定长度,一个文件写满以后再创建新文件,文件名就为该文件第一条消息对应的全局物理偏移量。
RocketMQ使用MappedFile、MappedFileQueue来封装存储文件。
MappedFileQueue映射文件队列
MappedFileQueue是MappedFile的管理容器,MappedFileQueue对存储目录进行封装,例如CommitLog文件的存储路径为${ROCKET_HOME}/store/commitlog/,该目录下会存在多个内存映射文件MappedFile。MappedFileQueue类图如图4-11所示。

下面介绍MappedFileQueue的核心属性。
1)String storePath:存储目录。 2)int mappedFileSize:单个文件的存储大小。 3)CopyOnWriteArrayList mappedFiles:MappedFile集合。 4)AllocateMappedFileService allocateMappedFileService:创建MappedFile服务类。 5)long flushedWhere = 0:当前刷盘指针,表示该指针之前的所有数据全部持久化到磁盘。 6)long committedWhere = 0:当前数据提交指针,内存中ByteBuffer当前的写指针,该值大于、等于flushedWhere。
接下来重点分析根据不同维度查找MappedFile的方法,如代码清单4-11所示。
public MappedFile getMappedFileByTime(final long timestamp) {
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs) {
return null;
}
for (int i = 0; i < mfs.length; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
return mappedFile;
}
}
return (MappedFile) mfs[mfs.length - 1];
}
根据消息存储时间戳查找MappdFile。从MappedFile列表中第一个文件开始查找,找到第一个最后一次更新时间大于待查找时间戳的文件,如果不存在,则返回最后一个MappedFile,如代码清单4-12所示。
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
// 省略外层 try ... catch
MappedFile mappedFile = this.getFirstMappedFile();
if (mappedFile != null) {
int index = (int) ((offset / this.mappedFileSize) -
(mappedFile.getFileFromOffset() / this.mappedFileSize));
if (index < 0 || index >= this.mappedFiles.size()) {
// 省略警告日志
}
try {
return this.mappedFiles.get(index);
} catch (Exception e) {
if (returnFirstOnNotFound) {
return mappedFile;
}
LOG_ERROR.warn("findMappedFileByOffset failure. ", e);
}
}
return null; // 若未找到文件时,返回 null 或根据需要调整
}
根据消息偏移量offset查找MappedFile,但是不能直接使用offset%mappedFileSize。这是因为使用了内存映射,只要是存在于存储目录下的文件,都需要对应创建内存映射文件,如果不定时将已消费的消息从存储文件中删除,会造成极大的内存压力与资源浪费,所以RocketMQ采取定时删除存储文件的策略。也就是说,在存储文件中,第一个文件不一定是00000000000000000000,因为该文件在某一时刻会被删除,所以根据offset定位MappedFile的算法为(int)offset/this.mappedFileSize)(mappedFile.getFileFromOffset()/this.MappedFileSize,如代码清单4-13所示。
public long getMinOffset() {
if (!this.mappedFiles.isEmpty()) {
try {
return this.mappedFiles.get(0).getFileFromOffset();
} catch (IndexOutOfBoundsException e) {
// 处理 IndexOutOfBoundsException 异常
} catch (Exception e) {
log.error("getMinOffset has exception.", e);
}
}
return -1;
}
获取存储文件最小偏移量。从这里也可以看出,并不是直接返回0,而是返回MappedFile的getFileFormOffset()方法,如代码清单4-14所示。
public long getMaxOffset() {
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
}
return 0;
}
获取存储文件的最大偏移量。返回最后一个MappedFile的fileFromOffset,加上MappedFile当前的写指针,如代码清单4-15所示。
public long getMaxWrotePosition() {
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
}
return 0;
}
返回存储文件当前的写指针。返回最后一个文件的fileFromOffset,加上当前写指针位置。
关于MappedFileQueue的相关业务方法,我们在具体使用到时再去剖析。
MappedFile内存映射文件
MappedFile是RocketMQ内存映射文件的具体实现,如图4-12所示。

下面介绍MappedFile的核心属性。 1)int OS_PAGE_SIZE:操作系统每页大小,默认4KB。 2)AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY:当前JVM实例中MappedFile的虚拟内存。 3)AtomicInteger TOTAL_MAPPED_FILES:当前JVM实例中MappedFile对象个数。 4)AtomicInteger wrotePosition:当前文件的写指针,从0开始(内存映射文件中的写指针)。 5)AtomicInteger committedPosition:当前文件的提交指针,如果开启transientStore-PoolEnable,则数据会存储在TransientStorePool中,然后提交到内存映射ByteBuffer中,再写入磁盘。 6)AtomicInteger flushedPosition:将该指针之前的数据持久化存储到磁盘中。 7)int fileSize:文件大小。 8)FileChannel fileChannel:文件通道。 9)ByteBuffer writeBuffer:堆外内存ByteBuffer,如果不为空,数据首先将存储在该Buffer中,然后提交到MappedFile创建的FileChannel中。transientStorePoolEnable为true时不为空。 10)TransientStorePool transientStorePool:堆外内存池,该内存池中的内存会提供内存锁机制。transientStorePoolEnable为true时启用。 11)String fileName:文件名称。 12)long fileFromOffset:该文件的初始偏移量。 13)File file:物理文件。 14)MappedByteBuffer mappedByteBuffer:物理文件对应的内存映射Buffer。 15)volatile long storeTimestamp = 0:文件最后一次写入内容的时间。 16)boolean firstCreateInQueue:是否是MappedFileQueue队列中第一个文件。
-
MappedFile初始化
第一步:根据是否开启transientStorePoolEnable存在两种初始化情况。transientStorePool-Enable为true表示内容先存储在堆外内存,然后通过Commit线程将数据提交到FileChannel中,再通过Flush线程将数据持久化到磁盘中,如代码清单4-16所示。
代码清单4-16 MappedFile#init(final String fileName, final int fileSize)this.fileName = fileName; this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName()); ensureDirOK(this.file.getParent()); this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet();
第二步:初始化fileFromOffset为文件名,也就是文件名代表该文件的起始偏移量,通过RandomAccessFile创建读写文件通道,并将文件内容使用NIO的内存映射Buffer将文件映射到内存中,如代码清单4-17所示。
代码清单4-16 MappedFile#init(final String fileName, final int fileSize)this.fileName = fileName; this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName()); ensureDirOK(this.file.getParent()); this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet();
第二步:初始化fileFromOffset为文件名,也就是文件名代表该文件的起始偏移量,通过RandomAccessFile创建读写文件通道,并将文件内容使用NIO的内存映射Buffer将文件映射到内存中,如代码清单4-17所示。
代码清单4-17 MappedFile#initpublic void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { init(fileName, fileSize); this.writeBuffer = transientStorePool.borrowBuffer(); this.transientStorePool = transientStorePool; }
如果transientStorePoolEnable为true,则初始化MappedFile的writeBuffer,该buffer从transientStorePool中获取。
-
MappedFile提交
内存映射文件的提交动作由MappedFile的commit()方法实现,如代码清单4-18所示。
代码清单4-18 MappedFile#commitpublic int commit(final int commitLeastPages) { if (writeBuffer == null) { return this.wrotePosition.get(); } if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { commit0(commitLeastPages); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } } if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; } return this.committedPosition.get(); }
执行提交操作,commitLeastPages为本次提交的最小页数,如果待提交数据不满足commitLeastPages,则不执行本次提交操作,等待下次提交。writeBuffer如果为空,直接返回wrotePosition指针,无须执行commit操作,这表明commit操作的主体是writeBuffer,如代码清单4-19所示。
代码清单4-19 MappedFile#isAbleToCommitprotected boolean isAbleToCommit(final int commitLeastPages) { int flush = this.committedPosition.get(); int write = this.wrotePosition.get(); if (this.isFull()) { return true; } if (commitLeastPages > 0) { return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages; } return write > flush; }
判断是否执行commit操作。如果文件已满,返回true。如果commitLeastPages大于0,则计算wrotePosition(当前writeBuffe的写指针)与上一次提交的指针(committedPosition)的差值,将其除以OS_PAGE_SIZE得到当前脏页的数量,如果大于commitLeastPages,则返回true。如果commitLeastPages小于0,表示只要存在脏页就提交,如代码清单4-20所示。
代码清单4-20 MappedFile#commit0protected void commit0(final int commitLeastPages) { int writePos = this.wrotePosition.get(); int lastCommittedPosition = this.committedPosition.get(); if (writePos - this.committedPosition.get() > 0) { try { ByteBuffer byteBuffer = writeBuffer.slice(); byteBuffer.position(lastCommittedPosition); byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); this.fileChannel.write(byteBuffer); this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } }
下面介绍具体的MappedFile提交实现过程。首先创建writeBuffer的共享缓存区,然后将新创建的position回退到上一次提交的位置(committedPosition),设置limit为wrotePosition(当前最大有效数据指针),接着把committedPosition到wrotePosition的数据复制(写入)到FileChannel中,最后更新committedPosition指针为wrotePosition。commit的作用是将MappedFile# writeBuffer中的数据提交到文件通道FileChannel中。 ByteBuffer使用技巧:调用slice()方法创建一个共享缓存区,与原先的ByteBuffer共享内存并维护一套独立的指针(position、mark、limit)。
-
MappedFile刷盘
刷盘指的是将内存中的数据写入磁盘,永久存储在磁盘中,由MappedFile的flush()方法实现,如代码清单4-21所示。
代码清单4-21 MappedFile#flushpublic int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { if (writeBuffer != null || this.fileChannel.position() != 0) { this.fileChannel.force(false); } else { this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } this.flushedPosition.set(value); this.release(); } else { this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); }
直接调用mappedByteBuffer或fileChannel的force()方法将数据写入磁盘,将内存中的数据持久化到磁盘中,那么flushedPosition应该等于MappedByteBuffer中的写指针。如果writeBuffer不为空,则flushedPosition应等于上一次commit指针。因为上一次提交的数据就是进入MappedByteBuffer中的数据。如果writeBuffer为空,表示数据是直接进入MappedByteBuffer的,wrotePosition代表的是MappedByteBuffer中的指针,故设置flushedPosition为wrotePosition。
-
获取MappedFile最大读指针
RocketMQ文件的一个组织方式是内存映射,预先申请一块连续且固定大小的内存,需要一套指针标识当前最大有效数据的位置,获取最大有效数据偏移量的方法由MappedFile的getReadPosition()方法实现,如代码清单4-22所示。
代码清单4-22 MappedFile#getReadPositionpublic int getReadPosition() { return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); }
获取当前文件最大的可读指针,如代码清单4-23所示。如果writeBuffer为空,则直接返回当前的写指针。如果writeBuffer不为空,则返回上一次提交的指针。在MappedFile设计中,只有提交了的数据(写入MappedByteBuffer或FileChannel中的数据)才是安全的数据。
代码清单4-23 MappedFile#selectMappedBufferpublic SelectMappedBufferResult selectMappedBuffer(int pos) { int readPosition = getReadPosition(); if (pos < readPosition && pos >= 0) { if (this.hold()) { ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); byteBuffer.position(pos); int size = readPosition - pos; ByteBuffer byteBufferNew = byteBuffer.slice(); byteBufferNew.limit(size); return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); } } return null; }
首先查找pos到当前最大可读指针之间的数据,因为在整个写入期间都未曾改变MappedByteBuffer的指针,所以mappedByteBuffer.slice()方法返回的共享缓存区空间为整个MappedFile。然后通过设置byteBuffer的position为待查找的值,读取字节为当前可读字节长度,最终返回的ByteBuffer的limit(可读最大长度)为size。整个共享缓存区的容量为MappedFile#fileSizepos,故在操作SelectMappedBufferResult时不能对包含在里面的ByteBuffer调用flip()方法。
操作ByteBuffer时如果使用了slice()方法,对其ByteBuffer进行读取时一般手动指定position和limit指针,而不是调用flip()方法切换读写状态。
-
MappedFile销毁
MappedFile文件销毁的实现方法为public boolean destroy(final long intervalForcibly),intervalForcibly表示拒绝被销毁的最大存活时间,如代码清单4-24所示。
public void shutdown(final long intervalForcibly) {
if (this.available) {
this.available = false;
this.firstShutdownTimestamp = System.currentTimeMillis();
this.release();
} else if (this.getRefCount() > 0) {
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
第一步:关闭MappedFile。初次调用时this.available为true,设置available为false,并设置初次关闭的时间戳(firstShutdownTimestamp)为当前时间戳。调用release()方法尝试释放资源,release只有在引用次数小于1的情况下才会释放资源。如果引用次数大于0,对比当前时间与firstShutdownTimestamp,如果已经超过了其最大拒绝存活期,则每执行一次引用操作,引用数减少1000,直到引用数小于0时通过执行realse()方法释放资源,如代码清单4-25所示。
public boolean isCleanupOver() {
return this.refCount.get() <= 0 && this.cleanupOver;
}
第二步:判断是否清理完成,判断标准是引用次数小于、等于0并且cleanupOver为true,cleanupOver为true的触发条件是release成功将MappedByteBuffer资源释放了,如代码清单4-26所示。稍后详细分析release()方法。
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
第三步:关闭文件通道,删除物理文件。 在整个MappedFile销毁的过程中,首先需要释放资源,释放资源的前提条件是该MappedFile的引用小于、等于0。接下来重点看一下release()方法的实现原理,如代码清单4-27所示。
public void release() {
long value = this.refCount.decrementAndGet();
if (value > 0) {
return;
}
synchronized (this) {
this.cleanupOver = this.cleanup(value);
}
}
将引用次数减1,如果引用数小于、等于0,则执行cleanup()方法,下面重点分析cleanup()方法的实现,如代码清单4-28所示。
public boolean cleanup(final long currentRef) {
if (this.isAvailable()) {
return false;
}
if (this.isCleanupOver()) {
return true;
}
clean(this.mappedByteBuffer);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
TOTAL_MAPPED_FILES.decrementAndGet();
log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
return true;
}
如果available为true,表示MappedFile当前可用,无须清理,返回false,如果资源已经被清除,返回true。如果是堆外内存,调用堆外内存的cleanup()方法进行清除,维护MappedFile类变量TOTAL_MAPPED_VIRTUAL_MEMORY、TOTAL_MAPPED_FILES并返回true,表示cleanupOver为true。
TransientStorePool
TransientStorePool即短暂的存储池。RocketMQ单独创建了一个DirectByteBuffer内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由Commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RokcetMQ引入该机制是为了提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘中。
TransientStorePool类图如图4-13所示。

下面介绍TransientStorePool的核心属性,如代码清单4-29所示。
1)int poolSize:avaliableBuffers个数,可在broker配置文件中通过transientStorePoolSize进行设置,默认为5。 2)int fileSize:每个ByteBuffer的大小,默认为mapedFileSizeCommitLog,表明TransientStorePool为CommitLog文件服务。 3)Deque availableBuffers:ByteBuffer容器,双端队列。
public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
availableBuffers.offer(byteBuffer);
}
}
创建数量为poolSize的堆外内存,利用com.sun.jna.Library类库锁定该批内存,避免被置换到交换区,以便提高存储性能。