同步双写
RocketMQ 为了优化同步复制的性能,在RocketMQ 4.7.0中正式对原先的同步复制做了重大改造,大大提高了同步复制的性能。我们不妨先来简单回顾一下之前关于同步复制的基本流程,如图4-24所示。

消息发送线程SendMessageThread在收到客户端请求时会调用SendMessageProcessor中的方法,将消息写入Broker。如果消息复制模式为同步复制,则需要将消息同步复制到从节点,本次消息发送才会返回,即SendMessageThread线程需要在收到从节点的同步结果后才能继续处理下一条消息。
那么RocketMQ 4.7.0中又是如何进行优化的呢?因为同步复制的语义就是将消息同步到从节点,所以这个复制过程没有什么可优化的,那么,是不是可以减少SendMessageThread线程的等待时间,即在同步复制的过程中,SendMessageThread线程可以继续处理其他消息,只是收到从节点的同步结果后再向客户端返回结果。提高Broker的消息处理能力,重复利用Broker的资源,就是将上述putMessage同步方式修改为异步方式。
接下来我们对源码进行解读,学习RocketMQ是如何将putMessage这个过程异步化处理的。RocketMQ4.7.0的同步双写流程如图4-25所示。

通过对比发现,CommitLog向HaService提交数据同步请求后并没有被阻塞,而是返回了一个CompletableFuture对象,SendMessageProcessor在收到返回结果后,将继续处理新的消息,等到消息被成功同步到从节点后,会调用CompletableFuture的complete方法,触发网络通信,将结果返回到客户端。对消息发送客户端而言,消息被复制到从节点后才会被返回成功,符合同步复制的语义,但在Broker端,处理消息发送的线程却是异步执行的,在消息复制的过程中发送线程并不会阻塞,其响应时间、Broker CPU得到充分利用。
消息发送的流程已经在第3章详细介绍过,接下来我们将重点突出CompletableFuture的使用,一起学习如何使用CompletableFuture实现真正的异步编程,如代码清单4-95所示。
private CompletableFuture<RemotingCommand> asyncSendMessage(
ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext c, SendMessageRequestHeader requestHeader) {
// 省略相关代码
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
// 事务
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(
putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
第一步:调用MessageStore的asyncPutMessage方法,并通过handlePutMessageResultFuture对返回结果进行处理。在详细介绍asyncPutMessage之前,我们来看一下对结果的处理,这是CompletableFuture异步编程的关键点,如代码清单4-96所示。
private CompletableFuture<RemotingCommand> handlePutMessageResultFuture(
CompletableFuture<PutMessageResult> putMessageResult, /* 省略该方法参数 */) {
return putMessageResult.thenApply((r) ->
handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt)
);
}
这里的关键是先定义CompletableFuture的thenApply方法,该方法并不会立即执行,而是在CompletableFuture的complete方法被调用时才会执行,这就是异步实现的妙处,类似事件的通知。complete方法会在什么时候会被调用呢?带着这个问题我们继续上面流程的跟踪,如代码清单4-97所示。
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 省略消息发送相关的逻辑
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
第二步:消息首先进入pagecache,然后执行刷盘操作,接着调用submitReplicaRequest方法将消息提交到HaService,进行数据复制,这里使用了ComplateFuture的thenCombine方法,将刷盘、复制当成一个联合任务执行,这里设置消息追加的最终状态,如代码清单4-98所示。
public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
GroupCommitRequest request = new GroupCommitRequest(
result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()
);
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
return request.future();
} else {
return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
} else {
return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
第三步:向HaService提交GroupCommitRequest对象后,返回的并不是同步结果,而是一个CompletableFuture<PutMessageStatus>对象,该对象的thenApply方法是在上文提到的handlePutMessageResultFuture方法中定义的,而CompletableFuture的complete方法会在消息被复制到从节点后被调用,其核心代码在GroupCommitRequest中,如代码清单4-99所示。
public void wakeupCustomer(final boolean flushOK) {
long endTimestamp = System.currentTimeMillis();
PutMessageStatus result = (flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis))
? PutMessageStatus.PUT_OK
: PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
this.flushOKFuture.complete(result);
}
第四步:在消息成功复制和复制失败后,CompletableFuture的complete方法将被调用,从而CompletableFuture的thenApply方法被触发调用,通过该方法向客户端返回消息发送的最终结果,实现在Broker端的异步编程,使之同步复制的性能接近异步复制,大大提高消息的复制性能。