存储文件组织与内存映射

RocketMQ通过使用内存映射文件来提高I/O访问性能,无论是CommitLog、Consume-Queue还是Index,单个文件都被设计为固定长度,一个文件写满以后再创建新文件,文件名就为该文件第一条消息对应的全局物理偏移量。

RocketMQ使用MappedFile、MappedFileQueue来封装存储文件。

MappedFileQueue映射文件队列

MappedFileQueue是MappedFile的管理容器,MappedFileQueue对存储目录进行封装,例如CommitLog文件的存储路径为${ROCKET_HOME}/store/commitlog/,该目录下会存在多个内存映射文件MappedFile。MappedFileQueue类图如图4-11所示。

image 2025 01 18 12 35 35 737
Figure 1. 图4-11 MappedFileQueue类图

下面介绍MappedFileQueue的核心属性。

1)String storePath:存储目录。 2)int mappedFileSize:单个文件的存储大小。 3)CopyOnWriteArrayList mappedFiles:MappedFile集合。 4)AllocateMappedFileService allocateMappedFileService:创建MappedFile服务类。 5)long flushedWhere = 0:当前刷盘指针,表示该指针之前的所有数据全部持久化到磁盘。 6)long committedWhere = 0:当前数据提交指针,内存中ByteBuffer当前的写指针,该值大于、等于flushedWhere。

接下来重点分析根据不同维度查找MappedFile的方法,如代码清单4-11所示。

代码清单4-11 MappedFileQueue#getMappedFileByTime
public MappedFile getMappedFileByTime(final long timestamp) {
    Object[] mfs = this.copyMappedFiles(0);

    if (null == mfs) {
        return null;
    }

    for (int i = 0; i < mfs.length; i++) {
        MappedFile mappedFile = (MappedFile) mfs[i];

        if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
            return mappedFile;
        }
    }

    return (MappedFile) mfs[mfs.length - 1];
}

根据消息存储时间戳查找MappdFile。从MappedFile列表中第一个文件开始查找,找到第一个最后一次更新时间大于待查找时间戳的文件,如果不存在,则返回最后一个MappedFile,如代码清单4-12所示。

代码清单4-12 MappedFileQueue#findMappedFileByOffset
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    // 省略外层 try ... catch
    MappedFile mappedFile = this.getFirstMappedFile();

    if (mappedFile != null) {
        int index = (int) ((offset / this.mappedFileSize) -
                          (mappedFile.getFileFromOffset() / this.mappedFileSize));

        if (index < 0 || index >= this.mappedFiles.size()) {
            // 省略警告日志
        }

        try {
            return this.mappedFiles.get(index);
        } catch (Exception e) {
            if (returnFirstOnNotFound) {
                return mappedFile;
            }
            LOG_ERROR.warn("findMappedFileByOffset failure. ", e);
        }
    }

    return null;  // 若未找到文件时,返回 null 或根据需要调整
}

根据消息偏移量offset查找MappedFile,但是不能直接使用offset%mappedFileSize。这是因为使用了内存映射,只要是存在于存储目录下的文件,都需要对应创建内存映射文件,如果不定时将已消费的消息从存储文件中删除,会造成极大的内存压力与资源浪费,所以RocketMQ采取定时删除存储文件的策略。也就是说,在存储文件中,第一个文件不一定是00000000000000000000,因为该文件在某一时刻会被删除,所以根据offset定位MappedFile的算法为(int)offset/this.mappedFileSize)(mappedFile.getFileFromOffset()/this.MappedFileSize,如代码清单4-13所示。

代码清单4-13 MappedFileQueue#getMinOffset
public long getMinOffset() {
    if (!this.mappedFiles.isEmpty()) {
        try {
            return this.mappedFiles.get(0).getFileFromOffset();
        } catch (IndexOutOfBoundsException e) {
            // 处理 IndexOutOfBoundsException 异常
        } catch (Exception e) {
            log.error("getMinOffset has exception.", e);
        }
    }
    return -1;
}

获取存储文件最小偏移量。从这里也可以看出,并不是直接返回0,而是返回MappedFile的getFileFormOffset()方法,如代码清单4-14所示。

代码清单4-14 MappedFileQueue#getMaxOffset
public long getMaxOffset() {
    MappedFile mappedFile = getLastMappedFile();

    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }

    return 0;
}

获取存储文件的最大偏移量。返回最后一个MappedFile的fileFromOffset,加上MappedFile当前的写指针,如代码清单4-15所示。

