文件刷盘机制

RocketMQ的存储与读写是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储时首先将消息追加到内存中,再根据配置的刷盘策略在不同时间刷盘。如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force()方法;如果是异步刷盘,在消息追加到内存后会立刻返回给消息发送端。RocketMQ使用一个单独的线程按照某一个设定的频率执行刷盘操作。通过在broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘),默认为异步刷盘。本节以CommitLog文件刷盘机制为例来剖析RocketMQ的刷盘机制,ConsumeQueue文件、Index文件刷盘的实现原理与CommitLog刷盘机制类似。RocketMQ处理刷盘的实现方法为Commitlog#handleDiskFlush(),刷盘流程作为消息发送、消息存储的子流程,我们先重点了解消息存储流程的相关知识。值得注意的是,Index文件的刷盘并不是采取定时刷盘机制,而是每更新一次Index文件就会将上一次的改动写入磁盘。

Broker同步刷盘

同步刷盘指的是在消息追加到内存映射文件的内存中后,立即将数据从内存写入磁盘文件,由CommitLog的handleDiskFlush方法实现,如代码清单4-74所示。

代码清单4-74 CommitLog#handleDiskFlush
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);

boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}

同步刷盘实现流程如下。

1)构建GroupCommitRequest同步任务并提交到GroupCommitRequest。 2)等待同步刷盘任务完成,如果超时则返回刷盘错误,刷盘成功后正常返回给调用方。GroupCommitRequest的类图如图4-21所示。

image 2025 01 18 14 13 04 896
Figure 1. 图4-21 GroupCommitRequest类图

下面介绍GroupCommitRequest的核心属性,如代码清单4-75所示。

1)long nextOffset:刷盘点偏移量。 2)CountDownLatch countDownLatch:倒记数锁存器。 3)flushOk:刷盘结果,初始为false。

代码清单4-75 GroupCommitRequest#waitForFlush
public boolean waitForFlush(long timeout) {
    try {
        this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        return this.flushOK;
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
        return false;
    }
}

消费发送线程将消息追加到内存映射文件后,将同步任务GroupCommitRequest提交到GroupCommitService线程,然后调用阻塞等待刷盘结果,超时时间默认为5s,如代码清单4-76所示。

代码清单4-76 GroupCommitRequest#wakeupCustomer
public void wakeupCustomer(final boolean flushOK) {
    this.flushOK = flushOK;
    this.countDownLatch.countDown();
}

GroupCommitService线程处理GroupCommitRequest对象后将调用wakeupCustomer方法将消费发送线程唤醒,并将刷盘请求告知GroupCommitRequest。同步刷盘线程实现GroupCommitService类图如图4-22所示。

image 2025 01 18 14 15 25 309
Figure 2. 图4-22 GroupCommitRequest类图

1)private volatile List requestsWrite:同步刷盘任务暂存容器。 2)private volatile List requestsRead:GroupCommitService线程每次处理的request容器,这是一个设计亮点,避免了任务提交与任务执行的锁冲突,如代码清单4-77所示。

代码清单4-77 GroupCommitService#putRequest
public synchronized void putRequest(final GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown();
    }
}

客户端提交同步刷盘任务到 GroupCommitService 线程,如果该线程处于等待状态则将其唤醒,如代码清单 4-78 所示。

代码清单4-78 GroupCommitService#swapRequests
private void swapRequests() {
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}

为了避免同步刷盘消费任务与其他消息生产者提交任务产生锁竞争,GroupCommitService 提供读容器与写容器,这两个容器每执行完一次任务后交互,继续消费任务,如代码清单4-79所示。

代码清单4-79 GroupCommitService#run
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception.", e);
        }
    }
}

GroupCommitService组提交线程,每处理一批刷盘请求后,如果后续有待刷盘的请求需要处理,组提交线程会马不停蹄地处理下一批;如果没有待处理的任务,则休息10ms,即每10ms空转一次,如代码清单4-80所示。

代码清单4-80 GroupCommitService#doCommit
for (GroupCommitRequest req : this.requestsRead) {
    boolean flushOK = false;
    for (int i = 0; i < 2 && !flushOK; i++) {
        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
        if (!flushOK) {
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
    req.wakeupCustomer(flushOK);
}

long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}

1)执行刷盘操作,即调用MappedByteBuffer#force方法。遍历同步刷盘任务列表,根据加入顺序逐一执行刷盘逻辑。 2)调用mappedFileQueue#flush方法执行刷盘操作,最终会调用MappedByteBuffer#force()方法,其具体实现已在4.4节做了详细说明。如果已刷盘指针大于、等于提交的刷盘点,表示刷盘成功,每执行一次刷盘操作后,立即调用GroupCommitRequest#wakeupCustomer唤醒消息发送线程并通知刷盘结果。 3)处理完所有同步刷盘任务后,更新刷盘检测点StoreCheckpoint中的physicMsgTimestamp,但并没有执行检测点的刷盘操作,刷盘检测点的刷盘操作将在刷写消息队列文件时触发。

同步刷盘的简单描述是,消息生产者在消息服务端将消息内容追加到内存映射文件中(内存)后,需要同步将内存的内容立刻写入磁盘。通过调用内存映射文件(MappedByteBuffer的force方法)可将内存中的数据写入磁盘。

