1. Request Processing
The previous article analyzed the Leader election process. Once election completes, each node is moved into its corresponding state. A cluster has three roles, Leader, Follower and Observer, which map to the LEADING, FOLLOWING and OBSERVING states in the source code. Let's return to QuorumPeer's run method:
public void run() {
    try {
        while (running) {
            switch (getPeerState()) {
            // start the leader election process
            case LOOKING:
                setCurrentVote(makeLEStrategy().lookForLeader());
                break;
            case OBSERVING:
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
                break;
            case FOLLOWING:
                setFollower(makeFollower(logFactory));
                follower.followLeader();
                break;
            case LEADING:
                setLeader(makeLeader(logFactory));
                leader.lead();
                setLeader(null);
                break;
            }
        }
    } finally {
        // shutdown handling omitted
    }
}
Next we analyze how requests are processed under each of these three modes: OBSERVING, FOLLOWING and LEADING.
2. OBSERVING
OBSERVING corresponds to the cluster's Observer nodes. A node in this state takes part in neither Leader election nor transaction voting; write requests are forwarded to the Leader, read requests are served directly from the node's own data, and the Observer keeps itself synchronized with the latest data in the cluster.
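For context, this is how an Observer is usually declared, per the ZooKeeper Observers guide (hostnames and ports below are placeholders): every server's zoo.cfg lists the observer's server line with an :observer suffix, and the observer itself additionally sets peerType=observer.

# In the observer's own zoo.cfg only:
peerType=observer
# In every server's zoo.cfg (the :observer suffix marks server.4 as an Observer):
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
server.4=zoo4:2888:3888:observer

Back in the run method, the OBSERVING branch boils down to: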
// make the current node an Observer
setObserver(makeObserver(logFactory));
observer.observeLeader();
makeObserver returns an Observer object; its constructor receives the QuorumPeer and an ObserverZooKeeperServer instance. Condensed, it looks roughly like this (the exact signature may differ slightly between ZooKeeper versions):
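protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
    // wrap this peer and a fresh ObserverZooKeeperServer into an Observer
    return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}

Now let's follow observeLeader() step by step: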
void observeLeader() throws Exception {
    long connectTime = 0;
    boolean completedSync = false;
    try {
        // set the ZAB state to DISCOVERY
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        // locate the Leader node
        QuorumServer master = findLearnerMaster();
        try {
            // connect to the Leader
            connectToLeader(master.addr, master.hostname);
            connectTime = System.currentTimeMillis();
            // register with the Leader; the returned zxid carries the new epoch
            long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
            // record the Leader's address and server id
            self.setLeaderAddressAndId(master.addr, master.getId());
            // move the ZAB state to SYNCHRONIZATION
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            // sync the latest data from the Leader
            syncWithLeader(newLeaderZxid);
            // move the ZAB state to BROADCAST
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            QuorumPacket qp = new QuorumPacket();
            // from here on, keep reading and processing packets sent by the Leader
            while (this.isRunning() && nextLearnerMaster.get() == null) {
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            // exception handling omitted
        }
    } finally {
        // cleanup omitted
    }
}
We now need to focus on three methods: registerWithLeader, syncWithLeader and processPacket.
registerWithLeader:
protected long registerWithLeader(int pktType) throws IOException {
    // the latest transaction id logged on this node
    long lastLoggedZxid = self.getLastLoggedZxid();
    // build a packet of type OBSERVERINFO whose zxid carries this node's accepted epoch
    QuorumPacket qp = new QuorumPacket();
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
    // wrap this node's id, protocol version and QuorumVerifier version
    LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    // send it to the Leader
    writePacket(qp, true);
    // read the Leader's reply
    readPacket(qp);
    // extract the Leader's epoch from the reply's zxid
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    if (qp.getType() == Leader.LEADERINFO) {
        // the Leader's protocol version
        leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        byte[] epochBytes = new byte[4];
        final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
        // if the Leader's epoch is newer than ours, adopt it as the accepted epoch
        if (newEpoch > self.getAcceptedEpoch()) {
            wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
            self.setAcceptedEpoch(newEpoch);
        } else if (newEpoch == self.getAcceptedEpoch()) {
            // epochs are equal: nothing to update
            wrappedEpochBytes.putInt(-1);
        } else {
            // the Leader's epoch must never be older than ours
            throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
        }
        // acknowledge the new epoch, reporting our last logged zxid
        QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);
    } else {
        if (newEpoch > self.getAcceptedEpoch()) {
            self.setAcceptedEpoch(newEpoch);
        }
        if (qp.getType() != Leader.NEWLEADER) {
            LOG.error("First packet should have been NEWLEADER");
            throw new IOException("First packet should have been NEWLEADER");
        }
        return qp.getZxid();
    }
}
registerWithLeader sends this node's information (including its accepted epoch, packed into a zxid) to the Leader and reads back the Leader's LEADERINFO. If the Leader's epoch is newer, the local acceptedEpoch is updated; if the local epoch were newer than the Leader's, an exception is thrown. The method then replies to the Leader with an ACKEPOCH packet carrying this node's last logged zxid. Since epochs and transaction ids are packed together throughout this exchange, here is a minimal sketch of that encoding (mirroring org.apache.zookeeper.server.util.ZxidUtils):
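public final class ZxidEncodingSketch {
    // the high 32 bits of a zxid hold the epoch, the low 32 bits a per-epoch counter
    public static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }
    public static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }
    public static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }
    public static void main(String[] args) {
        long zxid = makeZxid(5, 42);
        System.out.println(Long.toHexString(zxid));   // prints 50000002a
        System.out.println(getEpochFromZxid(zxid));   // prints 5
        System.out.println(getCounterFromZxid(zxid)); // prints 42
    }
}

This is why ZxidUtils.makeZxid(newEpoch, 0) above denotes "the first zxid of the new epoch".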
syncWithLeader:
This method synchronizes the Leader's data so that the current node ends up consistent with it (parts of the code are omitted).
protected void syncWithLeader(long newLeaderZxid) throws Exception {
    // the ACK packet that will eventually be sent back to the Leader
    QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
    QuorumPacket qp = new QuorumPacket();
    long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
    QuorumVerifier newLeaderQV = null;
    boolean snapshotNeeded = true;
    boolean syncSnapshot = false;
    // registerWithLeader ended by writing ACKEPOCH to the Leader;
    // now read the Leader's response, which announces the sync mode
    readPacket(qp);
    Deque<Long> packetsCommitted = new ArrayDeque<>();
    Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
    synchronized (zk) {
        // the Leader responded with DIFF
        if (qp.getType() == Leader.DIFF) {
            // mark DIFF mode and check whether an initial snapshot must be force-written
            self.setSyncMode(QuorumPeer.SyncMode.DIFF);
            if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
                snapshotNeeded = true;
                syncSnapshot = true;
            } else {
                snapshotNeeded = false;
            }
        // the Leader responded with SNAP
        } else if (qp.getType() == Leader.SNAP) {
            self.setSyncMode(QuorumPeer.SyncMode.SNAP);
            // clear the current in-memory database and load the Leader's snapshot
            zk.getZKDatabase().deserializeSnapshot(leaderIs);
            // record the latest zxid
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
            syncSnapshot = true;
        } else if (qp.getType() == Leader.TRUNC) {
            // the Leader responded with TRUNC
            self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
            // truncate the local transaction log back to the Leader's zxid
            boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
            // record the latest zxid
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
        }
        zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
        // create the session tracker
        zk.createSessionTracker();
        long lastQueued = 0;
        boolean isPreZAB1_0 = true;
        boolean writeToTxnLog = !snapshotNeeded;
        TxnLogEntry logEntry;
        outerLoop:
        while (self.isRunning()) {
            // under DIFF sync, keep reading packets from the Leader
            readPacket(qp);
            switch (qp.getType()) {
            // the Leader ships each missing transaction to the learner as a proposal
            case Leader.PROPOSAL:
                PacketInFlight pif = new PacketInFlight();
                logEntry = SerializeUtils.deserializeTxn(qp.getData());
                // the header and txn must be populated; COMMIT below reads them
                pif.hdr = logEntry.getHeader();
                pif.rec = logEntry.getTxn();
                packetsNotCommitted.add(pif);
                break;
            // the Leader then sends the matching commit
            case Leader.COMMIT:
            case Leader.COMMITANDACTIVATE:
                // take the oldest pending proposal and apply it via processTxn
                pif = packetsNotCommitted.peekFirst();
                if (!writeToTxnLog) {
                    if (pif.hdr.getZxid() != qp.getZxid()) {
                        LOG.warn(
                            "Committing 0x{}, but next proposal is 0x{}",
                            Long.toHexString(qp.getZxid()),
                            Long.toHexString(pif.hdr.getZxid()));
                    } else {
                        zk.processTxn(pif.hdr, pif.rec);
                        packetsNotCommitted.remove();
                    }
                } else {
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            case Leader.INFORM:
            case Leader.INFORMANDACTIVATE:
                PacketInFlight packet = new PacketInFlight();
                // deserialization into packet.hdr / packet.rec omitted
                if (!writeToTxnLog) {
                    zk.processTxn(packet.hdr, packet.rec);
                } else {
                    packetsNotCommitted.add(packet);
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            // data sync with the Leader is complete; exit the loop
            case Leader.UPTODATE:
                if (isPreZAB1_0) {
                    // take a snapshot of the freshly synced database
                    zk.takeSnapshot(syncSnapshot);
                    self.setCurrentEpoch(newEpoch);
                }
                self.setZooKeeperServer(zk);
                self.adminServer.setZooKeeperServer(zk);
                break outerLoop;
            case Leader.NEWLEADER:
                if (snapshotNeeded) {
                    zk.takeSnapshot(syncSnapshot);
                }
                self.setCurrentEpoch(newEpoch);
                writeToTxnLog = true;
                isPreZAB1_0 = false;
                sock.setSoTimeout(self.tickTime * self.syncLimit);
                self.setSyncMode(QuorumPeer.SyncMode.NONE);
                zk.startupWithoutServing();
                if (zk instanceof FollowerZooKeeperServer) {
                    FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
                    for (PacketInFlight p : packetsNotCommitted) {
                        fzk.logRequest(p.hdr, p.rec, p.digest);
                    }
                    packetsNotCommitted.clear();
                }
                writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                break;
            }
        }
    }
    // send the final ACK to the Leader
    ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
    writePacket(ack, true);
    // start serving
    zk.startServing();
}
syncWithLeader brings the local node up to date with the Leader. There are three sync modes, DIFF, SNAP and TRUNC (a sketch of how the mode is chosen follows the list):
1. DIFF: the Leader replays the transactions the node is missing as PROPOSAL packets, each followed by a COMMIT; on receiving the COMMIT the node applies the transaction and later persists it.
2. SNAP: the node clears its in-memory database and deserializes the Leader's full snapshot.
3. TRUNC: the node is ahead of the Leader, so it truncates its transaction log back to the Leader's zxid and rebuilds its database from what remains.
This is how an Observer (or Follower) synchronizes its data with the Leader after startup.
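It is the Leader that picks the mode, based on the learner's last logged zxid (reported in the ACKEPOCH packet) relative to the range of proposals still held in the Leader's in-memory commit log. The following is a simplified decision sketch, not the actual LearnerHandler.syncFollower code, which also weighs snapshot-size thresholds, in-flight proposals and other edge cases:

enum SyncMode { DIFF, TRUNC, SNAP }

// peerLastZxid: the learner's last logged zxid;
// minCommittedLog/maxCommittedLog: the zxid range the Leader keeps in memory
static SyncMode chooseSyncMode(long peerLastZxid, long minCommittedLog, long maxCommittedLog) {
    if (peerLastZxid == maxCommittedLog) {
        return SyncMode.DIFF;   // already up to date: an "empty" DIFF
    } else if (peerLastZxid > maxCommittedLog) {
        return SyncMode.TRUNC;  // learner is ahead: roll its log back
    } else if (peerLastZxid >= minCommittedLog) {
        return SyncMode.DIFF;   // gap is covered by the in-memory commit log
    } else {
        return SyncMode.SNAP;   // too far behind: ship a full snapshot
    }
}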
Once data synchronization completes, the node keeps receiving and processing packets from the Leader in a while loop:
while (this.isRunning() && nextLearnerMaster.get() == null) {
    readPacket(qp);
    processPacket(qp);
}
The interesting part is processPacket:
protected void processPacket(QuorumPacket qp) throws Exception {
    TxnLogEntry logEntry;
    TxnHeader hdr;
    TxnDigest digest;
    Record txn;
    switch (qp.getType()) {
    // the Leader keeps heartbeating the Observer
    case Leader.PING:
        ping(qp);
        break;
    // PROPOSAL, COMMIT and UPTODATE are meaningless to an Observer and are ignored
    case Leader.PROPOSAL:
        LOG.warn("Ignoring proposal");
        break;
    case Leader.COMMIT:
        LOG.warn("Ignoring commit");
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Observer started");
        break;
    case Leader.REVALIDATE:
        // session revalidation
        revalidate(qp);
        break;
    case Leader.SYNC:
        // handle a sync request
        ((ObserverZooKeeperServer) zk).sync();
        break;
    // an INFORM is wrapped into a Request and submitted to the RequestProcessor chain;
    // this is where the Observer applies the writes propagated by the Leader
    case Leader.INFORM:
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        logEntry = SerializeUtils.deserializeTxn(qp.getData());
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
        request.setTxnDigest(digest);
        ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
        obs.commitRequest(request);
        break;
    // INFORMANDACTIVATE additionally carries a configuration update
    case Leader.INFORMANDACTIVATE:
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();
        byte[] remainingdata = new byte[buffer.remaining()];
        buffer.get(remainingdata);
        logEntry = SerializeUtils.deserializeTxn(remainingdata);
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8));
        request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.setTxnDigest(digest);
        obs = (ObserverZooKeeperServer) zk;
        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
        obs.commitRequest(request);
        if (majorChange) {
            throw new Exception("changes proposed in reconfig");
        }
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}
So after the cluster starts, an Observer node first locates the Leader, registers itself with the Leader and updates its epoch, then synchronizes data until it is consistent with the Leader, and finally loops forever receiving the writes the Leader propagates and persisting them locally. The server container here is an ObserverZooKeeperServer; in standalone mode, as we saw earlier, it is a plain ZooKeeperServer.
3. Receiving Client Requests
Once the cluster is up and the initial sync is done, the Observer starts its real work: besides receiving data synchronization from the Leader, it also accepts client requests. What does that processing look like?
A client request is wrapped into a Request object and submitted to the RequestProcessor chain:
firstProcessor.processRequest(si);
Here firstProcessor is an ObserverRequestProcessor, its successor is a CommitProcessor, and after that comes a FinalRequestProcessor, so requests flow through them in that order. We mainly need to look at the processRequest methods of ObserverRequestProcessor and CommitProcessor. For reference, the chain is wired up in ObserverZooKeeperServer.setupRequestProcessors, shown here condensed (the optional SyncRequestProcessor that persists transactions locally is elided, and details may vary slightly across versions):
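protected void setupRequestProcessors() {
    // chain: ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(
        finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();
}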
ObserverRequestProcessor.processRequest simply adds the request to its queuedRequests blocking queue. Its run method first hands each dequeued request to nextProcessor.processRequest and then, if the request is transactional, forwards it to the Leader.
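A condensed sketch of ObserverRequestProcessor.run (several op codes, the sync-request bookkeeping and trace logging are omitted here):

public void run() {
    try {
        while (!finished) {
            // block until a client request arrives
            Request request = queuedRequests.take();
            if (request == Request.requestOfDeath) {
                break;
            }
            // queue the request locally first, so we are ready for the response
            nextProcessor.processRequest(request);
            // transactional requests are then forwarded to the Leader
            switch (request.type) {
            case OpCode.create:
            case OpCode.delete:
            case OpCode.setData:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getObserver().request(request);
                break;
            }
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
}

With the forwarding side clear, we can now analyze the processing logic inside CommitProcessor.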
CommitProcessor.processRequest:
public void processRequest(Request request) {
    if (stopped) {
        return;
    }
    request.commitProcQueueStartTime = Time.currentElapsedTime();
    // enqueue the request
    queuedRequests.add(request);
    // needCommit returns true for transactional (write) requests, false otherwise
    if (needCommit(request)) {
        // writes are also tracked on the queuedWriteRequests queue
        queuedWriteRequests.add(request);
        numWriteQueuedRequests.incrementAndGet();
    } else {
        numReadQueuedRequests.incrementAndGet();
    }
    wakeup();
}
In CommitProcessor's run method, a read request is simply taken off queuedRequests and handed to FinalRequestProcessor, which builds the response for the client; this path was already covered in the standalone-startup article. A write request is instead parked in the pendingRequests collection. Once the Leader has processed the write the Observer forwarded and the quorum has voted it through, the Leader sends the Observer an INFORM; on receiving it, the Observer performs the commit, i.e. the request is placed into committedRequests. The run method then matches entries in pendingRequests against committedRequests and hands each committed write to FinalRequestProcessor, which responds to the client. The sketch below models that matching step.
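A deliberately simplified model of the pairing logic (an illustrative sketch only: the Req type and method names below are hypothetical, and the real CommitProcessor adds per-session queues, a worker pool, throttling and many edge cases):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class CommitPairingSketch {

    // hypothetical stand-in for org.apache.zookeeper.server.Request
    static class Req {
        final long sessionId;
        final int cxid;       // client-side transaction id
        final boolean write;
        Req(long sessionId, int cxid, boolean write) {
            this.sessionId = sessionId;
            this.cxid = cxid;
            this.write = write;
        }
    }

    // writes parked per session until their commit arrives
    private final Map<Long, Deque<Req>> pendingRequests = new HashMap<>();

    private void handOffToFinalProcessor(Req r) {
        // stand-in for nextProcessor.processRequest(r)
    }

    // called for each request coming in from ObserverRequestProcessor
    void onClientRequest(Req req) {
        if (!req.write) {
            handOffToFinalProcessor(req); // reads never wait for a commit
        } else {
            pendingRequests.computeIfAbsent(req.sessionId, k -> new ArrayDeque<>()).add(req);
        }
    }

    // called when the Leader's commit arrives (on an Observer, driven by INFORM
    // via ObserverZooKeeperServer.commitRequest)
    void onCommit(Req committed) {
        Deque<Req> pending = pendingRequests.get(committed.sessionId);
        if (pending != null && !pending.isEmpty()
                && pending.peekFirst().cxid == committed.cxid) {
            // a write this very server submitted: release the parked original
            handOffToFinalProcessor(pending.pollFirst());
        } else {
            // a committed write that originated on another server
            handOffToFinalProcessor(committed);
        }
    }
}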
4. Summary
After the cluster starts, an Observer node registers itself with the Leader and synchronizes the Leader's data to stay consistent. When it receives a transactional client request, it forwards the request to the Leader; once the Leader has driven the proposal through a quorum vote, it sends an INFORM message back to the Observer, and on receiving it the Observer performs the commit. Read requests are answered directly from local data.
As always, if anything above is incorrect, please leave a comment to point it out.