1. Node Roles
Nodes in a ZooKeeper ensemble take one of three roles: Leader, Follower, and Observer.
1. Leader: handles write requests, coordinates the other nodes in the cluster, initiates votes, and synchronizes the latest data to the other nodes;
2. Follower: serves read requests, forwards any write request it receives to the Leader, and participates in Leader election;
3. Observer: serves read requests only and does not participate in Leader election;
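Whether a node runs as an Observer is purely a matter of configuration. A minimal zoo.cfg fragment (the hostnames here are made up for illustration) might look like this:

# in the observer's own zoo.cfg
peerType=observer
# in every server's server list, mark server 3 as an observer
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888:observer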
2. Source Code Analysis
Next, let's look at ZooKeeper's ZAB implementation from the source-code perspective, starting again from the start method of QuorumPeer, which is called during cluster startup:
public synchronized void start() {
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}
This method was analyzed in an earlier article; here we focus on two calls: startLeaderElection(), which kicks off Leader election, and super.start(), which starts the current QuorumPeer thread. Let's first see what startLeaderElection actually does.
// the default state of a server is LOOKING
private ServerState state = ServerState.LOOKING;

public synchronized void startLeaderElection() {
    try {
        // the server has just started
        if (getPeerState() == ServerState.LOOKING) {
            // the initial vote is cast for the server itself
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // create the election algorithm
    this.electionAlg = createElectionAlgorithm(electionType);
}
Here myid is the value configured in the myid file under the data directory, and zxid is the id of the latest transaction in the local log. The zxid is a 64-bit long: the high 32 bits hold the epoch, i.e. the leader-election round, and the low 32 bits hold the proposal counter within that epoch. Every proposal increments the low 32 bits, and every new round of leader election increments the high 32 bits, so the zxid is a globally increasing proposal id.
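ZooKeeper keeps this bit arithmetic in the small helper class ZxidUtils (it appears later in the quoted WorkerReceiver code). The following stand-alone sketch, written here purely for illustration, reproduces the same split/compose logic:

public final class ZxidBits {
    // high 32 bits: the leader-election epoch
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    // low 32 bits: the per-epoch transaction counter
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // compose a zxid from an epoch and a counter
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(2, 5);
        System.out.println(Long.toHexString(zxid)); // 200000005
        System.out.println(epochOf(zxid));          // 2
        System.out.println(counterOf(zxid));        // 5
    }
}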
Next, let's look at how the election algorithm is created in createElectionAlgorithm(electionType):
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    // only electionAlgorithm == 3 is still supported
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        // create a QuorumCnxManager, the connection manager for election traffic
        QuorumCnxManager qcm = createCnxnManager();
        // oldQcm is null on the initial startup
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        // the Listener is initialized in the QuorumCnxManager constructor
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // start the listener
            listener.start();
            // fast leader election algorithm
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            // start leader election
            fle.start();
            le = fle;
        } else {
        }
        break;
    default:
        assert false;
    }
    return le;
}
createElectionAlgorithm creates two objects, Listener and FastLeaderElection, and calls start on both. From this point on, the electionAlg field of QuorumPeer holds the FastLeaderElection instance.
3. Listener
Listener is an inner class of QuorumCnxManager. It is a thread, and its actual work happens in its run method:
public void run() {
    if (!shutdown) {
        // first collect all addresses used for leader-election communication
        Set<InetSocketAddress> addresses;
        if (self.getQuorumListenOnAllIPs()) {
            addresses = self.getElectionAddress().getWildcardAddresses();
        } else {
            addresses = self.getElectionAddress().getAllAddresses();
        }
        // a latch counting one per listening address
        CountDownLatch latch = new CountDownLatch(addresses.size());
        // create one ListenerHandler per address
        listenerHandlers = addresses.stream().map(address ->
                new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch))
            .collect(Collectors.toList());
        // run the ListenerHandler threads in a thread pool
        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        listenerHandlers.forEach(executor::submit);
        // wait until all handlers have finished
        try {
            latch.await();
        } catch (InterruptedException ie) {
        } finally {
            // shut down
            for (ListenerHandler handler : listenerHandlers) {
                try {
                    handler.close();
                } catch (IOException ie) {
                    // Don't log an error for shutdown.
                    LOG.debug("Error closing server socket", ie);
                }
            }
        }
    }
    LOG.info("Leaving listener");
    if (!shutdown) {
        if (socketException.get()) {
            // a socket exception occurred, invoke the bind-error handler
            socketBindErrorHandler.run();
        }
    }
}
The Listener delegates the actual work to ListenerHandler, which is also a thread and also an inner class of QuorumCnxManager:
public void run() {
    try {
        // accept incoming connections
        acceptConnections();
    } catch (Exception e) {
    } finally {
        // decrement the latch
        latch.countDown();
    }
}
acceptConnections() (shown here with some code omitted) creates a ServerSocket, binds the election port, and waits for connections from the other nodes:
private void acceptConnections() {
    int numRetries = 0;
    Socket client = null;
    while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
        try {
            // create the ServerSocket and wait for other nodes to connect
            serverSocket = createNewServerSocket();
            while (!shutdown) {
                try {
                    // block until a connection arrives
                    client = serverSocket.accept();
                    setSockOpts(client);
                    // handle the connection from the other node
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        receiveConnection(client);
                    }
                    numRetries = 0;
                } catch (SocketTimeoutException e) {
                }
            }
        } catch (IOException e) {
        }
    }
}
receiveConnection is a method defined in QuorumCnxManager:
public void receiveConnection(final Socket sock) {
    // plain socket programming from here on
    DataInputStream din = null;
    try {
        // get the input stream
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        // handle the connection
        handleConnection(sock, din);
    } catch (IOException e) {
        closeSocket(sock);
    }
}
Let's look at the concrete handling in handleConnection:
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    Long sid = null, protocolVersion = null;
    MultipleAddresses electionAddr = null;
    try {
        // the first long is either a protocol version or, for backward compatibility,
        // the remote server's sid: a value >= 0 is interpreted as the sid directly
        protocolVersion = din.readLong();
        if (protocolVersion >= 0) {
            sid = protocolVersion;
        } else {
            try {
                // otherwise parse an InitialMessage and take the sid from it
                InitialMessage init = InitialMessage.parse(protocolVersion, din);
                sid = init.sid;
                if (!init.electionAddr.isEmpty()) {
                    electionAddr = new MultipleAddresses(init.electionAddr,
                            Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
                }
            } catch (InitialMessage.InitialMessageException ex) {
                return;
            }
        }
        // if the connection comes from an Observer, assign it a unique negative sid
        if (sid == QuorumPeer.OBSERVER_ID) {
            sid = observerCounter.getAndDecrement();
        }
    } catch (IOException e) {
        return;
    }
    // authenticate, if authentication is enabled
    authServer.authenticate(sock, din);
    // the remote sid is smaller than this node's id
    if (sid < self.getId()) {
        // fetch the existing SendWorker for that sid, if any
        SendWorker sw = senderWorkerMap.get(sid);
        // if one exists, finish it
        if (sw != null) {
            sw.finish();
        }
        // drop this connection and connect to the sid node ourselves
        if (electionAddr != null) {
            connectOne(sid, electionAddr);
        } else {
            connectOne(sid);
        }
    } else if (sid == self.getId()) {
    } else {
        // initialize the send and receive worker threads
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);
        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
        // start the workers
        sw.start();
        rw.start();
    }
}
Let's briefly introduce these two worker threads first:
SendWorker: the sending thread. It defines a send method built on plain socket I/O. Its run method keeps taking data from queueSendMap (a ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> field of QuorumCnxManager whose key is the sid, i.e. the peer's myid, and whose value is the queue of messages still to be sent to that peer) and sends it to the peer. The run method also records the last message sent to each sid in lastMessageSent (a ConcurrentHashMap<Long, ByteBuffer>). That is all the work SendWorker does.
RecvWorker: the receiving thread. It saves every message it receives into the recvQueue blocking queue. A small sketch of this queue layout follows.
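To make the layout concrete, here is a minimal stand-alone sketch of the pattern, using illustrative names rather than the real ZooKeeper classes: one small bounded outbound queue per peer sid, each drained by its own sender thread, plus one shared inbound queue:

import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class PerPeerQueues {
    // one small bounded outbound queue per peer sid, in the spirit of queueSendMap
    static final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();
    // one shared inbound queue, in the spirit of recvQueue
    static final BlockingQueue<ByteBuffer> recvQueue = new LinkedBlockingQueue<>();

    static BlockingQueue<ByteBuffer> queueFor(long sid) {
        return queueSendMap.computeIfAbsent(sid, s -> new ArrayBlockingQueue<>(1));
    }

    // the "SendWorker" side: drain the queue belonging to one sid
    static Thread sendWorker(long sid) {
        return new Thread(() -> {
            try {
                ByteBuffer msg = queueFor(sid).take();
                System.out.println("sent " + msg.remaining() + " bytes to sid " + sid);
            } catch (InterruptedException ignored) { }
        });
    }

    public static void main(String[] args) throws Exception {
        Thread worker = sendWorker(2L);
        worker.start();
        // a producer (the election layer) enqueues a 40-byte notification for sid 2
        queueFor(2L).put(ByteBuffer.allocate(40));
        worker.join();
    }
}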
To summarize the Listener's handling: when the incoming sid is greater than this node's id, it starts the send/receive workers; when it is smaller, it calls connectOne instead, which in turn calls initiateConnectionAsync.
public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) {
    try {
        connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
        connectionThreadCnt.incrementAndGet();
    } catch (Throwable e) {
        return false;
    }
    return true;
}
The connection is then initiated by a QuorumConnectionReqThread, another inner class of QuorumCnxManager, whose logic is simply to open a client Socket to the node identified by sid. The interesting part is startConnection, shown below:
private boolean startConnection(Socket sock, Long sid) throws IOException {
    // initialize the output and input streams
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);
        // protocol version (always negative)
        long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
        dout.writeLong(protocolVersion);
        dout.writeLong(self.getId());
        // send this server's election addresses to the sid node
        Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
                ? self.getElectionAddress().getAllAddresses()
                : Arrays.asList(self.getElectionAddress().getOne());
        String addr = addressesToSend.stream()
                .map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();
        // wait for a response
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        closeSocket(sock);
        return false;
    }
    // authenticate
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        authLearner.authenticate(sock, qps.hostname);
    }
    // if the target sid is greater than this node's id, close the socket and let that node connect back
    if (sid > self.getId()) {
        closeSocket(sock);
    } else {
        // otherwise start the send/receive workers, as described above
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);
        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
        sw.start();
        rw.start();
        return true;
    }
    return false;
}
The Listener listens on the election port, i.e. the second port in a configuration line such as server.1=127.0.0.1:2888:3888 (here 3888). Every node opens this port to accept connections from the other nodes. The Listener decides how to handle an incoming connection based on the remote sid, which is the myid from the configuration (the 1 in server.1). If the remote sid is greater than the local id, the node initializes and starts the send/receive workers, stores the sid-to-SendWorker mapping in senderWorkerMap and a blocking queue for that sid in queueSendMap. If the sids are equal, the configuration is wrong (or there is a bug). If the remote sid is smaller, the node drops the incoming connection, connects to that sid itself, and starts the corresponding workers.
Suppose there are three ZooKeeper servers with myid 1, 2 and 3. Each opens its election port and listens for the other two. When 2 and 3 connect to 1, the sids that 1 receives are larger than its own, so 1 starts the workers and records the connections from 2 and 3 in senderWorkerMap. When 1 and 2 connect to 3, 3 sees that both sids are smaller than its own, so it closes those incoming connections and instead connects to 1 and 2 itself, starting the corresponding workers.
In other words, the Listener compares nodes by myid: the node with the smaller id keeps the connection initiated by the larger one, and the larger one actively maintains the connection to the smaller one. So 2 and 3 connect to 1, which lets 1 talk to both of them; 2 connects to 1 and accepts the connection from 3, so 2 can also talk to 1 and 3; and 3 connects to both 1 and 2. When a new node joins, it establishes connections in the same way. The Listener therefore sets up the pairwise communication channels between all nodes in the cluster. The rule is sketched below.
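For illustration only (the real logic lives in handleConnection and startConnection, quoted above), the tie-break rule can be reduced to a few lines: for every pair of servers, both ends agree to keep exactly one connection, the one initiated by the larger sid:

public class ConnectionRule {
    // keep a connection that the peer initiated to us only if the peer has the larger sid
    static boolean keepIncoming(long mySid, long remoteSid) {
        return remoteSid > mySid;
    }

    // keep a connection that we initiated only if we have the larger sid
    static boolean keepOutgoing(long mySid, long remoteSid) {
        return mySid > remoteSid;
    }

    public static void main(String[] args) {
        long[] sids = {1, 2, 3};
        for (long a : sids) {
            for (long b : sids) {
                if (a != b) {
                    // both sides reach the same decision for the connection a -> b
                    System.out.printf("%d -> %d : initiator keeps it? %b, acceptor keeps it? %b%n",
                            a, b, keepOutgoing(a, b), keepIncoming(b, a));
                }
            }
        }
    }
}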
Note that the Listener only opens the port and waits; it never actively initiates a connection request. Where the outgoing connections are actually triggered is what we analyze next.
4. FastLeaderElection
Besides starting the Listener thread, createElectionAlgorithm also calls the start method of FastLeaderElection:
public void start() {
    // delegate to Messenger's start method
    this.messenger.start();
}
Messenger is an inner class of FastLeaderElection. It is instantiated in the FastLeaderElection constructor, which also initializes the two blocking queues sendqueue and recvqueue.
Messenger's start method launches two more threads:
void start() {
    this.wsThread.start();
    this.wrThread.start();
}
Here wsThread is a WorkerSender instance and wrThread a WorkerReceiver instance, another send/receive pair. Let's look at their run methods in turn.
First, WorkerSender's run method:
public void run() {
    while (!stop) {
        try {
            // take the next message to send from the sendqueue
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if (m == null) {
                continue;
            }
            // process the pending message
            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}
process in turn calls QuorumCnxManager's toSend method:
public void toSend(Long sid, ByteBuffer b) {
    // if the message is addressed to this node itself, simply put it on the recvQueue
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        // for any other node, enqueue the message for that sid and make sure a connection exists;
        // connectOne is the method analyzed in part 3, and the actual transmission is done by the
        // corresponding SendWorker started through the Listener
        BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
        addToSendQueue(bq, b);
        connectOne(sid);
    }
}
Next, WorkerReceiver's run method:
public void run() {
    Message response;
    while (!stop) {
        try {
            // take a message from recvQueue; the RecvWorker threads started through the Listener
            // keep filling this queue with the data they receive
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if (response == null) {
                continue;
            }
            final int capacity = response.buffer.capacity();
            // a notification carries at least 28 bytes:
            // state (4) + proposed leader (8) + zxid (8) + electionEpoch (8)
            if (capacity < 28) {
                LOG.error("Got a short response from server {}: {}", response.sid, capacity);
                continue;
            }
            // exactly 28 bytes: old format without peer epoch and version
            boolean backCompatibility28 = (capacity == 28);
            // exactly 40 bytes: backward-compatible format without version information
            boolean backCompatibility40 = (capacity == 40);
            response.buffer.clear();
            // build a Notification object
            Notification n = new Notification();
            // read the sender's state, its proposed leader, zxid and epoch information
            int rstate = response.buffer.getInt();
            long rleader = response.buffer.getLong();
            long rzxid = response.buffer.getLong();
            long relectionEpoch = response.buffer.getLong();
            long rpeerepoch;
            // default version number
            int version = 0x0;
            QuorumVerifier rqv = null;
            try {
                if (!backCompatibility28) {
                    rpeerepoch = response.buffer.getLong();
                    if (!backCompatibility40) {
                        version = response.buffer.getInt();
                    } else {
                    }
                } else {
                    rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
                }
                // versions above 0x1 also carry the cluster configuration
                if (version > 0x1) {
                    int configLength = response.buffer.getInt();
                    // reject negative or oversized lengths before allocating the byte array
                    if (configLength < 0 || configLength > capacity) {
                        throw new IOException("Invalid configLength in notification message! sid=" + response.sid);
                    }
                    byte[] b = new byte[configLength];
                    response.buffer.get(b);
                    synchronized (self) {
                        try {
                            rqv = self.configFromString(new String(b, UTF_8));
                            QuorumVerifier curQV = self.getQuorumVerifier();
                            if (rqv.getVersion() > curQV.getVersion()) {
                                if (self.getPeerState() == ServerState.LOOKING) {
                                    self.processReconfig(rqv, null, null, false);
                                    if (!rqv.equals(curQV)) {
                                        self.shuttingDownLE = true;
                                        self.getElectionAlg().shutdown();
                                        break;
                                    }
                                } else {
                                }
                            }
                        } catch (IOException | ConfigException e) {
                        }
                    }
                } else {
                }
            } catch (BufferUnderflowException | IOException e) {
                continue;
            }
            // if the sender is not a valid voter (for example an Observer)
            if (!validVoter(response.sid)) {
                Vote current = self.getCurrentVote();
                QuorumVerifier qv = self.getQuorumVerifier();
                ToSend notmsg = new ToSend(
                        ToSend.mType.notification,
                        current.getId(),
                        current.getZxid(),
                        logicalclock.get(),
                        self.getPeerState(),
                        response.sid,
                        current.getPeerEpoch(),
                        qv.toString().getBytes(UTF_8));
                // reply with this node's current vote
                sendqueue.offer(notmsg);
            } else {
                // map the numeric state sent by the peer onto a ServerState
                QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                switch (rstate) {
                case 0:
                    ackstate = QuorumPeer.ServerState.LOOKING;
                    break;
                case 1:
                    ackstate = QuorumPeer.ServerState.FOLLOWING;
                    break;
                case 2:
                    ackstate = QuorumPeer.ServerState.LEADING;
                    break;
                case 3:
                    ackstate = QuorumPeer.ServerState.OBSERVING;
                    break;
                default:
                    continue;
                }
                // populate the Notification object
                n.leader = rleader;
                n.zxid = rzxid;
                n.electionEpoch = relectionEpoch;
                n.state = ackstate;
                n.sid = response.sid;
                n.peerEpoch = rpeerepoch;
                n.version = version;
                n.qv = rqv;
                // if this node is currently in the LOOKING state
                if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                    // hand the Notification to the election logic via recvqueue
                    recvqueue.offer(n);
                    // if the sender is also LOOKING but its election epoch is behind ours
                    if ((ackstate == QuorumPeer.ServerState.LOOKING)
                        && (n.electionEpoch < logicalclock.get())) {
                        // send this node's own vote back so the lagging node can catch up
                        Vote v = getVote();
                        QuorumVerifier qv = self.getQuorumVerifier();
                        ToSend notmsg = new ToSend(
                                ToSend.mType.notification,
                                v.getId(),
                                v.getZxid(),
                                logicalclock.get(),
                                self.getPeerState(),
                                response.sid,
                                v.getPeerEpoch(),
                                qv.toString().getBytes());
                        sendqueue.offer(notmsg);
                    }
                } else {
                    // this node has already finished the election; get its current vote
                    Vote current = self.getCurrentVote();
                    // if the sender is still LOOKING, tell it who the elected Leader is
                    if (ackstate == QuorumPeer.ServerState.LOOKING) {
                        if (self.leader != null) {
                            if (leadingVoteSet != null) {
                                self.leader.setLeadingVoteSet(leadingVoteSet);
                                leadingVoteSet = null;
                            }
                            self.leader.reportLookingSid(response.sid);
                        }
                        // reply with the current vote, i.e. the elected leader
                        QuorumVerifier qv = self.getQuorumVerifier();
                        ToSend notmsg = new ToSend(
                                ToSend.mType.notification,
                                current.getId(),
                                current.getZxid(),
                                current.getElectionEpoch(),
                                self.getPeerState(),
                                response.sid,
                                current.getPeerEpoch(),
                                qv.toString().getBytes());
                        sendqueue.offer(notmsg);
                    }
                }
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted Exception while waiting for new message", e);
        }
    }
    LOG.info("WorkerReceiver is down");
}
So the WorkerReceiver thread keeps processing the messages received by the Listener-side receive workers. Even at this point, the Leader election itself still has not started.
5. Summary
Everything above is preparation for Leader election. The Listener opens the election port and establishes communication between every pair of nodes. Each connection gets a pair of worker threads: the SendWorker is registered in senderWorkerMap under the peer's sid, and an empty blocking queue is registered in queueSendMap under the same sid. The SendWorker loops over its queue in queueSendMap and sends the data to its peer, while the RecvWorker receives data from the peer, wraps it in a Message and appends it to the recvQueue blocking queue.
FastLeaderElection also starts a pair of send/receive threads and maintains its own pair of blocking queues. The WorkerSender takes ToSend messages from sendqueue and calls QuorumCnxManager's toSend, which puts the message into queueSendMap so that the matching SendWorker started through the Listener can transmit it. The WorkerReceiver takes messages from QuorumCnxManager's recvQueue, processes them, and pushes the resulting Notification onto recvqueue (or a reply onto sendqueue). A compact sketch of this two-layer pipeline follows.
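The following runnable sketch is purely illustrative (String messages instead of ByteBuffer/ToSend, and the socket plus SendWorker/RecvWorker hop collapsed into a single echo thread); it traces the path a vote takes through the two layers of queues:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class TwoLayerQueueDemo {
    // election layer (FastLeaderElection.Messenger): sendqueue / recvqueue
    static final BlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
    static final BlockingQueue<String> recvqueue = new LinkedBlockingQueue<>();
    // transport layer (QuorumCnxManager): one send queue per peer sid, one shared recvQueue
    static final ConcurrentHashMap<Long, BlockingQueue<String>> queueSendMap = new ConcurrentHashMap<>();
    static final BlockingQueue<String> recvQueue = new LinkedBlockingQueue<>();

    static BlockingQueue<String> queueFor(long sid) {
        return queueSendMap.computeIfAbsent(sid, s -> new ArrayBlockingQueue<>(16));
    }

    public static void main(String[] args) throws Exception {
        long peerSid = 2L;

        // "WorkerSender": drains the election-layer sendqueue into the per-peer transport queue
        Thread workerSender = new Thread(() -> {
            try {
                queueFor(peerSid).put(sendqueue.take());
            } catch (InterruptedException ignored) { }
        });
        // "SendWorker"/"RecvWorker" collapsed into one hop: pretend the peer echoes the vote back
        Thread network = new Thread(() -> {
            try {
                recvQueue.put("echo of " + queueFor(peerSid).take());
            } catch (InterruptedException ignored) { }
        });
        // "WorkerReceiver": drains the transport-layer recvQueue into the election-layer recvqueue
        Thread workerReceiver = new Thread(() -> {
            try {
                recvqueue.put(recvQueue.take());
            } catch (InterruptedException ignored) { }
        });

        workerSender.start();
        network.start();
        workerReceiver.start();

        sendqueue.put("vote(myid=1, zxid=0x100000001)");
        System.out.println("election layer received: " + recvqueue.take());
    }
}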
Since this article is already quite long, the Leader election process itself will be analyzed in the next part.
If anything above is incorrect, please leave a comment and point it out.