一、QuorumPeer的run方法
前文,只是为Leader选举做好前期准备,但是还没触发选举过程,在start方法中,调用完startLeaderElection()后,会启动QuorumPeer线程,接下来我们就从他的run方法入手。
public void run() {
try {
while (running) {
switch (getPeerState()) {
case LOOKING:
setCurrentVote(makeLEStrategy().lookForLeader());
break;
case OBSERVING:
setObserver(makeObserver(logFactory));
observer.observeLeader();
break;
case FOLLOWING:
try {
setFollower(makeFollower(logFactory));
follower.followLeader();
break;
case LEADING:
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
break;
}
}
}
}
省略了部分代码,在这里我们只需要关心初始状态下的调用,也就是LOOKING下的调用:
setCurrentVote(makeLEStrategy().lookForLeader());
从名称上看,这就是Leader选举触发的入口方法,实际调用的就是FastLeaderElection的lookForLeader方法。
public Vote lookForLeader() throws InterruptedException {
self.start_fle = Time.currentElapsedTime();
try {
//初始化两个集合
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = minNotificationInterval;
synchronized (this) {
logicalclock.incrementAndGet();
//首先更新当前选票为本节点
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
/*
* 开始发送通知信息,也就是把自己的选票发送给其它节点,封装成ToSend实体,然后添加到sendqueue
* 发送线程去消费这个队列,并转交给Listener中的发送线程去发送,我们知道Listener初始化的时候,只是开启了端口监听
* 此时收发线程还没有创建,所以QuorumCnxManager的toSend方法,就会调用connectOne(sid)方法去建立连接
* 如果集群中已经有节点启动,那么就会建立相应的连接关系
*/
sendNotifications();
SyncedLearnerTracker voteSet;
//循环执行
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
//此时会接收到其它节点的选票信息
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
if (n == null) {
if (manager.haveDelivered()) {
sendNotifications();
} else {
manager.connectAll();
}
int tmpTimeOut = notTimeout * 2;
notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
//首先判断是否是合法选票
} else if (validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
//如果当前选票的epoch大于已选节点
if (n.electionEpoch > logicalclock.get()) {
//设置logicalclock为当前选票
logicalclock.set(n.electionEpoch);
recvset.clear();
//首先根据Epoch计算,如果Epoch相等,再判断zxid,如果再相等再计算sid
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
//如果是当前选票要比已选节点新,更新已选节点为当前选票
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
//否则更新为已选节点
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//把当前选票结果通知给其它节点
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
//如果当前选票比已选结果小,则日志记录即可
break;
//如果当前选票的electionEpoch与已选相等,那么判断是否需要更新当前已选结果
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
// 记录当前收到的选票
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//实例化选票追踪器
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
//此时会判断是否有过半节点同意了已选选票
if (voteSet.hasAllQuorums()) {
//如果是
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
//如果已经选举完成
if (n == null) {
//设置当前状态为,如果选举出来的选票是本节点,设置本节点的状态是Leader,否则根据是否是参与者,设置为Follower或者Observer
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); //清空当前的recvqueue队列
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
break;
case FOLLOWING:
case LEADING:
//如果是崩溃回复重新选举,也是一样的思路
if (n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
//都是进行过半选举
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
setPeerState(n.leader, voteSet);
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
break;
}
} else {
if (!validVoter(n.leader)) {
}
if (!validVoter(n.sid)) {
}
}
}
return null;
} finally {
}
}
二、流程梳理
Leader选举,集群中每个节点,先把选票投给自己,发送给集群中其它节点,此时会有三个值epoch、zxid、sid,选举leader也是通过这三个值,epoch已经分析过,代表系统中最新的全局唯一提案id,zxid表示最新的事务id、sid是节点自身的myid,首先根据epoch判断,把票投给最大的epoch,如果相等,按照zxid,最后才是myid,所以集群第一次启动的时候,leader节点往往会是myid最大的那台服务器,集群中达半数以上节点都投给一个节点,那么可以选举这个节点为Leader节点,其余为Follower或Observer节点(其中Observer节点不参与投票)。
选举算法理解起来并不难,但是整个通信过程相对比较复杂,前文知道,节点中相互通信是通过Listener来实现,在FastLeaderElection中又开启了一个收发工作线程来处理通信数据。简单理解就是FastLeaderElection中的收发工作线程,就是用来接收和发送选票,Listener中的收发线程是用来实际通信。具体通信流程如下:
1、 lookForLeader()方法开启Leader选举,此时会先调用sendNotifications()方法,这个方法会把自己选票,封装成n个ToSend对象(n表示集群中节点个数),然后添加到sendqueue发送队列中;
2、 此时FastLeaderElection的发送线程,就会不断从sendqueue队列中取出选票,然后封装成ByteBuffer对象,再调用QuorumCnxManager的toSend方法,这个方法会把ByteBuffer添加到queueSendMap集合中,这个集合键值是sid我们知道初始状态下Listener只是在进行端口监听,通信链路还没建立,所以此时会主动去调用connectOne(sid)方法去建立通信链路sendqueue队列消费完成,也意味着当前的通信链路也建立了起来;
3、 我们知道Listener中的SendWorker线程会不断的从queueSendMap队列中取值,每个sid都会对应一个SendWorker,也就是选票最终都是通过SendWorker发送出去,此时每个节点都会向其他节点发送选票;
4、 SendWorker对应的就是RecvWorker,RecvWorker可以不断的接收来自其它节点的选票,封装成Message然后添加到阻塞队列recvQueue中;
5、 FastLeaderElection中接收线程WorkerReceiver,会不断的从recvQueue队列中取出选票,然后封装成Notification对象,如果当前是在进行Leader选举且本节点状态是LOOLING,那么把当前的Notification添加到recvqueue队列中,并且对当前选票跟已选结果进行比较,如果已选结果胜出,那么把已选结果封装成ToSend对象,添加到sendqueue,发送给其它节点如果本节点不是LOOKING状态,而选票节点是LOOKING状态,那么表示本节点要比选票节点要新,也就是会把本节点的票投出去,添加到sendqueue队列中;
6、 sendNotifications()执行完继续执行,此时lookForLeader()方法中会从recvqueue队列中取出Notification对象,然后根据当前选票判断是否需要更新当前选票,然后把投票结果发送出去,如果当前选票得票数已经过半,那么会通过while循环不断的从recvqueue队列中取出Notification对象,然后跟当前选票进行比较,如果还有更新的票,那么继续添加到recvqueue队列中,退出while循环,如果此时没有更新的票,那么表示当前选出来的节点可以作为Leader节点,然后根据当选票据,设置对应的节点状态,然后清空recvqueue队列,方法返回;
三、总结
本文大致分析和总结了集群启动时,Leader的选举过程,以及相应的通信链路的建立,接下来我们将分析,集群模式下数据的处理执行过程,当Leader崩溃或者集群中Follower节点已经过半宕机,那么怎么进行数据恢复。
以上,有任何不对的地方,请留言指正,敬请谅解。