RocketMQ 主从同步原理

为了提高消息消费的高可用性,避免 Broker 发生单点故障,使得存储在 Broker 上的消息无法及时消费,RocketMQ 引入了 Broker 主从同步机制,即消息消费到达主服务器后,需要将消息同步到消息从服务器,如果主服务器 Broker 宕机,消息消费者可以从从服务器拉取消息。

接下来详细探讨 RocketMQ 主从同步的实现原理,RocketMQ 高可用核心类图如图 7-1 所示。

image 2025 02 06 10 46 06 597
Figure 1. 图7-1 RocketMQ HA核心类图

从图7-1中我们知道 RocketMQ HA 由如下 7 个核心类实现。

1)HAService:RocketMQ主从同步核心实现类。 2)HAService$AcceptSocketService:高可用主服务器监听客户端连接实现类。 3)HAService$GroupTransferService:主从同步通知实现类。 4)HAService$HAClient:HA客户端实现类。 5)HAConnection:HA主服务器高可用连接对象的封装,也是Broker从服务器的网络读写实现类。 6)HAConnection$WriteSocketServicce:高可用主节点网络写实现类。 7)HAConnection$ReadSocketService:高可用主节点网络读实现类。

HAService 整体工作机制

我们从HAService开始,了解RocketMQ高可用的工作机制,首先看一下代码清单7-1。

代码清单7-1 HAService#start
public void start () throws Exception {
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    this.groupTransferService.start();
    this.haClient.start();
}

RocketMQ 高可用的实现原理如下。

  • 主服务器启动,并在特定端口上监听从服务器的连接。

  • 从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关 TCP 连接。

  • 从服务器主动向主服务器发送待拉取消息的偏移量,主服务器解析请求并返回消息给从服务器。

  • 从服务器保存消息并继续发送新的消息同步请求。

AcceptSocketService 实现原理

AcceptSocketService 作为 HAService 的内部类,实现主服务器监听从服务器的连接,类图如图 7-2 所示。

image 2025 02 06 11 04 15 835
Figure 2. 图7-2 AcceptSocketService类图

如代码清单7-2所示,AcceptSocketService的属性如下。

  • SocketAddress socketAddressListen:Broker服务监听套接字(本地IP+端口号)。

  • ServerSocketChannel serverSocketChannel:服务端Socket通道,基于NIO。

  • Selector selector:事件选择器,基于NIO。

代码清单7-2 HAService$AcceptSocketService#beginAccept
public void beginAccept () throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen); this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

创建 ServerSocketChannel 和 Selector、设置TCP reuseAddress、绑定监听端口、设置为非阻塞模式,并注册 OP_ACCEPT(连接事件),如代码清单7-3所示。

代码清单7-3 HAService$AcceptSocketService#run
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
    for (SelectionKey k : selected) {
        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
            if (sc != null) {
                HAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress());
                try {
                    HAConnection conn = new HAConnection(HAService.this, sc);
                    conn.start();
                    HAService.this.addConnection(conn);
                } catch (Exception e) {
                    log.error("new HAConnection exception", e);
                    sc.close();
                }
            }
        } else {
            log.warn("Unexpected ops in select" + k.readyOps());
        }
    }
    selected.clear();
}

该方法是标准的基于 NIO 的服务端程序实例,选择器每 1s 处理一次连接事件。连接事件就绪后,调用 ServerSocketChannel 的 accept() 方法创建 SocketChannel。然后为每一个连接创建一个 HAConnection 对象,该 HAConnection 将负责主从数据同步逻辑。

GroupTransferService实现原理

本节介绍 GroupTransferService 主从同步阻塞的实现,如果是主从同步模式,消息发送者将消息写入磁盘后,需要继续等待新数据被传输到从服务器,从服务器数据的复制是在另外一个线程 HAConnection 中拉取的,所以消息发送者在这里需要等待数据传输的结果。GroupTransferService 实现了该功能,该类的整体结构与同步刷盘实现类(CommitLog$GroupCommitService)类似,本节只关注该类的核心业务逻辑 doWaitTransfer 的实现,如代码清单 7-4 所示。

代码清单7-4 HAService$GroupTransferService#doWaitTransfer
private void doWaitTransfer () {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                for (int i = 0; !transferOK && i < 5; i++) {
                    this.notifyTransferObject.waitForRunning(1000);
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }
                if (!transferOK) {
                    log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                }
                req.wakeupCustomer(transferOK);
            }
            this.requestsRead.clear();
        }
    }
}

