过期文件删除机制

因为RocketMQ操作CommitLog、ConsumeQueue文件是基于内存映射机制并在启动的时候会加载commitlog、consumequeue目录下的所有文件,所以为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,这就需要引入一种机制来删除已过期的文件。RocketMQ顺序写CommitLog文件、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后将不会再被更新。RocketMQ清除过期文件的方法:如果非当前写文件在一定时间间隔内没有再次更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72h,通过在broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。接下来详细分析RocketMQ是如何设计与实现上述机制的,如代码清单4-89所示。

代码清单4-89 DefaultMessageStore#addScheduleTask
private void addScheduleTask() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
    // 省略其他定时任务
}

RocketMQ每隔10s调度一次cleanFilesPeriodically,检测是否需要清除过期文件。执行频率可以通过cleanResourceInterval进行设置,默认10s,如代码清单4-90所示。

代码清单4-90 DefaultMessageStore#cleanFilesPeriodically
private void cleanFilesPeriodically() {
    this.cleanCommitLogService.run();
    this.cleanConsumeQueueService.run();
}

分别执行清除CommitLog文件与ConsumQueue文件。ConsumQueue文件与CommitLog文件共用一套过期文件删除机制,本节将重点讲解CommitLog过期文件删除。实现方法为DefaultMessageStore$CleanCommitLog Service#deleteExpiredFiles,如代码清单4-91所示。

代码清单4-91 DefaultMessageStore$CleanCommitLogService#deleteExpiredFiles
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

第一步:解释一下这个3个配置属性的含义。

1)fileReservedTime:文件保留时间,如果超过了该时间,则认为是过期文件,可以被删除。 2)deletePhysicFilesInterval:删除物理文件的间隔时间,在一次清除过程中,可能需要被删除的文件不止一个,该值指定两次删除文件的间隔时间。 3)destroyMapedFileIntervalForcibly:在清除过期文件时,如果该文件被其他线程占用(引用次数大于0,比如读取消息),此时会阻止此次删除任务,同时在第一次试图删除该文件时记录当前时间戳,destroyMapedFileIntervalForcibly表示第一次拒绝删除之后能保留文件的最大时间,在此时间内,同样可以被拒绝删除,超过该时间后,会将引用次数设置为负数,文件将被强制删除,如代码清单4-92所示。

代码清单4-92 DefaultMessageStore$CleanCommitLogService#deleteExpiredFiles
boolean timeup = this.isTimeToDelete();
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

if (timeup || spacefull || manualDelete) {
    // 继续执行删除逻辑
    return;
} else {
    // 本次删除任务无作为
}

第二步:RocketMQ满足如下任意一种情况将继续执行删除文件的操作。

1)指定删除文件的时间点,RocketMQ通过deleteWhen设置每天在固定时间执行一次删除过期文件操作,默认凌晨4点。 2)检查磁盘空间是否充足,如果磁盘空间不充足,则返回true,表示应该触发过期文件删除操作。 3)预留手工触发机制,可以通过调用excuteDeleteFilesManualy方法手工触发删除过期文件的操作,目前RocketMQ暂未封装手工触发文件删除的命令。

下面重点分析一下磁盘空间是否充足的实现逻辑,如代码清单4-93所示。

代码清单4-93 DefaultMessageStore$CleanCommitLogService#isSpaceToDelete
private boolean isSpaceToDelete() {
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    cleanImmediately = false;
    String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);

    if (physicRatio > diskSpaceWarningLevelRatio) {
        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
        // 省略日志输出语句
        cleanImmediately = true;
    } else if (physicRatio > diskSpaceCleanForciblyRatio) {
        cleanImmediately = true;
    } else {
        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
        // 省略日志输出语句
    }

    if (physicRatio < 0 || physicRatio > ratio) {
        return true;
    }

    // 后面省略对ConsumeQueue文件做同样的判断
    return false;
}

首先解释一下几个参数的含义。

1)diskMaxUsedSpaceRatio:表示CommitLog文件、ConsumeQueue文件所在磁盘分区的最大使用量,如果超过该值,则需要立即清除过期文件。 2)cleanImmediately:表示是否需要立即执行清除过期文件的操作。 3)physicRatio:当前CommitLog目录所在的磁盘分区的磁盘使用率,通过File#getTotalSpace方法获取文件所在磁盘分区的总容量,通过File#getFreeSpace方法获取文件所在磁盘分区的剩余容量。 4)diskSpaceWarningLevelRatio:通过系统参数Drocketmq.broker.diskSpaceWarningLevelRatio进行设置,默认0.90。如果磁盘分区使用率超过该阈值,将设置磁盘为不可写,此时会拒绝写入新消息。 5)diskSpaceCleanForciblyRatio:通过系统参数Drocketmq.broker.diskSpaceCleanForcibly-Ratio进行设置,默认0.85。如果磁盘分区使用超过该阈值,建议立即执行过期文件删除,但不会拒绝写入新消息。

如果当前磁盘分区使用率大于diskSpaceWarningLevelRatio,应该立即启动过期文件删除操作。如果当前磁盘分区使用率大于diskSpaceCleanForciblyRatio,建议立即执行过期文件清除,如果磁盘使用率低于diskSpaceCleanForciblyRatio将恢复磁盘可写。如果当前磁盘使用率小于diskMaxUsedSpaceRatio,则返回false,表示磁盘使用率正常,否则返回true,需要执行删除过期文件,如代码清单4-94所示。

代码清单4-94 MappedFileQueue#deleteExpiredFileByTime
for (int i = 0; i < mfsLength; i++) {
    MappedFile mappedFile = (MappedFile) mfs[i];
    long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;

    if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
        if (mappedFile.destroy(intervalForcibly)) {
            files.add(mappedFile);
            deleteCount++;

            if (files.size() >= DELETE_FILES_BATCH_MAX) {
                break;
            }

            if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                try {
                    Thread.sleep(deleteFilesInterval);
                } catch (InterruptedException e) {
                    // 处理 InterruptedException
                }
            }
        } else {
            break;
        }
    }
}

下面执行文件销毁与删除。从倒数第二个文件开始遍历,计算文件的最大存活时间,即文件的最后一次更新时间+文件存活时间(默认72小时),如果当前时间大于文件的最大存活时间或需要强制删除文件(当磁盘使用超过设定的阈值)时,执行MappedFile#destory方法,清除MappedFile占有的相关资源,如果执行成功,将该文件加入待删除文件列表中,最后统一执行File#delete方法将文件从物理磁盘中删除。