代码清单4-15 MappedFileQueue#getMaxWrotePosition
public long getMaxWrotePosition() {
    MappedFile mappedFile = getLastMappedFile();

    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }

    return 0;
}

返回存储文件当前的写指针。返回最后一个文件的fileFromOffset,加上当前写指针位置。

关于MappedFileQueue的相关业务方法,我们在具体使用到时再去剖析。

MappedFile内存映射文件

MappedFile是RocketMQ内存映射文件的具体实现,如图4-12所示。

image 2025 01 18 12 44 57 993
Figure 2. 图4-12 MappedFile类图

下面介绍MappedFile的核心属性。 1)int OS_PAGE_SIZE:操作系统每页大小,默认4KB。 2)AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY:当前JVM实例中MappedFile的虚拟内存。 3)AtomicInteger TOTAL_MAPPED_FILES:当前JVM实例中MappedFile对象个数。 4)AtomicInteger wrotePosition:当前文件的写指针,从0开始(内存映射文件中的写指针)。 5)AtomicInteger committedPosition:当前文件的提交指针,如果开启transientStore-PoolEnable,则数据会存储在TransientStorePool中,然后提交到内存映射ByteBuffer中,再写入磁盘。 6)AtomicInteger flushedPosition:将该指针之前的数据持久化存储到磁盘中。 7)int fileSize:文件大小。 8)FileChannel fileChannel:文件通道。 9)ByteBuffer writeBuffer:堆外内存ByteBuffer,如果不为空,数据首先将存储在该Buffer中,然后提交到MappedFile创建的FileChannel中。transientStorePoolEnable为true时不为空。 10)TransientStorePool transientStorePool:堆外内存池,该内存池中的内存会提供内存锁机制。transientStorePoolEnable为true时启用。 11)String fileName:文件名称。 12)long fileFromOffset:该文件的初始偏移量。 13)File file:物理文件。 14)MappedByteBuffer mappedByteBuffer:物理文件对应的内存映射Buffer。 15)volatile long storeTimestamp = 0:文件最后一次写入内容的时间。 16)boolean firstCreateInQueue:是否是MappedFileQueue队列中第一个文件。

  1. MappedFile初始化

    第一步:根据是否开启transientStorePoolEnable存在两种初始化情况。transientStorePool-Enable为true表示内容先存储在堆外内存,然后通过Commit线程将数据提交到FileChannel中,再通过Flush线程将数据持久化到磁盘中,如代码清单4-16所示。

    代码清单4-16 MappedFile#init(final String fileName, final int fileSize)
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    
    ensureDirOK(this.file.getParent());
    
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    
    TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
    TOTAL_MAPPED_FILES.incrementAndGet();

    第二步:初始化fileFromOffset为文件名,也就是文件名代表该文件的起始偏移量,通过RandomAccessFile创建读写文件通道,并将文件内容使用NIO的内存映射Buffer将文件映射到内存中,如代码清单4-17所示。

    代码清单4-16 MappedFile#init(final String fileName, final int fileSize)
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    
    ensureDirOK(this.file.getParent());
    
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    
    TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
    TOTAL_MAPPED_FILES.incrementAndGet();

    第二步:初始化fileFromOffset为文件名,也就是文件名代表该文件的起始偏移量,通过RandomAccessFile创建读写文件通道,并将文件内容使用NIO的内存映射Buffer将文件映射到内存中,如代码清单4-17所示。

    代码清单4-17 MappedFile#init
    public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize);
        this.writeBuffer = transientStorePool.borrowBuffer();
        this.transientStorePool = transientStorePool;
    }

    如果transientStorePoolEnable为true,则初始化MappedFile的writeBuffer,该buffer从transientStorePool中获取。

  2. MappedFile提交

    内存映射文件的提交动作由MappedFile的commit()方法实现,如代码清单4-18所示。

    代码清单4-18 MappedFile#commit
    public int commit(final int commitLeastPages) {
        if (writeBuffer == null) {
            return this.wrotePosition.get();
        }
    
        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                commit0(commitLeastPages);
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }
    
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }
    
        return this.committedPosition.get();
    }

    执行提交操作,commitLeastPages为本次提交的最小页数,如果待提交数据不满足commitLeastPages,则不执行本次提交操作,等待下次提交。writeBuffer如果为空,直接返回wrotePosition指针,无须执行commit操作,这表明commit操作的主体是writeBuffer,如代码清单4-19所示。

    代码清单4-19 MappedFile#isAbleToCommit
    protected boolean isAbleToCommit(final int commitLeastPages) {
        int flush = this.committedPosition.get();
        int write = this.wrotePosition.get();
    
        if (this.isFull()) {
            return true;
        }
    
        if (commitLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
        }
    
        return write > flush;
    }

    判断是否执行commit操作。如果文件已满,返回true。如果commitLeastPages大于0,则计算wrotePosition(当前writeBuffe的写指针)与上一次提交的指针(committedPosition)的差值,将其除以OS_PAGE_SIZE得到当前脏页的数量,如果大于commitLeastPages,则返回true。如果commitLeastPages小于0,表示只要存在脏页就提交,如代码清单4-20所示。

    代码清单4-20 MappedFile#commit0
    protected void commit0(final int commitLeastPages) {
        int writePos = this.wrotePosition.get();
        int lastCommittedPosition = this.committedPosition.get();
    
        if (writePos - this.committedPosition.get() > 0) {
            try {
                ByteBuffer byteBuffer = writeBuffer.slice();
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
    
                this.fileChannel.position(lastCommittedPosition);
                this.fileChannel.write(byteBuffer);
    
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }

    下面介绍具体的MappedFile提交实现过程。首先创建writeBuffer的共享缓存区,然后将新创建的position回退到上一次提交的位置(committedPosition),设置limit为wrotePosition(当前最大有效数据指针),接着把committedPosition到wrotePosition的数据复制(写入)到FileChannel中,最后更新committedPosition指针为wrotePosition。commit的作用是将MappedFile# writeBuffer中的数据提交到文件通道FileChannel中。 ByteBuffer使用技巧:调用slice()方法创建一个共享缓存区,与原先的ByteBuffer共享内存并维护一套独立的指针(position、mark、limit)。

  3. MappedFile刷盘

    刷盘指的是将内存中的数据写入磁盘,永久存储在磁盘中,由MappedFile的flush()方法实现,如代码清单4-21所示。

    代码清单4-21 MappedFile#flush
    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();
                try {
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }
    
                this.flushedPosition.set(value);
                this.release();
            } else {
                this.flushedPosition.set(getReadPosition());
            }
        }
    
        return this.getFlushedPosition();
    }

    直接调用mappedByteBuffer或fileChannel的force()方法将数据写入磁盘,将内存中的数据持久化到磁盘中,那么flushedPosition应该等于MappedByteBuffer中的写指针。如果writeBuffer不为空,则flushedPosition应等于上一次commit指针。因为上一次提交的数据就是进入MappedByteBuffer中的数据。如果writeBuffer为空,表示数据是直接进入MappedByteBuffer的,wrotePosition代表的是MappedByteBuffer中的指针,故设置flushedPosition为wrotePosition。

  4. 获取MappedFile最大读指针

    RocketMQ文件的一个组织方式是内存映射,预先申请一块连续且固定大小的内存,需要一套指针标识当前最大有效数据的位置,获取最大有效数据偏移量的方法由MappedFile的getReadPosition()方法实现,如代码清单4-22所示。

    代码清单4-22 MappedFile#getReadPosition
    public int getReadPosition() {
        return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
    }

    获取当前文件最大的可读指针,如代码清单4-23所示。如果writeBuffer为空,则直接返回当前的写指针。如果writeBuffer不为空,则返回上一次提交的指针。在MappedFile设计中,只有提交了的数据(写入MappedByteBuffer或FileChannel中的数据)才是安全的数据。

    代码清单4-23 MappedFile#selectMappedBuffer
    public SelectMappedBufferResult selectMappedBuffer(int pos) {
        int readPosition = getReadPosition();
    
        if (pos < readPosition && pos >= 0) {
            if (this.hold()) {
                ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
                byteBuffer.position(pos);
                int size = readPosition - pos;
    
                ByteBuffer byteBufferNew = byteBuffer.slice();
                byteBufferNew.limit(size);
    
                return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
            }
        }
    
        return null;
    }

    首先查找pos到当前最大可读指针之间的数据,因为在整个写入期间都未曾改变MappedByteBuffer的指针,所以mappedByteBuffer.slice()方法返回的共享缓存区空间为整个MappedFile。然后通过设置byteBuffer的position为待查找的值,读取字节为当前可读字节长度,最终返回的ByteBuffer的limit(可读最大长度)为size。整个共享缓存区的容量为MappedFile#fileSizepos,故在操作SelectMappedBufferResult时不能对包含在里面的ByteBuffer调用flip()方法。

    操作ByteBuffer时如果使用了slice()方法,对其ByteBuffer进行读取时一般手动指定position和limit指针,而不是调用flip()方法切换读写状态。

  5. MappedFile销毁