GroupTransferService 负责在主从同步复制结束后,通知由于等待同步结果而阻塞的消息发送者线程。判断主从同步是否完成的依据是从服务器中已成功复制的消息最大偏移量是否大于、等于消息生产者发送消息后消息服务端返回下一条消息的起始偏移量,如果是则表示主从同步复制已经完成,唤醒消息发送线程,否则等待 1s 再次判断,每一个任务在一批任务中循环判断 5 次。消息发送者返回有两种情况:等待超过 5s 或 GroupTransferService 通知主从复制完成。可以通过 syncFlushTimeout 来设置发送线程的等待超时时间。GroupTransferService 通知主从复制的实现如代码清单 7-5 所示。

代码清单7-5 HAService$GroupTransferService#notifyTransferSome
public void notifyTransferSome ( final long offset){
    for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
        boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
        if (ok) {
            this.groupTransferService.notifyTransferSome();
            break;
        } else {
            value = this.push2SlaveMaxOffset.get();
        }
    }
}

该方法在主服务器收到从服务器的拉取请求后被调用,表示从服务器当前已同步的偏移量,既然收到了从服务器的反馈信息,就需要唤醒某些消息发送者线程。如果从服务器收到的确认偏移量大于 push2SlaveMaxOffset,则更新 push2SlaveMaxOffset,然后唤醒 GroupTransferService 线程,最后各消息发送者线程再次判断本次发送的消息是否已经成功复制到了从服务器。

HAClient实现原理

HAClient 是主从同步从服务端的核心实现类,类图如图 7-3 所示。

image 2025 02 06 11 13 53 964
Figure 3. 图7-3 HAClient类图

HAClient 类的基本属性如下。

  • AtomicReference masterAddress:主服务器地址。

  • ByteBuffer reportOffset:从服务器向主服务器发起主从同步的拉取偏移量。

  • SocketChannel socketChannel:网络传输通道。

  • Selector selector:NIO事件选择器。

  • long lastWriteTimestamp:上一次写入消息的时间戳。

  • long currentReportedOffset:反馈从服务器当前的复制进度,即CommitLog文件的最大偏移量。

  • dispatchPostion:本次已处理读缓存区的指针。

  • ByteBuffer byteBufferRead:读缓存区,大小为4MB。

  • ByteBuffer byteBufferBackup:读缓存区备份,与BufferRead进行交换。

接下来从 run() 方法开始探讨HAClient的工作原理,如代码清单7-6所示。

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}

第一步:从服务器连接主服务器。如果socketChannel为空,则尝试连接主服务器。如果主服务器地址为空,返回false。如果主服务器地址不为空,则建立到主服务器的TCP连接,然后注册OP_READ(网络读事件),初始化currentReportedOffset为CommitLog文件的最大偏移量、lastWriteTimestamp上次写入时间戳为当前时间戳,并返回true。在Broker启动时,如果Broker角色为从服务器,则读取Broker配置文件中的haMasterAddress属性并更新HAClient的masterAddrees,如果角色为从服务器,但haMasterAddress为空,启动Broker并不会报错,但不会执行主从同步复制,该方法最终返回是否成功连接上主服务器,如代码清单7-7所示。

代码清单7-7 HAClient#isTimeToReportOffset
private boolean isTimeToReportOffset() {
    long interval = HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
    boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
    return needHeart;
}

第二步:判断是否需要向主服务器反馈当前待拉取消息的偏移量,主服务器与从服务器的高可用心跳发送时间间隔默认为5s,可通过配置 haSendHeartbeatInterval 来改变间隔时间。

代码清单7-8 HAClient#reportSlaveMaxOffset
private boolean reportSlaveMaxOffset(final long maxOffset) {
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName() + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }
    return !this.reportOffset.hasRemaining();
}

第三步:向主服务器反馈拉取消息偏移量。这里有两重含义,对于从服务器来说,是发送下次待拉取消息的偏移量,而对于主服务器来说,既可以认为是从服务器本次请求拉取的消息偏移量,也可以理解为从服务器的消息同步ACK确认消息,如代码清单 7-9 所示。

RocketMQ提供了一个基于NIO的网络写示例程序:首先将ByteBuffer的position设置为0,limit设置为待写入字节长度;然后调用putLong将待拉取消息的偏移量写入ByteBuffer,需要将ByteBuffer从写模式切换到读模式,这里的做法是手动将position设置为0,limit设置为可读长度,其实也可以直接调用ByteBuffer的flip()方法来切换ByteBuffer的读写状态。特别需要留意的是,调用网络通道的write()方法是在一个while循环中反复判断byteBuffer是否全部写入通道中,这是由于NIO是一个非阻塞I/O,调用一次write()方法不一定能将ByteBuffer可读字节全部写入。

代码清单7-9 HAClient#run
this.selector.select(1000);

第四步:进行事件选择,执行间隔时间为 1s,如代码清单 7-10 所示。

代码清单7-10 HAClient#processReadEvent
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                lastWriteTimestamp = HAService.this.defaultMessageS
                tore.getSystemClock().now();
                readSizeZeroTimes = 0;
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }
    return true;
}

