RocketMQ DLedger主从切换之 Leader 选主
本节将介绍RocketMQ是如何实现Raft协议的,其代码并不在RocketMQ工程中,而是在openmessaging标准中。DLedger实现Raft协议选主模块的类图如图9-3所示。

下面逐一介绍主模块核心类及核心属性。
1)DLedgerConfig:主从切换模块相关的配置信息。 2)MemberState:节点状态机,即Raft协议中Follower、Candidate、Leader三种状态的状态机实现。 3)DLedgerClientProtocol:DLedger客户端协议,主要定义如下3个方法。 CompletableFuture< GetEntriesResponse> get(GetEntriesRequest request):客户端从服务器获取日志条目(获取数据)。 CompletableFuture< AppendEntryResponse> append(AppendEntryRequest request):客户端向服务器追加日志(存储数据)。 CompletableFuture< MetadataResponse> metadata(MetadataRequest request):获取元数据。 4)DLedgerProtocol:DLedger客户端协议,主要定义如下4个方法。
-
CompletableFuture< VoteResponse> vote(VoteRequest request):发起投票请求。
-
CompletableFuture< HeartBeatResponse> heartBeat(HeartBeatRequest request):Leader节点向从节点发送心跳包。
-
CompletableFuture< PullEntriesResponse> pull(PullEntriesRequest request):拉取日志条目。
-
CompletableFuture< PushEntryResponse> push(PushEntryRequest request):推送日志条目,用于日志传播。
5)DLedgerClientProtocolHandler:DLedger客户端协议处理器。 6)DLedgerProtocolHander:DLedger服务端协议处理器。 7)DLedgerRpcService:DLedger节点之前的网络通信,默认基于Netty实现,默认实现类为DLedgerRpcNettyService。 8)DLedgerLeaderElector:基于Raft协议的Leader选举类。 9)DLedgerServer:基于Raft协议的集群内节点的封装类。
本节将重点介绍RocketMQ如何实现Raft协议的Leader选举机制,下面重点介绍DLedgerLeaderElector类。
DLedgerLeaderElector 核心类及核心属性
DLedgerLeaderElector的类图如图9-4所示。 下面逐一介绍上述核心类及核心属性。
1)Random random:随机数生成器,对应Raft协议中选举超时时间,是一个随机数。 2)DLedgerConfig dLedgerConfig:配置参数。 3)MemberState memberState:节点状态机。 4)DLedgerRpcService dLedgerRpcService:RPC服务,实现向集群内的节点发送心跳包、投票的RPC。