MappedFile文件销毁的实现方法为public boolean destroy(final long intervalForcibly),intervalForcibly表示拒绝被销毁的最大存活时间,如代码清单4-24所示。

代码清单4-24 MappedFile#shutdown
public void shutdown(final long intervalForcibly) {
    if (this.available) {
        this.available = false;
        this.firstShutdownTimestamp = System.currentTimeMillis();
        this.release();
    } else if (this.getRefCount() > 0) {
        if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
            this.refCount.set(-1000 - this.getRefCount());
            this.release();
        }
    }
}

第一步:关闭MappedFile。初次调用时this.available为true,设置available为false,并设置初次关闭的时间戳(firstShutdownTimestamp)为当前时间戳。调用release()方法尝试释放资源,release只有在引用次数小于1的情况下才会释放资源。如果引用次数大于0,对比当前时间与firstShutdownTimestamp,如果已经超过了其最大拒绝存活期,则每执行一次引用操作,引用数减少1000,直到引用数小于0时通过执行realse()方法释放资源,如代码清单4-25所示。

代码清单4-25 MappedFile#isCleanupOver
public boolean isCleanupOver() {
    return this.refCount.get() <= 0 && this.cleanupOver;
}

第二步:判断是否清理完成,判断标准是引用次数小于、等于0并且cleanupOver为true,cleanupOver为true的触发条件是release成功将MappedByteBuffer资源释放了,如代码清单4-26所示。稍后详细分析release()方法。

