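1. ZooKeeper Cluster
Let's first simulate a ZooKeeper cluster on a single machine:
Step 1: make three copies of the downloaded ZooKeeper distribution.
Step 2: prepare three data directories, one per copy.
Step 3: create a myid file in each data directory, containing 1, 2, and 3 respectively.
Step 4: edit the zoo.cfg of each copy as shown below. Each instance must use its own clientPort (and the dataDir prepared for it in step 2):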
tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\zookeeper\\data1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
Finally, start the three instances one after another.
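The four steps above can also be scripted. The following is a minimal sketch and not part of ZooKeeper: the class name `PseudoClusterSetup`, the file names `zoo1.cfg` to `zoo3.cfg`, and the client ports 2182 and 2183 for the second and third instance are assumptions made for illustration. It writes into a temporary directory rather than `D:\zookeeper`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class PseudoClusterSetup {

    /** Builds the zoo.cfg text for instance `id` (1-based) out of `size` local instances. */
    static String buildConfig(Path dataDir, int id, int size) {
        StringBuilder sb = new StringBuilder();
        sb.append("tickTime=2000\n");
        sb.append("initLimit=10\n");
        sb.append("syncLimit=5\n");
        // Forward slashes also work on Windows and avoid backslash escaping in a properties file.
        sb.append("dataDir=").append(dataDir.toString().replace('\\', '/')).append('\n');
        // Assumption: client ports are 2181, 2182, 2183 for ids 1, 2, 3.
        sb.append("clientPort=").append(2180 + id).append('\n');
        // Same server list in every file: quorum ports 2888.., election ports 3888..
        for (int i = 1; i <= size; i++) {
            sb.append("server.").append(i).append("=127.0.0.1:")
              .append(2887 + i).append(':').append(3887 + i).append('\n');
        }
        return sb.toString();
    }

    /** Creates data<id>/myid and zoo<id>.cfg under `root` and returns the config paths. */
    static List<Path> setUp(Path root, int size) throws IOException {
        List<Path> configs = new ArrayList<>();
        for (int id = 1; id <= size; id++) {
            Path dataDir = Files.createDirectories(root.resolve("data" + id));
            Files.write(dataDir.resolve("myid"), String.valueOf(id).getBytes(StandardCharsets.UTF_8));
            Path cfg = root.resolve("zoo" + id + ".cfg");
            Files.write(cfg, buildConfig(dataDir, id, size).getBytes(StandardCharsets.UTF_8));
            configs.add(cfg);
        }
        return configs;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("zk-pseudo-cluster");
        for (Path cfg : setUp(root, 3)) {
            System.out.println(cfg);
        }
    }
}
```

Each printed path can then be passed as the single argument to the ZooKeeper start script or to QuorumPeerMain.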
2. Source Code Analysis
Next, we analyze the ZooKeeper cluster startup process from the source code, along with some background on how the cluster works. Back in the QuorumPeerMain startup class, the main method calls initializeAndRun:
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    // Parse the configuration file; this was already covered in the standalone startup analysis
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }
    // Decide whether to start in cluster mode
    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        ZooKeeperServerMain.main(args);
    }
}
Let's see how ZooKeeper decides whether the current startup is a cluster or a standalone one, in config.isDistributed():
```java
public boolean isDistributed() {
    return quorumVerifier != null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
}
```
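To make the condition easier to read, here is a small sketch that evaluates the same boolean expression on plain values. The class `DistributedModeCheck` and its parameters are hypothetical stand-ins: `hasQuorumVerifier` represents `quorumVerifier != null`, and `votingMembers` represents `quorumVerifier.getVotingMembers().size()`.

```java
public class DistributedModeCheck {

    /** Same boolean shape as isDistributed(), with the fields replaced by plain arguments. */
    static boolean isDistributed(boolean hasQuorumVerifier, boolean standaloneEnabled, int votingMembers) {
        return hasQuorumVerifier && (!standaloneEnabled || votingMembers > 1);
    }

    public static void main(String[] args) {
        // No verifier at all: never distributed.
        System.out.println(isDistributed(false, true, 3));  // false
        // One voting member with standaloneEnabled=true: falls back to standalone.
        System.out.println(isDistributed(true, true, 1));   // false
        // Three voting members, as in the configuration from section 1.
        System.out.println(isDistributed(true, true, 3));   // true
        // standaloneEnabled=false forces quorum mode even with one voting member.
        System.out.println(isDistributed(true, false, 1));  // true
    }
}
```

With the three server.* lines from section 1, the third case applies, so initializeAndRun takes the runFromConfig branch.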
The quorumVerifier instance is created in the parse method while the configuration file is being parsed, as shown below:
if (dynamicConfigFileStr == null) {
    setupQuorumPeerConfig(zkProp, true);
    if (isDistributed() && isReconfigEnabled()) {
        backupOldConfig();
    }
}

void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
    // Parse the server.* entries in the configuration file
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
    // Read the myid file in the data directory and use it as this node's serverId
    setupMyId();
    // Resolve the client port
    setupClientPort();
    // Set the peer type, i.e. whether this node takes part in elections
    setupPeerType();
    // Validate the resulting configuration
    checkValidity();
}
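As a rough illustration of what "parse the server.* entries" means for the configuration from section 1, the sketch below reads `server.<id>=<host>:<quorumPort>:<electionPort>` lines from a `Properties` object. It is a simplification under stated assumptions, not ZooKeeper's parseDynamicConfig: `ServerEntryParser` and `ServerEntry` are hypothetical names, and only the three-field form used in this article is accepted.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class ServerEntryParser {

    /** Hypothetical holder for one server.N line; not a ZooKeeper class. */
    static final class ServerEntry {
        final long id;
        final String host;
        final int quorumPort;
        final int electionPort;

        ServerEntry(long id, String host, int quorumPort, int electionPort) {
            this.id = id;
            this.host = host;
            this.quorumPort = quorumPort;
            this.electionPort = electionPort;
        }
    }

    /** Collects every key of the form server.<id> into a map sorted by id. */
    static Map<Long, ServerEntry> parseServers(Properties props) {
        Map<Long, ServerEntry> servers = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith("server.")) {
                continue; // tickTime, dataDir, clientPort, ...
            }
            long id = Long.parseLong(key.substring("server.".length()));
            String[] parts = props.getProperty(key).trim().split(":");
            if (parts.length != 3) {
                throw new IllegalArgumentException("Expected host:quorumPort:electionPort for " + key);
            }
            servers.put(id, new ServerEntry(id, parts[0],
                    Integer.parseInt(parts[1]), Integer.parseInt(parts[2])));
        }
        return servers;
    }

    public static void main(String[] args) throws IOException {
        String cfg = "clientPort=2181\n"
                + "server.1=127.0.0.1:2888:3888\n"
                + "server.2=127.0.0.1:2889:3889\n"
                + "server.3=127.0.0.1:2890:3890\n";
        Properties props = new Properties();
        props.load(new StringReader(cfg));
        for (ServerEntry e : parseServers(props).values()) {
            System.out.println(e.id + " -> " + e.host + ":" + e.quorumPort + ":" + e.electionPort);
        }
    }
}
```

The number of entries collected this way corresponds to the voting-member count that isDistributed() compares against 1.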
Execution then continues in the runFromConfig method:
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    // Some code has been removed
    try {
        // Two network I/O factories are initialized here as well
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;
        // Initialize the factories, the same as in standalone startup
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }
        // Instantiate the QuorumPeer object
        quorumPeer = getQuorumPeer();
        // Set the transaction log and snapshot handler (already analyzed)
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }
        quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
        quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
        quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();
        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }
        // The call to focus on is start(): it is the entry point of cluster startup
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        quorumPeer.join();
    } catch (InterruptedException e) {
    } finally {
        try {
            metricsProvider.stop();
        } catch (Throwable error) {
            LOG.warn("Error while stopping metrics", error);
        }
    }
}
The start method:
public synchronized void start() {
    // Load the database
    loadDataBase();
    // Start the client listener (accepts client requests), the same as in standalone mode
    startServerCnxnFactory();
    try {
        // Start the admin server
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // Begin leader election
    startLeaderElection();
    startJvmPauseMonitor();
    // Start this thread, which executes the run method
    super.start();
}
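In terms of startup logic, cluster startup adds a leader election step compared with standalone startup, but there are other differences in the details. For example, when the database is loaded in cluster mode, two extra values come into play: currentEpoch and acceptedEpoch. What exactly they do is left for a later analysis.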
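The ordering in start() relies on a plain Java pattern: a Thread subclass overrides start(), does its preparation on the calling thread, and only then calls super.start() so that run() executes on the new thread. The sketch below models just that ordering. `PeerSketch` is a hypothetical stand-in for QuorumPeer, and the step names are recorded as strings instead of doing real work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StartupOrderSketch {

    /** Hypothetical stand-in for QuorumPeer; only the call order is modeled. */
    static class PeerSketch extends Thread {
        final List<String> steps = Collections.synchronizedList(new ArrayList<>());

        @Override
        public synchronized void start() {
            // These steps run on the caller's thread, before the peer thread exists.
            steps.add("loadDataBase");
            steps.add("startServerCnxnFactory");
            steps.add("adminServer.start");
            steps.add("startLeaderElection");
            steps.add("startJvmPauseMonitor");
            // Only now is the thread launched; run() executes on the new thread.
            super.start();
        }

        @Override
        public void run() {
            steps.add("run");
        }
    }

    /** Starts one peer, waits for it to finish, and returns the recorded order. */
    static List<String> runOnce() {
        PeerSketch peer = new PeerSketch();
        peer.start();
        try {
            peer.join(); // mirrors quorumPeer.join() in runFromConfig
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(peer.steps);
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

Because everything before super.start() completes first, the election setup in startLeaderElection is already done by the time run() begins.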
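3. Summary
With the standalone startup analysis behind us, the cluster startup is comparatively easy to follow. The hard parts are leader election during startup, how the cluster is maintained afterwards, and how requests are processed within the cluster. The following articles will analyze these questions one by one in cluster mode.
If anything above is wrong, please leave a comment to point it out. Thanks for your understanding.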