Broker异步刷盘

异步刷盘操作如代码清单4-81所示。

代码清单4-81 CommitLog#handleDiskFlush
// 异步刷盘
else {
    if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        flushCommitLogService.wakeup();
    } else {
        commitLogService.wakeup();
    }
}

开启transientStorePoolEnable机制则启动异步刷盘方式,刷盘实现较同步刷盘有细微差别。如果transientStorePoolEnable为true,RocketMQ会单独申请一个与目标物理文件(CommitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到与物理文件的内存映射中,再经flush操作到磁盘。如果transientStorePoolEnable为false,消息将追加到与物理文件直接映射的内存中,然后写入磁盘。transientStorePoolEnable为true的刷盘流程如图4-23所示。

image 2025 01 18 14 24 01 582
Figure 3. 图4-23 刷盘流程

1)将消息直接追加到ByteBuffer(堆外内存DirectByteBuffer),wrotePosition随着消息的不断追加向后移动。 2)CommitRealTimeService线程默认每200ms将ByteBuffer新追加(wrotePosition减去commitedPosition)的数据提交到FileChannel中。 3)FileChannel在通道中追加提交的内容,其wrotePosition指针向前后移动,然后返回。 4)commit操作成功返回,将commitedPosition向前后移动本次提交的内容长度,此时wrotePosition指针依然可以向前推进。 5)FlushRealTimeService线程默认每500ms将FileChannel中新追加的内存(wrotePosition减去上一次写入位置flushedPositiont),通过调用FileChannel#force()方法将数据写入磁盘。

  1. CommitRealTimeService提交线程工作机制

    CommitRealTimeService提交线程如代码清单4-82所示。

    代码清单4-82 CommitLog$CommitRealTimeService#run
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
    int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
    int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

    第一步:如代码清单4-83所示,先解释3个配置参数的含义。

    1)commitIntervalCommitLog:CommitRealTimeService线程间隔时间,默认200ms。

    2)commitLogLeastPages:一次提交任务至少包含的页数,如果待提交数据不足,小于该参数配置的值,将忽略本次提交任务,默认4页。

    3)commitDataThoroughInterval:两次真实提交的最大间隔时间,默认200ms。

    代码清单4-83 CommitLog$CommitRealTimeService#run
    long begin = System.currentTimeMillis();
    if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
        this.lastCommitTimestamp = begin;
        commitDataLeastPages = 0;
    }

    第二步:如果距上次提交间隔超过commitDataThoroughInterval,则本次提交忽略commitLogLeastPages参数,也就是如果待提交数据小于指定页数,也执行提交操作,如代码清单4-84所示。

    代码清单4-84 CommitLog$CommitRealTimeService#run
    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
    long end = System.currentTimeMillis();
    
    if (!result) {
        this.lastCommitTimestamp = end;
        flushCommitLogService.wakeup();
    }
    
    this.waitForRunning(interval);

    第三步:执行提交操作,将待提交数据提交到物理文件的内存映射内存区,如果返回false,并不代表提交失败,而是表示有数据提交成功了,唤醒刷盘线程执行刷盘操作。该线程每完成一次提交动作,将等待200ms再继续执行下一次提交任务。

  2. FlushRealTimeService刷盘线程工作机制

FlushRealTimeService刷盘线程工作流程如代码清单4-85所示。

代码清单4-85 CommitLog$FlushRealTimeService#run
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

第一步:如代码清单4-86所示,先解释4个配置参数的含义。

  • flushCommitLogTimed:默认为false,表示使用await方法等待;如果为true,表示使用Thread.sleep方法等待。

  • flushIntervalCommitLog:FlushRealTimeService线程任务运行间隔时间。

  • flushPhysicQueueLeastPages:一次刷盘任务至少包含页数,如果待写入数据不足,小于该参数配置的值,将忽略本次刷盘任务,默认4页。

  • flushPhysicQueueThoroughInterval:两次真实刷盘任务的最大间隔时间,默认10s。

代码清单4-86 CommitLog$FlushRealTimeService#run
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
    this.lastFlushTimestamp = currentTimeMillis;
    flushPhysicQueueLeastPages = 0;
    printFlushProgress = (printTimes++ % 10) == 0;
}

第二步:如果距上次提交数据的间隔时间超过flushPhysicQueueThoroughInterval,则本次刷盘任务将忽略flushPhysicQueueLeastPages,也就是如果待写入数据小于指定页数,也执行刷盘操作,如代码清单4-87所示。

代码清单4-87 CommitLog$FlushRealTimeService#run
if (flushCommitLogTimed) {
    Thread.sleep(interval);
} else {
    this.waitForRunning(interval);
}

第三步:执行一次刷盘任务前先等待指定时间间隔,然后执行刷盘任务,如代码清单4-88所示。

代码清单4-88 CommitLog$FlushRealTimeService#run
long begin = System.currentTimeMillis();
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}

第四步:调用flush方法将内存中的数据写入磁盘,并且更新checkpoint文件的CommitLog文件更新时间戳,checkpoint文件的刷盘动作在刷盘ConsumeQueue线程中执行,其入口为DefaultMessageStore#FlushConsumeQueueService。ConsumeQueue、Index文件的刷盘实现原理与CommitLog文件的刷盘机制类似,本书不再单独分析。