代码清单4-26 MappedFile#destroy
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");

long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();

第三步:关闭文件通道,删除物理文件。 在整个MappedFile销毁的过程中,首先需要释放资源,释放资源的前提条件是该MappedFile的引用小于、等于0。接下来重点看一下release()方法的实现原理,如代码清单4-27所示。

代码清单4-27 ReferenceResource#release
public void release() {
    long value = this.refCount.decrementAndGet();

    if (value > 0) {
        return;
    }

    synchronized (this) {
        this.cleanupOver = this.cleanup(value);
    }
}

将引用次数减1,如果引用数小于、等于0,则执行cleanup()方法,下面重点分析cleanup()方法的实现,如代码清单4-28所示。

代码清单4-28 MappedFile#cleanup
public boolean cleanup(final long currentRef) {
    if (this.isAvailable()) {
        return false;
    }

    if (this.isCleanupOver()) {
        return true;
    }

    clean(this.mappedByteBuffer);
    TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
    TOTAL_MAPPED_FILES.decrementAndGet();

    log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
    return true;
}

如果available为true,表示MappedFile当前可用,无须清理,返回false,如果资源已经被清除,返回true。如果是堆外内存,调用堆外内存的cleanup()方法进行清除,维护MappedFile类变量TOTAL_MAPPED_VIRTUAL_MEMORY、TOTAL_MAPPED_FILES并返回true,表示cleanupOver为true。

TransientStorePool

TransientStorePool即短暂的存储池。RocketMQ单独创建了一个DirectByteBuffer内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由Commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RokcetMQ引入该机制是为了提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘中。

TransientStorePool类图如图4-13所示。

image 2025 01 18 13 04 53 600
Figure 3. 图4-13 TransientStorePool类图

下面介绍TransientStorePool的核心属性,如代码清单4-29所示。

1)int poolSize:avaliableBuffers个数,可在broker配置文件中通过transientStorePoolSize进行设置,默认为5。 2)int fileSize:每个ByteBuffer的大小,默认为mapedFileSizeCommitLog,表明TransientStorePool为CommitLog文件服务。 3)Deque availableBuffers:ByteBuffer容器,双端队列。

代码清单4-29 TransientStorePool#init
public void init() {
    for (int i = 0; i < poolSize; i++) {
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
        availableBuffers.offer(byteBuffer);
    }
}

创建数量为poolSize的堆外内存,利用com.sun.jna.Library类库锁定该批内存,避免被置换到交换区,以便提高存储性能。