过期文件删除机制
因为RocketMQ操作CommitLog、ConsumeQueue文件是基于内存映射机制并在启动的时候会加载commitlog、consumequeue目录下的所有文件,所以为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,这就需要引入一种机制来删除已过期的文件。RocketMQ顺序写CommitLog文件、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后将不会再被更新。RocketMQ清除过期文件的方法:如果非当前写文件在一定时间间隔内没有再次更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72h,通过在broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。接下来详细分析RocketMQ是如何设计与实现上述机制的,如代码清单4-89所示。
private void addScheduleTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
// 省略其他定时任务
}
RocketMQ每隔10s调度一次cleanFilesPeriodically,检测是否需要清除过期文件。执行频率可以通过cleanResourceInterval进行设置,默认10s,如代码清单4-90所示。
private void cleanFilesPeriodically() {
this.cleanCommitLogService.run();
this.cleanConsumeQueueService.run();
}
分别执行清除CommitLog文件与ConsumQueue文件。ConsumQueue文件与CommitLog文件共用一套过期文件删除机制,本节将重点讲解CommitLog过期文件删除。实现方法为DefaultMessageStore$CleanCommitLog Service#deleteExpiredFiles,如代码清单4-91所示。
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
第一步:解释一下这个3个配置属性的含义。
1)fileReservedTime:文件保留时间,如果超过了该时间,则认为是过期文件,可以被删除。 2)deletePhysicFilesInterval:删除物理文件的间隔时间,在一次清除过程中,可能需要被删除的文件不止一个,该值指定两次删除文件的间隔时间。 3)destroyMapedFileIntervalForcibly:在清除过期文件时,如果该文件被其他线程占用(引用次数大于0,比如读取消息),此时会阻止此次删除任务,同时在第一次试图删除该文件时记录当前时间戳,destroyMapedFileIntervalForcibly表示第一次拒绝删除之后能保留文件的最大时间,在此时间内,同样可以被拒绝删除,超过该时间后,会将引用次数设置为负数,文件将被强制删除,如代码清单4-92所示。
boolean timeup = this.isTimeToDelete();
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
// 继续执行删除逻辑
return;
} else {
// 本次删除任务无作为
}
第二步:RocketMQ满足如下任意一种情况将继续执行删除文件的操作。
1)指定删除文件的时间点,RocketMQ通过deleteWhen设置每天在固定时间执行一次删除过期文件操作,默认凌晨4点。 2)检查磁盘空间是否充足,如果磁盘空间不充足,则返回true,表示应该触发过期文件删除操作。 3)预留手工触发机制,可以通过调用excuteDeleteFilesManualy方法手工触发删除过期文件的操作,目前RocketMQ暂未封装手工触发文件删除的命令。
下面重点分析一下磁盘空间是否充足的实现逻辑,如代码清单4-93所示。
private boolean isSpaceToDelete() {
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
cleanImmediately = false;
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (physicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
// 省略日志输出语句
cleanImmediately = true;
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
// 省略日志输出语句
}
if (physicRatio < 0 || physicRatio > ratio) {
return true;
}
// 后面省略对ConsumeQueue文件做同样的判断
return false;
}
首先解释一下几个参数的含义。
1)diskMaxUsedSpaceRatio:表示CommitLog文件、ConsumeQueue文件所在磁盘分区的最大使用量,如果超过该值,则需要立即清除过期文件。 2)cleanImmediately:表示是否需要立即执行清除过期文件的操作。 3)physicRatio:当前CommitLog目录所在的磁盘分区的磁盘使用率,通过File#getTotalSpace方法获取文件所在磁盘分区的总容量,通过File#getFreeSpace方法获取文件所在磁盘分区的剩余容量。 4)diskSpaceWarningLevelRatio:通过系统参数Drocketmq.broker.diskSpaceWarningLevelRatio进行设置,默认0.90。如果磁盘分区使用率超过该阈值,将设置磁盘为不可写,此时会拒绝写入新消息。 5)diskSpaceCleanForciblyRatio:通过系统参数Drocketmq.broker.diskSpaceCleanForcibly-Ratio进行设置,默认0.85。如果磁盘分区使用超过该阈值,建议立即执行过期文件删除,但不会拒绝写入新消息。
如果当前磁盘分区使用率大于diskSpaceWarningLevelRatio,应该立即启动过期文件删除操作。如果当前磁盘分区使用率大于diskSpaceCleanForciblyRatio,建议立即执行过期文件清除,如果磁盘使用率低于diskSpaceCleanForciblyRatio将恢复磁盘可写。如果当前磁盘使用率小于diskMaxUsedSpaceRatio,则返回false,表示磁盘使用率正常,否则返回true,需要执行删除过期文件,如代码清单4-94所示。
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
// 处理 InterruptedException
}
}
} else {
break;
}
}
}
下面执行文件销毁与删除。从倒数第二个文件开始遍历,计算文件的最大存活时间,即文件的最后一次更新时间+文件存活时间(默认72小时),如果当前时间大于文件的最大存活时间或需要强制删除文件(当磁盘使用超过设定的阈值)时,执行MappedFile#destory方法,清除MappedFile占有的相关资源,如果执行成功,将该文件加入待删除文件列表中,最后统一执行File#delete方法将文件从物理磁盘中删除。