第五步:处理网络读请求,即处理从主服务器传回的消息数据。RocketMQ 给出了一个处理网络读请求的 NIO 示例。循环判断 readByteBuffer 是否还有剩余空间,如果存在剩余空间,则调用 SocketChannel#read(ByteBuffer readByteBuffer)方法,将通道中的数据读入读缓存区。

  • 如果读取到的字节数大于 0,则重置读取到0字节的次数,并更新最后一次写入消息的时间戳(lastWriteTimestamp),然后调用 dispatchReadRequest 方法将读取到的所有消息全部追加到消息内存映射文件中,再次反馈拉取进度给主服务器。

  • 如果连续3次从网络通道读取到 0 个字节,则结束本次读任务,返回 true。

  • 如果读取到的字节数小于 0 或发生 I/O 异常,则返回 false。

HAClient 线程反复执行上述 5 个步骤完成主从同步复制功能。

HAConnection实现原理

主服务器在收到从服务器的连接请求后,会将主从服务器的连接 SocketChannel 封装成 HAConnection 对象,实现主服务器与从服务器的读写操作,其类图如图 7-4 所示。

image 2025 02 06 11 24 51 822
Figure 4. 图7-4 HAConnection类图

HAConnection 类属性如下。

  • HAService haService:HAService对象。

  • SocketChannel socketChannel:网络socket通道。

  • String clientAddr:客户端连接地址。

  • WriteSocketService writeSocketService:服务端向从服务器写数据服务类。

  • ReadSocketService readSocketService:服务端从从服务器读数据服务类。

  • long slaveRequestOffset:从服务器请求拉取消息的偏移量。

  • long slaveAckOffset:从服务器反馈已拉取完成的消息偏移量。

HAConnection 的网络读请求由其内部类 ReadSocketService 线程来实现,其类图如图 7-5 所示。

image 2025 02 06 11 25 49 003
Figure 5. 图7-5 HAConnection$ReadSocketService类图

类属性如下。

1)READ_MAX_BUFFER_SIZE:网络读缓存区大小,默认为1MB。 2)Selector selector:NIO网络事件选择器。 3)SocketChannel socketChannel:网络通道,用于读写的socket通道。 4)ByteBuffer byteBufferRead:网络读写缓存区,默认为1MB。 5)int processPosition:byteBuffer当前处理指针。 6)volatile long lastReadTimestamp:上次读取数据的时间戳。

通过观察其 run() 方法,每隔 1s 处理一次读就绪事件,每次读请求调用其 processRead-Event 来解析从服务器的拉取请求。接下来详细剖析处理读请求的实现细节。

第一步:如果 byteBufferRead 没有剩余空间,说明该 position==limit==capacity,调用 byteBufferRead.flip() 方法,产生的效果为 position=0、limit=capacity 并设置 processPosition 为 0,表示从头开始处理,其实这里调用 byteBuffer.clear() 方法会更加容易理解,如代码清单 7-11 所示。

代码清单7-11 HAConnection$ReadSocketService#processReadEvent
int readSizeZeroTimes = 0;
if (!this.byteBufferRead.hasRemaining()) {
    this.byteBufferRead.flip();
    this.processPostion = 0;
}

第二步:NIO 网络读的常规方法,一般使用循环的方式进行读写,直到 byteBuffer 中没有剩余的空间,如代码清单 7-12 所示。

代码清单7-12 HAConnection$ReadSocketService#processReadEvent
while (this.byteBufferRead.hasRemaining()) {
    // 处理网络读
    int readSize = this.socketChannel.read(this.byteBufferRead);
}

第三步:如果读取的字节大于0并且本次读取到的内容大于等于 8,表明收到了从服务器一条拉取消息的请求。由于有新的从服务器反馈拉取偏移量,服务端会通知由于同步等待主从复制结果而阻塞的消息发送者线程,如代码清单 7-13 所示。

代码清单7-13 HAConnection$ReadSocketService#processReadEvent
if (readSize > 0) {
    readSizeZeroTimes = 0;
    this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();

    if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
        int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
        long readOffset = this.byteBufferRead.getLong(pos - 8);
        this.processPostion = pos;
        HAConnection.this.slaveAckOffset = readOffset;
        if (HAConnection.this.slaveRequestOffset < 0) {
            HAConnection.this.slaveRequestOffset = readOffset;
            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
        }
        HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
    }
}

第四步:如果读取到的字节数等于 0,则重复执行三次读请求,否则结束本次读请求处理。如果读取到的字节数小于 0,表示连接处于半关闭状态,返回 false,意味着消息服务器将关闭该链接,如代码清单 7-14 所示。