5)long lastLeaderHeartBeatTime:上次收到心跳包的时间戳。 6)long lastSendHeartBeatTime:上次发送心跳包的时间戳。 7)long lastSuccHeartBeatTime:上次成功收到心跳包的时间戳。 8)int heartBeatTimeIntervalMs:一个心跳包的周期,默认为2s。 9)int maxHeartBeatLeak:允许最大的n 个心跳周期内未收到心跳包,状态为Follower的节点只有超过maxHeartBeatLeak * heartBeatTimeIntervalMs的时间内未收到主节点的心跳包,才会重新进入Candidate状态,进行下一轮选举。 10)long nextTimeToRequestVote:发送下一个心跳包的时间戳。 11)boolean needIncreaseTermImmediately:是否应该立即发起投票。 12)int minVoteIntervalMs:最小的发送投票间隔时间,默认为300ms。 13)int maxVoteIntervalMs:最大的发送投票间隔时间,默认为1000ms。 14)List< RoleChangeHandler> roleChangeHandlers:注册的节点状态处理器,通过addRoleChangeHandler方法添加。 15)long lastVoteCost:上一次投票的开销。 16)StateMaintainer stateMaintainer:状态机管理器。
选举状态管理器初始化
通过DLedgerLeaderElector的startup()方法启动状态管理机,如代码清单9-1所示。
public void startup() {
stateMaintainer.start();
for (RoleChangeHandler roleChangeHandler: roleChangeHandlers){
roleChangeHandler.startup();
}
}
实现关键点如下。
1)stateMaintainer是Leader选举内部维护的状态机,即维护节点状态在Follower、Candidate、Leader之间转换,需要先调用其start()方法启动状态机。 2)依次启动注册的角色转换监听器,即内部状态机的状态发生变更后的事件监听器,是Leader选举的功能扩展点。
StateMaintainer的父类为ShutdownAbleThread,继承自Thread,故调用其start()方法最终会调用run()方法,如代码清单9-2所示。
public void run() {
while (running.get()) {
try {
doWork();
} catch (Throwable t) {//省略日志
}
}
latch.countDown();
}
从代码清单9-2可知,StateMaintainer状态机的实现要点就是“无限死循环”调用doWork()方法,直到该状态机被关闭。doWork()方法在ShutdownAbleThread被声明为抽象方法,具体由各个子类实现,我们将目光投向StateMaintainer的doWork()方法,如代码清单9-3所示。
public void doWork() {
try {
if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) {
DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig);
DLedgerLeaderElector.this.maintainState();
}
sleep(10);
} catch (Throwable t) {
DLedgerLeaderElector.logger.error("Error in heartbeat", t);
}
}
如果当前节点参与Leader选举,则调用maintainState()方法驱动状态机,并且每一次驱动状态机后休息10ms,如代码清单9-4所示。
private void maintainState() throws Exception {
if (memberState.isLeader()) {
maintainAsLeader();
} else if (memberState.isFollower()) {
maintainAsFollower();
} else {
maintainAsCandidate();
}
}
状态机的驱动实现思路比较简单,就是根据状态机当前状态对应的方法,在该状态下检测状态机是否满足状态变更的条件,如果满足则变更状态。接下来对上述3个方法进行详细介绍,帮助读者理解节点在各个状态时需要处理的核心逻辑。为便于理解,先给出在3个状态下需要处理的核心逻辑点。
1)Leader:领导者、主节点,该状态下需要定时向从节点发送心跳包,用于传播数据、确保其领导地位。 2)Follower:从节点,该状态下会开启定时器,尝试进入Candidate状态,以便发起投票选举,一旦收到主节点的心跳包,则重置定时器。 3)Candidate:候选者,该状态下的节点会发起投票,尝试选择自己为主节点,选举成功后,不会存在该状态下的节点。
选举状态机状态流转
本节将重点梳理状态机的状态转换逻辑,首先我们追溯MemberState的初始化,发现其初始状态为Candidate。接下来深入学习maintainAsCandidate()方法,以此探究实现原理。
maintainAsCandidate()方法
如代码清单9-5所示,根据状态机的流转代码可知,当集群中节点的状态为Candidate时会执行该方法,处于该状态的节点会发起投票请求。
if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
return;
}
long term;
long ledgerEndTerm;
long ledgerEndIndex;
第一步:如代码清单9-6所示,先介绍几个变量的含义。
1)long nextTimeToRequestVote:下一次可发起投票的时间,如果当前时间小于该值,说明计时器未过期,此时无须发起投票。 2)long needIncreaseTermImmediately:是否应该立即发起投票。如果为true,则忽略计时器,该值默认为false。作用是在从节点收到主节点的心跳包,并且当前状态机的轮次大于主节点轮次(说明集群中Leader的投票轮次小于从节点的轮次)时,立即发起新的投票请求。 3)long term:投票轮次。 4)long ledgerEndTerm:Leader节点当前的投票轮次。 5)long ledgerEndIndex:当前日志的最大序列,即下一条日志的开始index,在9.6节会详细介绍。
synchronized (memberState) {
if (!memberState.isCandidate()) {
return;
}
if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
long prevTerm = memberState.currTerm();
term = memberState.nextTerm();
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
} else {
term = memberState.currTerm();
}
ledgerEndIndex = memberState.getLedgerEndIndex();
ledgerEndTerm = memberState.getLedgerEndTerm();
}
第二步:初始化team、ledgerEndIndex、ledgerEndTerm属性,如代码清单9-7所示,其实现关键点如下。
投票轮次的初始化机制:如果上一次的投票结果为WAIT_TO_VOTE_NEXT(等待下一轮投票)或应该立即发起投票,则通过状态机获取新一轮投票的序号,默认在当前轮次递增1,并将lastParseResult更新为WAIT_TO_REVOTE(等待投票)。
如果上一次的投票结果不是WAIT_TO_VOTE_NEXT,则投票轮次依然为状态机内部维护的投票轮次。
if (needIncreaseTermImmediately) {
nextTimeToRequestVote = getNextTimeToRequestVote();
needIncreaseTermImmediately = false;
return;
}
private long getNextTimeToRequestVote () {
return System.currentTimeMillis() + lastVoteCost + minVoteIntervalMs + random.nextInt(maxVoteIntervalMs - minVoteIntervalMs);
}
第三步:如果needIncreaseTermImmediately为true,则重置该标记位为false,并重新设置下一次投票超时时间,其实现逻辑为当前时间戳+上次投票的开销+最小投票间隔之间的随机值,这里是Raft协议的一个关键点,即每个节点的投票超时时间引入了随机值,如代码清单9-8所示。
final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);
第四步:向集群内的其他节点发起投票请求,并等待各个节点的响应结果,如代码清单9-9所示。在这里我们先将其当作黑盒,具体的投票请求与结果响应将在下文重点阐述。
final AtomicLong knownMaxTermInGroup = new AtomicLong(-1);
final AtomicInteger allNum = new AtomicInteger(0);
final AtomicInteger validNum = new AtomicInteger(0);
final AtomicInteger acceptedNum = new AtomicInteger(0);
final AtomicInteger notReadyTermNum = new AtomicInteger(0);
final AtomicInteger biggerLedgerNum = new AtomicInteger(0);
final AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);
如代码清单9-10所示,在进行投票结果仲裁之前,先介绍几个局部变量的含义。
-
knownMaxTermInGroup:已知的最大投票轮次。
-
allNum:所有投票数。
-
validNum:有效投票数。
-
acceptedNum:赞成票数量。
-
notReadyTermNum:未准备投票的节点数量,如果对端节点的投票轮次小于发起投票的轮次,则认为对端未准备好,对端节点使用本轮次进入Candidate状态。
-
biggerLedgerNum:发起投票的节点的ledgerEndTerm小于对端节点的个数。
-
alreadyHasLeader:是否已经存在Leader。
上述变量值都来自当前节点向集群内其他节点发送投票请求的响应结果,即投票与响应投票。读者可以思考一下投票请求的响应逻辑,为后续进行深入研究打下一定的基础,即先思考再探究,事半功倍。 |
CountDownLatch voteLatch = new CountDownLatch(1);
for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
future.whenComplete((VoteResponse x, Throwable ex) -> { // 具体的业务逻辑,即当请求得到对端的响应结果后的回调方法
});
}
try {
voteLatch.await(3000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
} catch (Throwable ignore) {
}
第五步:在第四步异步向集群内的各个节点发送投票请求,接下来需要同步等待所有的响应结果。这里RocketMQ向我们展示了一种非常优雅的编程技巧,在收到对端的响应结果后触发CountDownLatch与Future的whenComplete方法。在业务处理过程中,如果条件满足则调用CountDownLatch的countDown方法,唤醒await()方法,使之接受全部响应结果后执行后续逻辑。
接下来继续看收到请求结果后的具体处理逻辑,如代码清单9-11所示。
if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {
validNum.incrementAndGet();
}
synchronized (knownMaxTermInGroup) {
switch (x.getVoteResult()) {
case ACCEPT:
acceptedNum.incrementAndGet();
break;
case REJECT_ALREADY_VOTED:
break;
case REJECT_ALREADY__HAS_LEADER:
alreadyHasLeader.compareAndSet(false, true)
;
break;
case REJECT_TERM_SMALL_THAN_LEDGER:
case REJECT_EXPIRED_VOTE_TERM:
if (x.getTerm() > knownMaxTermInGroup.get()
) {
knownMaxTermInGroup.set(x.getTerm());
}
break;
case REJECT_EXPIRED_LEDGER_TERM:
case REJECT_SMALL_LEDGER_END_INDEX:
biggerLedgerNum.incrementAndGet();
break;
case REJECT_TERM_NOT_READY:
notReadyTermNum.incrementAndGet();
break;
default:
break;
}
}
第六步:统计投票结果,后续会根据投票结果决定是否可以成为Leader,从而决定当前节点的状态,如代码清单9-12所示,具体实现逻辑如下。
1)VoteResponse.RESULT.UNKNOWN:如果投票结果不是UNKNOWN,则有效票数(validNum)加1。 2)ACCEPT:赞成票(acceptedNum)加1,只有得到的赞成票超过集群节点数量的一半才能成为Leader。 3)REJECT_ALREADY_VOTED:拒绝票,原因是已经投给了其他节点。 4)REJECT_ALREADY_HAS_LEADER:拒绝票,原因是集群中已经存在Leaer节点了。alreadyHasLeader设置为true,无须再判断其他投票结果了,结束本轮投票。 5)REJECT_TERM_SMALL_THAN_LEDGER:拒绝票,原因是自己维护的term小于远端维护的ledgerEndTerm。如果对端的team大于自己的team,需要记录对端最大的投票轮次,以便更新自己的投票轮次。 6)REJECT_EXPIRED_VOTE_TERM:拒绝票,原因是自己维护的投票轮次小于远端维护的投票轮次,并且更新自己维护的投票轮次。 7)REJECT_EXPIRED_LEDGER_TERM:拒绝票,原因是自己维护的ledgerTerm小于对端维护的ledgerTerm,此种情况下需要增加计数器biggerLedgerNum的值。 8)REJECT_SMALL_LEDGER_END_INDEX:拒绝票,原因是对端的ledgerTeam与自己维护的ledgerTeam相等,但自己维护的dedgerEndIndex小于对端维护的值,这种情况下需要增加biggerLedgerNum计数器的值。 9)REJECT_TERM_NOT_READY:拒绝票,原因是对端的投票轮次小于自己的投票轮次,即对端还未准备好投票。此时对端节点使用自己的投票轮次进入Candidate状态。
lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);
VoteResponse.ParseResult parseResult;
if (knownMaxTermInGroup.get() > term) {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
changeRoleToCandidate(knownMaxTermInGroup.get());
} else if (alreadyHasLeader.get()) {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;
} else if (!memberState.isQuorum(validNum.get())) {
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote();
} else if (memberState.isQuorum(acceptedNum.get())) {
parseResult = VoteResponse.ParseResult.PASSED;
} else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
} else if (memberState.isQuorum(acceptedNum.get() + biggerLedgerNum.get())) {
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote();
} else {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
}
lastParseResult = parseResult;
if (parseResult == VoteResponse.ParseResult.PASSED) {
changeRoleToLeader(term);
}
第七步:根据投票结果进行仲裁,从而驱动状态机,下面对代码清单9-12进行一个详细的解读。
1)如果对端的投票轮次大于当前节点维护的投票轮次,则先重置投票计时器,然后在定时器到期后使用对端的投票轮次重新进入Candidate状态。 2)如果集群内已经存在Leader节点,当前节点将继续保持Candidate状态,重置计时器,但这个计时器还需要增加heartBeatTimeIntervalMs*maxHeartBeatLeak,其中heartBeatTimeIntervalMs为一次心跳间隔时间,maxHeartBeatLeak为允许丢失的最大心跳包。增加这个时间是因为集群内既然已经存在Leader节点了,就会在一个心跳周期内发送心跳包,从节点在收到心跳包后会重置定时器,即阻止Follower节点进入Candidate状态。这样做的目的是在指定时间内收到Leader节点的心跳包,从而驱动当前节点的状态由Candidate向Follower转换。 3)如果收到的有效票数未超过半数,则重置计时器并等待重新投票,注意当前状态为WAIT_TO_REVOTE,该状态下的特征是下次投票时不增加投票轮次。 4)如果得到的赞同票超过半数,则成为Leader节点,调用changeRoleToLeader方法驱动状态机向Leader状态转换。 5)如果得到的赞成票加上未准备投票的节点数超过半数,则立即发起投票,故其结果为REVOTE_IMMEDIATELY。 6)如果得到的赞成票加上对端维护的ledgerEndIndex超过半数,则重置计时器,继续本轮选举。 7)maintainAsCandidate()方法的流程就介绍到这里了,下面介绍maintainAsLeader()方法。
maintainAsLeader()方法
经过maintainAsCandidate投票选举被其他节点选举为Leader后,在该状态下会执行maintainAsLeader()方法,其他节点的状态还是Candidate,并在计时器过期后,又尝试发起选举。接下来重点分析成为Leader节点后,该节点会做些什么,如代码清单9-13所示。
private void maintainAsLeader() throws Exception {
if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) { // @1
long term;
String leaderId;
synchronized (memberState) {
if (!memberState.isLeader()) { // @2
return;
}
term = memberState.currTerm();
leaderId = memberState.getLeaderId();
lastSendHeartBeatTime = System.current
TimeMillis(); //@3
}
sendHeartbeats(term, leaderId); //@4
}
}
Leader 状态的节点主要按固定频率向集群内的其他节点发送心跳包,实现细节如下。
-
@1:如果当前时间与上一次发送心跳包的间隔时间大于一个心跳包周期(默认为2s),则进入心跳包发送处理逻辑,否则忽略。
-
@2:如果当前状态机的状态已经不是Leader,则忽略。
-
@3:记录本次发送心跳包的时间戳。
-
@4:调用 sendHeartbeats() 方法向集群内的从节点发送心跳包,具体逻辑将在9.3.5节详细介绍。
maintainAsFollower()方法
Candidate状态的节点在收到Leader节点发送的心跳包后,状态变更为Follower,我们先来看在Follower状态下,节点会做些什么,如代码清单9-14所示。
private void maintainAsFollower() {
if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) {
synchronized (memberState) {
if (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) {
changeRoleToCandidate(memberState.currTerm());
}
}
}
}
如果节点在maxHeartBeatLeak个心跳包(默认为3个)周期内未收到心跳包,则将状态变更为Candidate。从这里也不得不佩服RocketMQ在性能方面如此追求极致,即在不加锁的情况下判断是否超过了2个心跳包周期,减少加锁次数,提高性能。 上面3个方法就是状态机在当前状态下执行的处理逻辑,主要是结合当前实际的运行情况将状态机进行驱动,例如调用changeRoleToCandidate()方法将自身状态变更为Candidate,调用changeRoleToLeader()方法将状态变更为Leader,调用changeRoleToFollower()方法将状态变更为Follower。这3个方法的实现类似,接下来以changeRoleToLeader()方法为例进行讲解。
changeRoleToLeader()方法
当状态机从Candidate状态变更为Leader节点后会调用该方法,即当处于Candidate状态的节点在得到集群内超过半数节点的支持后将进入该状态,我们来看该方法的实现细节,如代码清单9-15所示。
public void changeRoleToLeader(long term) {
synchronized (memberState) {
if (memberState.currTerm() == term) {
memberState.changeToLeader(term);
lastSendHeartBeatTime = -1;
handleRoleChange(term, MemberState.Role.LEADER);
logger.info("[{}] [ChangeRoleTo Leader] from term: {} and currTerm: {}", memberState.getSelfId(), term, memberState.currTerm());
} else {
logger.warn("[{}] skip to be the leader in term: {}, but currTerm is: {}", memberState.getSel fId(), term, memberState.currTerm());
}
}
}
首先更新状态机(MemberState)的角色为Leader,并设置leaderId为当前节点的ID,然后调用handleRoleChange方法触发角色状态转换事件,从而执行扩展点的逻辑代码。
选举状态机状态的流转就介绍到这里,在上面的流程中我们忽略了两个重要的过程:发起投票请求与投票请求响应、发送心跳包与心跳包响应,接下来重点介绍这两个过程。
发送投票请求与处理投票请求
节点的状态为Candidate时会向集群内的其他节点发起投票请求(个人认为理解为拉票更好),向对方询问是否愿意选举“我”为Leader,对端节点会根据自己的情况对其投赞成票或拒绝票,如果投拒绝票,还会给出拒绝的原因,具体由voteForQuorumResponses()、handleVote()这两个方法实现,接下来我们分别对这两个方法进行详细分析。
voteForQuorumResponses()方法
当节点状态为Candidate时会向集群内的其他节点发起投票请求,voteForQuorumResponses()方法是发送请求的具体实现,如代码清单9-16所示。
private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term, long ledgerEndTerm, long ledgerEndIndex) throws Exception {
List<CompletableFuture<VoteResponse>> responses = new ArrayList<>();
for (String id : memberState.getPeerMap().keySet()) {
VoteRequest voteRequest = new VoteRequest();
voteRequest.setGroup(memberState.getGroup());
voteRequest.setLedgerEndIndex(ledgerEndIndex);
voteRequest.setLedgerEndTerm(ledgerEndTerm);
voteRequest.setLeaderId(memberState.getSelfId());
voteRequest.setTerm(term);
voteRequest.setRemoteId(id);
CompletableFuture<VoteResponse> voteResponse;
if (memberState.getSelfId().equals(id)) {
voteResponse = handleVote(voteRequest, true);
} else {
voteResponse = dLedgerRpcService.vote(voteRequest);
}
responses.add(voteResponse);
}
return responses;
}
各参数含义如下。
-
long term:发起投票节点当前维护的投票轮次。
-
long ledgerEndTerm:发起投票节点当前维护的最大投票轮次。
-
long ledgerEndIndex:发起投票节点维护的最大日志条目索引。
遍历集群内的所有节点,依次构建投票请求并通过网络异步发送到对端节点,发起投票节点会默认为自己投上一票,投票逻辑被封装在 handleVote() 方法中。
handleVote()方法
因为一个节点可能会收到多个节点的“拉票”请求,存在并发问题,所以需要引入synchronized机制,锁定状态机memberState对象。接下来我们详细了解其实现逻辑,如代码清单9-17所示。
if (request.getTerm() < memberState.currTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));
} else if (request.getTerm() == memberState.currTerm()) {
if (memberState.currVoteFor() == null) {
} else if (memberState.currVoteFor().equals(request.getLeaderId())) {
} else {
if (memberState.getLeaderId() != null) {
return CompletableFuture.completedFuture(new VoteResponse(reque st).term(memberState.currTerm()).
voteResult(VoteResponse.RESULT.REJECT_ALREADY__HAS_LEADER));
} else {
return CompletableFuture.completedFuture(new VoteResponse(reque st).term(memberState.currTerm()).
voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));
}
}
} else {
changeRoleToCandidate(request.getTerm());
needIncreaseTermImmediately = true;
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY));
}
第一步:如代码清单9-18所示,根据发起投票节点、当前响应节点维护的投票轮次进行投票仲裁,分如下3种情况。
1)发起投票节点的投票轮次小于当前节点的投票轮次:投拒绝票,也就是说在Raft协议中,term越大,越有话语权。 2)发起投票节点的投票轮次等于当前节点的投票轮次:说明两者都处在同一个投票轮次中,地位平等,接下来看该节点是否已经投过票。
-
如果未投票或已投票给请求节点,则继续下面的逻辑。
-
如果该节点已存在Leader节点,则拒绝并告知已存在Leader节点。
-
如果该节点还未有Leader节点,但已经投了其他节点的票,则拒绝请求节点,并告知已投票。
3)发起投票节点的投票轮次大于当前节点的投票轮次:拒绝发起投票节点的投票请求,并告知对方自己还未准备投票,会使用发起投票节点的投票轮次立即进入Candidate状态。
if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM));
} else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(
VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX)
);
}
if (request.getTerm() < memberState.getLedgerEndTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.getLedgerEndTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER));
}
第二步:如代码清单9-19所示,根据发起投票节点、当前响应节点维护的ledgerEndTerm进行投票仲裁,分如下3种情况。
1)如果发起投票节点的ledgerEndTerm小于当前响应节点的ledgerEndTerm则拒绝,原因是发起投票节点的日志复制进度比当前节点低,这种情况是不能成为主节点的,否则会造成数据丢失。 2)如果发起投票节点的ledgerEndTerm与当前响应节点维护的ledgerEndTerm相等,但是ledgerEndIndex比当前节点小,则拒绝,原因与上一条相同。 3)如果发起投票节点的投票轮次小于ledgerEndTerm,则以同样的理由拒绝。
memberState.setCurrVoteFor(request.getLeaderId());
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));
第三步:经过层层筛选,将宝贵的赞成票投给发起投票的节点,完成整个投票过程。
发送心跳包与处理心跳包
经过几轮投票,其中一个节点会被推举出来成为Leader节点。Leader节点为了维持其领导地位,会定时向从节点发送心跳包,接下来我们重点看心跳包的发送与响应。
sendHeartbeats()方法
发送心跳包由sendHeartbeats()方法实现,如代码清单9-20所示。
final AtomicInteger allNum = new AtomicInteger(1);
final AtomicInteger succNum = new AtomicInteger(1);
final AtomicInteger notReadyNum = new AtomicInteger(0);
final AtomicLong maxTerm = new AtomicLong(-1);
final AtomicBoolean inconsistLeader = new AtomicBoolean(false);
final CountDownLatch beatLatch = new CountDownLatch(1);
long startHeartbeatTimeMs = System.currentTimeMillis();
如代码清单 9-21 所示,先介绍几个局部变量的含义。
1)AtomicInteger allNum:集群内节点的个数。 2)AtomicInteger succNum:收到成功响应的节点个数。 3)AtomicInteger notReadyNum:收到对端没有准备好反馈的节点个数。 4)AtomicLong maxTerm:当前集群中各个节点维护的最大的投票轮次。 5)AtomicBoolean inconsistLeader:是否存在Leader节点不一致。 6)CountDownLatch beatLatch:用于等待异步请求结果。 7)long startHeartbeatTimeMs:本次心跳包开始发送的时间戳。
for (String id : memberState.getPeerMap().keySet()) {
if (memberState.getSelfId().equals(id)) {
continue;
}
HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
heartBeatRequest.setGroup(memberState.getGroup());
heartBeatRequest.setLocalId(memberState.getSelfId());
heartBeatRequest.setRemoteId(id);
heartBeatRequest.setLeaderId(leaderId);
heartBeatRequest.setTerm(term);
CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest); // 省略异步结果事件处理逻辑}
}
第一步:遍历集群中所有的节点,构建心跳数据包并异步向集群内的从节点发送心跳包,心跳包中主要包含Raft复制组名、当前节点ID、远程节点ID、当前集群中的leaderId、当前节点维护的投票轮次,如代码清单9-22所示。
future.whenComplete((HeartBeatResponse x, Throwable ex) -> {
try {
switch (DLedgerResponseCode.valueOf(x.getCode())) {
case SUCCESS:
succNum.incrementAndGet();
break;
case EXPIRED_TERM:
maxTerm.set(x.getTerm());
break;
case INCONSISTENT_LEADER:
inconsistLeader.compareAndSet(false, true);
break;
case TERM_NOT_READY:
notReadyNum.incrementAndGet();
break;
default:
break;
}
if (memberState.isQuorum(succNum.get()) || memberState.isQuorum(succNum.get() + notReadyNum.get())) {
beatLatch.countDown();
}
} finally {
allNum.incrementAndGet();
if (allNum.get() == memberState.peerSize()) {
beatLatch.countDown();
}
}
});
第二步:当收到一个节点的响应结果后触发回调函数,统计响应结果,先介绍一下对端节点的返回结果。
1)SUCCESS:心跳包成功响应。 2)EXPIRED_TERM:节点的投票轮次小于从节点的投票轮次。 3)INCONSISTENT_LEADER:从节点已经有了新的主节点。 4)TERM_NOT_READY:从节点未准备好。
如果收到SUCCESS的从节点数量超过集群节点的半数,或者收到集群内所有节点的响应结果后调用CountDownLatch的countDown()方法从而唤醒了主线程,则继续执行后续流程,如代码清单9-23所示。
if (memberState.isQuorum(succNum.get())) {
lastSuccHeartBeatTime = System.currentTimeMillis();
} else {
if (memberState.isQuorum(succNum.get() + notReadyNum.get())) {
lastSendHeartBeatTime = -1;
} else if (maxTerm.get() > term) {
changeRoleToCandidate(maxTerm.get());
} else if (inconsistLeader.get()) {
changeRoleToCandidate(term);
} else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs) {
changeRoleToCandidate(term);
}
}
第三步:对响应结果进行仲裁,关键点如下。
1)如果当前Leader节点收到超过集群半数节点的认可(SUCCESS),表示集群状态正常,则正常按照心跳包间隔发送心跳包。 2)如果当前Leader节点收到SUCCESS的响应数加上未准备投票的节点数超过集群节点的半数,则立即发送心跳包。 3)如果从节点的投票轮次比主节点大,则使用从节点的投票轮次,或从节点已经有了另外的主节点,节点状态从Leader转换为Candidate。
handleHeartBeat()方法详解
该方法是从节点在收到主节点的心跳包后的响应逻辑,如代码清单9-24所示。
if (request.getTerm() < memberState.currTerm()) {
return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getC ode()));
} else if (request.getTerm() == memberState.currTerm()){
if (request.getLeaderId().equals(memberState.getLeaderId())) {
lastLeaderHeartBeatTime = System.currentTimeMillis();
return CompletableFuture.completedFuture(new HeartBeatResponse());
}
}
第一步:如果发送心跳包的节点(Leader节点)的投票轮次小于从节点的投票轮次,返回EXPIRED_TERM,告知对方它的投票轮次已经过期,需要重新进入选举。如果Leader节点的投票轮次与当前从节点的投票轮次相同,并且发送心跳包的节点(Leader节点)是当前从节点的主节点,则返回成功,如代码清单9-25所示。
synchronized (memberState) {
if (request.getTerm() < memberState.currTerm()) {
return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
} else if (request.getTerm() == memberState.currTerm()) {
if (memberState.getLeaderId() == null) {
changeRoleToFollower(request.getTerm(), request.getLeaderId());
return CompletableFuture.completedFuture(new HeartBeatResponse());
} else if (request.getLeaderId().equals(memberState.getLeaderId())) {
lastLeaderHeartBeatTime = System.currentTimeMillis();
return CompletableFuture.completedFuture(new HeartBeatResponse());
} else { //省略错误日志输出代码
return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedge
rResponseCode.INCONSISTENT_LEADER.getCode()));
}
} else {
changeRoleToCandidate(request.getTerm());
needIncreaseTermImme diately = true;
return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode()));
}
}
第二步:通常情况下第一步将直接返回,本步骤主要用于处理异常情况,需要加锁以确保线程安全,核心处理逻辑如下。
1)如果发送心跳包的节点(Leader节点)的投票轮次小于当前从节点的投票轮次,返回EXPIRED_TERM,告知对方它的投票轮次已经过期,需要重新进入选举。 2)如果发送心跳包的节点的投票轮次等于当前从节点的投票轮次,需要根据当前从节点维护的leaderId来继续判断下列情况。
如果当前从节点维护的主节点ID(leaderId)为空,则使用主节点的ID,并返回成功。
如果当前从节点的维护的主节点ID(leaderId)与发送心跳包的节点ID相同,则更新上一次收到心跳包的时间戳,并返回成功。
如果当前从节点的维护的主节点ID与发送心跳包的节点ID不同,说明集群中存在另外一个Leader节点,则返回INCONSISTENT_LEADER,对端节点将进入Candidate状态。
3)如果发送心跳包的节点的投票轮次大于当前从节点的投票轮次,则认为从节点并未准备好,从节点将进入Candidate状态,并立即发起一次投票。
上面从源码的角度详细分析了RocketMQ DLedger基于Raft协议实现的Leader选举,下面通过图9-5所示的流程图更加直观地阐述其实现原理。