代码清单7-14 HAConnection$ReadSocketService#processReadEvent
if (readSize == 0) {
    if (++readSizeZeroTimes >= 3){
        break;
    }
} else {
    log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
    return false;
}

HAConnction 的读请求就介绍到这里,其网络写请求由内部类 WriteSocketService 线程来实现,类图如图 7-6 所示。

image 2025 02 06 11 34 59 741
Figure 6. 图7-5 HAConnection$ReadSocketService类图

类属性如下。

  • Selector selector:NIO 网络事件选择器。

  • SocketChannel socketChannel:网络 socket 通道。

  • int headerSize:消息头长度,即消息物理偏移量+消息长度。

  • long nextTransferFromWhere:下一次传输的物理偏移量。

  • SelectMappedBufferResult selectMappedBufferResult:根据偏移量查找消息的结果。

  • boolean lastWriteOver:上一次数据是否传输完毕。

  • long lastWriteTimestamp:上次写入消息的时间戳。

接下来分析一下实现原理,重点关注run()方法,如代码清单7-15所示。

代码清单7-15 HAConnection$WriteSocketService#run
if (-1 == HAConnection.this.slaveRequestOffset) {
    Thread.sleep(10);
    continue;
}

第一步:如果 slaveRequestOffset 等于-1,说明主服务器还未收到从服务器的拉取请求,则放弃本次事件处理。slaveRequestOffset在收到从服务器拉取请求时更新,如代码清单7-16所示。

代码清单7-16 HAConnection$WriteSocketService#run
if (-1 == this.nextTransferFromWhere) {
    if (0 == HAConnection.this.slaveRequestOffset) {
        long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
        masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                .getMapedFileSizeCommitLog());
        if (masterOffset < 0) {
            masterOffset = 0;
        }
        this.nextTransferFromWhere = masterOffset;
    } else {
        this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
    }
}

第二步:如果nextTransferFromWhere为-1,表示初次进行数据传输,计算待传输的物理偏移量,如果slaveRequestOffset为0,则从当前CommitLog文件最大偏移量开始传输,否则根据从服务器的拉取请求偏移量开始传输,如代码清单7-17所示。

代码清单7-17 HAConnection$WriteSocketService#run
if (this.lastWriteOver) {
    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {
        this.byteBufferHeader.position(0);
        this.byteBufferHeader.limit(headerSize);
        this.byteBufferHeader.putLong(this.nextTransferFromWhere);
        this.byteBufferHeader.putInt(0);
        this.byteBufferHeader.flip();
        this.lastWriteOver = this.transferData();
        if (!this.lastWriteOver) continue;
    }
} else {
    this.lastWriteOver = this.transferData();
    if (!this.lastWriteOver) continue;
}

第三步:判断上次写事件是否已将信息全部写入客户端,如代码清单7-18所示。 1)如果已全部写入,且当前系统时间与上次最后写入的时间间隔大于高可用心跳检测时间,则发送一个心跳包,心跳包的长度为12个字节(从服务器待拉取偏移量+size),消息长度默认为0,避免长连接由于空闲被关闭。高可用心跳包发送间隔通过haSendHeartbeatInterval设置,默认值为5s。 2)如果上次数据未写完,则先传输上一次的数据,如果消息还是未全部传输,则结束此次事件处理。

代码清单7-18 HAConnection$WriteSocketService#run
SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
    int size = selectResult.getSize();
    if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
            .getHaTransferBatchSize()) {
        size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
    }
    long thisOffset = this.nextTransferFromWhere;
    this.nextTransferFromWhere += size;
    selectResult.getByteBuffer().limit(size);
    this.selectMappedBufferResult = selectResult;
    this.byteBufferHeader.position(0);
    this.byteBufferHeader.limit(headerSize);
    this.byteBufferHeader.putLong(thisOffset);
    this.byteBufferHeader.putInt(size);
    this.byteBufferHeader.flip();
    this.lastWriteOver = this.transferData();
} else {
    HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}

第四步:传输消息到从服务器。 1)根据消息从服务器请求的待拉取消息偏移量,查找该偏移量之后所有的可读消息,如果未查到匹配的消息,通知所有等待线程继续等待100ms。 2)如果匹配到消息,且查找到的消息总长度大于配置高可用传输一次同步任务的最大传输字节数,则通过设置ByteBuffer的limit来控制只传输指定长度的字节,这就意味着高可用客户端收到的消息会包含不完整的消息。高可用一批次传输消息最大字节通过haTransferBatchSize设置,默认值为32KB。

高可用服务端消息的传输一直以上述步骤循环运行,每次事件处理完成后等待1s。

RocketMQ 高可用主从同步机制就讲解到这里,其主要交互流程如图 7-7 所示。

image 2025 02 06 11 46 01 161
Figure 7. 图7-7 RocketMQ HA交互类图