03、ZooKeeper 服务端启动

时序图

todo

源码

// QuorumPeerMain
/**
 * Entry point of the ZooKeeper server process.
 *
 * <p>Delegates to {@link #initializeAndRun(String[])} and maps each failure category to a
 * distinct process exit code: 2 for invalid arguments or an invalid configuration, 3 when the
 * data directory cannot be accessed, 4 when the AdminServer cannot start and 1 for any other
 * exception. Exits with 0 when {@code initializeAndRun} returns normally.
 *
 * @param args command-line arguments; a single argument is the configuration file path
 */
public static void main(String[] args) {
    QuorumPeerMain peerMain = new QuorumPeerMain();
    try {
        peerMain.initializeAndRun(args);
    } catch (IllegalArgumentException badArgs) {
        // Bad arguments additionally get the usage text on both the log and stderr.
        LOG.error("Invalid arguments, exiting abnormally", badArgs);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException badConfig) {
        logAndExit("Invalid config, exiting abnormally", badConfig, 2);
    } catch (DatadirException badDatadir) {
        logAndExit("Unable to access datadir, exiting abnormally", badDatadir, 3);
    } catch (AdminServerException adminFailure) {
        logAndExit("Unable to start AdminServer, exiting abnormally", adminFailure, 4);
    } catch (Exception unexpected) {
        // Unexpected failures are only logged, not echoed to stderr.
        LOG.error("Unexpected exception, exiting abnormally", unexpected);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}

/**
 * Logs {@code message} together with {@code cause}, echoes the message to stderr and
 * terminates the JVM with {@code status}.
 */
private static void logAndExit(String message, Exception cause, int status) {
    LOG.error(message, cause);
    System.err.println(message);
    System.exit(status);
}

// QuorumPeerMain
/**
 * Parses the configuration (if one is given), starts the data-directory purge task and then
 * runs either a quorum peer or a standalone server.
 *
 * <p>Quorum mode is used only when exactly one argument (the configuration file path) is
 * supplied and the parsed configuration is distributed; in every other case startup is
 * delegated to {@code ZooKeeperServerMain.main(args)}.
 *
 * @param args command-line arguments; exactly one argument is treated as the config file path
 * @throws ConfigException      if the configuration file cannot be parsed
 * @throws IOException          propagated from server startup
 * @throws AdminServerException propagated from server startup
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    final boolean hasConfigFile = args.length == 1;

    // args[0] is the configuration file path; load it into a QuorumPeerConfig.
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (hasConfigFile) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task.
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
    purgeMgr.start();

    if (!hasConfigFile || !config.isDistributed()) {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
        return;
    }

    // Distributed (quorum) mode.
    runFromConfig(config);
}

解析配置文件

// QuorumPeerConfig
/**
 * Parses a ZooKeeper configuration file and applies its settings to this object.
 *
 * <p>The path is resolved through a VerifyingFileFactory built with
 * {@code warnForRelativePath()} and {@code failForNonExistingPath()} (presumably: warn on a
 * relative path, reject a missing one -- see that class). The file is loaded as
 * {@link Properties} and handed to {@link #parseProperties(Properties)}.
 *
 * @param path path of the configuration file
 * @throws ConfigException if reading the file fails, if an IllegalArgumentException is raised
 *                         while processing it, or if parseProperties rejects the settings
 */
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);

    try {
        // Resolve the configuration file from the given path.
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);

        // Load every key/value line of the file into a Properties object.
        Properties cfg = new Properties();
        // try-with-resources closes the stream on every path; if both load() and close()
        // fail, the load() failure is kept and the close() failure is attached as suppressed
        // (the previous try/finally let close() replace it).
        try (FileInputStream in = new FileInputStream(configFile)) {
            cfg.load(in);
            configFileStr = path;
        }

        // Copy the loaded key/value pairs onto the matching QuorumPeerConfig fields;
        // QuorumPeer startup later reads these fields.
        parseProperties(cfg);
    } catch (IOException | IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }

    // ... (remaining logic omitted in this excerpt)
}

// QuorumPeerConfig
/**
 * Copies the key/value pairs loaded from the configuration file onto the fields of this
 * QuorumPeerConfig.
 *
 * <p>Excerpt: most per-key branches, and the code that uses the client-port locals declared
 * at the top, are omitted. When no dynamic configuration file is set, the quorum settings are
 * then derived via setupQuorumPeerConfig, and the old static config is backed up only for a
 * distributed deployment with reconfig enabled.
 *
 * @param zkProp properties loaded from the configuration file
 * @throws IOException     propagated from setupQuorumPeerConfig (and possibly omitted code)
 * @throws ConfigException propagated from setupQuorumPeerConfig (and possibly omitted code)
 */
public void parseProperties(Properties zkProp)
    throws IOException, ConfigException {
    // These locals are filled and consumed by the omitted part of the loop below.
    int clientPort = 0;
    int secureClientPort = 0;
    String clientPortAddress = null;
    String secureClientPortAddress = null;
    VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
    // Iterate over the loaded properties and assign each recognised key to its field.
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        if (key.equals("dataDir")) {
            dataDir = vff.create(value);
        } else if (key.equals("dataLogDir")) {
            dataLogDir = vff.create(value);
        }
        // ... (remaining keys omitted in this excerpt)
    }

    // NOTE(review): dynamicConfigFileStr is presumably set by an omitted branch above when a
    // separate dynamic config file is configured -- confirm.
    if (dynamicConfigFileStr == null) {
        // Derive the quorum settings (verifier, myid, client port, peer type) from zkProp.
        setupQuorumPeerConfig(zkProp, true);
        if (isDistributed() && isReconfigEnabled()) {
            // we don't backup static config for standalone mode.
            // we also don't backup if reconfig feature is disabled.
            backupOldConfig();
        }
    }
}

下面看看 setupQuorumPeerConfig 是如何设置 QuorumPeerConfig 中与集群相关的配置的:

// QuorumPeerConfig
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
            throws IOException, ConfigException {
   
     
    // 1. 创建QuorumVerifier,对不同部署模式下参与者和观察者的数量进行校验
    quorumVerifier = parseDynamicConfig(pr op, electionAlg, true, configBackwardCompatibilityMode);
 	// 2. 设置myid
    setupMyId();
    // 3. 设置客户端连接端口
    setupClientPort();
    // 4. 设置当前节点类型
    setupPeerType();
    // 5. 校验配置合法性
    checkValidity();
}

parseDynamicConfig

/**
 * Builds a {@link QuorumVerifier} from the membership properties and validates the
 * participant/observer layout.
 *
 * <p>A hierarchical verifier is requested when any key starts with {@code group} or
 * {@code weight}; otherwise createQuorumVerifier is asked for the default kind (presumably
 * QuorumMaj).
 *
 * <p>Layout rules:
 * <ul>
 *   <li>no participants: only allowed when standaloneEnabled is true, and then no observers
 *       may be configured;</li>
 *   <li>exactly one participant with standaloneEnabled: logged as an error and ignored, and
 *       observers must be zero;</li>
 *   <li>otherwise: optionally warn when there are at most 2 or an even number of
 *       participants, and, if {@code eAlg != 0}, require an election address on every
 *       participant.</li>
 * </ul>
 *
 * @param dynamicConfigProp               membership properties (server.N, version, ...)
 * @param eAlg                            election algorithm id
 * @param warnings                        whether to log ensemble-size warnings
 * @param configBackwardCompatibilityMode when false, any key other than server.*, version,
 *                                        group* or weight* is rejected
 * @return the constructed verifier
 * @throws ConfigException          on an unrecognised key outside backward-compatibility mode
 * @throws IllegalArgumentException on an invalid layout or a missing election port
 */
public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings,
        boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
    boolean hierarchical = false;
    for (Entry<Object, Object> property : dynamicConfigProp.entrySet()) {
        String name = property.getKey().toString().trim();
        if (name.startsWith("group") || name.startsWith("weight")) {
            // Group/weight entries switch the verifier to the hierarchical variant.
            hierarchical = true;
            continue;
        }
        boolean recognised = configBackwardCompatibilityMode
                || name.startsWith("server.")
                || name.equals("version");
        if (!recognised) {
            LOG.info(dynamicConfigProp.toString());
            throw new ConfigException("Unrecognised parameter: " + name);
        }
    }

    QuorumVerifier verifier = createQuorumVerifier(dynamicConfigProp, hierarchical);
    int participants = verifier.getVotingMembers().size();
    int observers = verifier.getObservingMembers().size();

    if (participants == 0) {
        // No voting members: only acceptable when standalone mode is allowed,
        // and observers make no sense without participants.
        if (!standaloneEnabled) {
            throw new IllegalArgumentException("standaloneEnabled = false then " +
                    "number of participants should be >0");
        }
        if (observers > 0) {
            throw new IllegalArgumentException("Observers w/o participants is an invalid configuration");
        }
        return verifier;
    }

    if (participants == 1 && standaloneEnabled) {
        // HBase currently adds a single server line to the config, for
        // b/w compatibility reasons we need to keep this here. If standaloneEnabled
        // is true, the QuorumPeerMain script will create a standalone server instead
        // of a quorum configuration
        LOG.error("Invalid configuration, only one server specified (ignoring)");
        // A single participant cannot form a quorum, so observers are rejected.
        if (observers > 0) {
            throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration");
        }
        return verifier;
    }

    // A real ensemble: optionally warn when it tolerates no failure or has an even size.
    if (warnings) {
        if (participants <= 2) {
            LOG.warn("No server failure will be tolerated. " +
                "You need at least 3 servers.");
        } else if (participants % 2 == 0) {
            LOG.warn("Non-optimial configuration, consider an odd number of servers.");
        }
    }

    // If using FLE, then every server requires a separate election port.
    if (eAlg != 0) {
        for (QuorumServer member : verifier.getVotingMembers().values()) {
            if (member.electionAddr == null) {
                throw new IllegalArgumentException(
                        "Missing election port for server: " + member.id);
            }
        }
    }
    return verifier;
}

下面看看 QuorumMaj 是如何创建的。
QuorumMaj 中保存了当前集群包含哪些节点以及这些节点的 id,并区分哪些节点参与投票(participant),哪些节点是不参与投票的观察者(observer)。

// QuorumMaj
/**
 * Builds a majority-based quorum verifier from {@code server.N} and {@code version} entries.
 *
 * <p>Every {@code server.N} entry is parsed into a QuorumServer keyed by N and stored in
 * {@code allMembers}. PARTICIPANT servers are also stored in {@code votingMembers}, and all
 * others in {@code observingMembers}. A {@code version} entry is parsed as a hexadecimal long,
 * and other keys are ignored. Finally {@code half} is set to the number of voting members
 * divided by two (integer division).
 *
 * @param props configuration properties
 * @throws ConfigException       presumably raised by the QuorumServer constructor for a
 *                               malformed server entry -- confirm
 * @throws NumberFormatException if a server id or the version is not numeric
 */
public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> property : props.entrySet()) {
        String name = property.getKey().toString();
        String spec = property.getValue().toString();

        if (name.equals("version")) {
            version = Long.parseLong(spec, 16);
            continue;
        }
        if (!name.startsWith("server.")) {
            continue;
        }

        // "server.<id>": everything after the first '.' is the numeric server id.
        long memberId = Long.parseLong(name.substring(name.indexOf('.') + 1));
        // QuorumServer holds the member's address and its ports.
        QuorumServer member = new QuorumServer(memberId, spec);
        allMembers.put(memberId, member);
        if (member.type == LearnerType.PARTICIPANT) {
            votingMembers.put(memberId, member);
        } else {
            observingMembers.put(memberId, member);
        }
    }
    half = votingMembers.size() / 2;
}

setupMyId

读取当前节点的 id:
将 dataDir 目录下 myid 文件第一行的内容解析为当前节点的 id;如果该文件不存在(单机模式不需要 myid 文件),则直接返回。

// QuorumPeerConfig
/**
 * Reads this server's id from the {@code myid} file in {@code dataDir}.
 *
 * <p>If {@code dataDir/myid} is not a regular file, the method returns without touching
 * {@code serverId}, because a standalone server needs no myid file. Otherwise:
 * <ul>
 *   <li>the first line is parsed as a {@code long} and stored in {@code serverId};</li>
 *   <li>that value is put into the logging MDC under the key "myid".</li>
 * </ul>
 *
 * @throws IOException              if the myid file cannot be read
 * @throws IllegalArgumentException if the first line is not a number (an empty file also
 *                                  ends up here, because readLine() then returns null)
 */
private void setupMyId() throws IOException {
    File myIdFile = new File(dataDir, "myid");
    // standalone server doesn't need myid file.
    if (!myIdFile.isFile()) {
        return;
    }
    String myIdString;
    // try-with-resources closes the reader on every path and, unlike a manual try/finally,
    // does not let a close() failure mask a readLine() failure.
    try (BufferedReader br = new BufferedReader(new FileReader(myIdFile))) {
        myIdString = br.readLine();
    }
    try {
        serverId = Long.parseLong(myIdString);
        MDC.put("myid", myIdString);
    } catch (NumberFormatException e) {
        // Keep the parse failure as the cause for diagnostics.
        throw new IllegalArgumentException("serverid " + myIdString
                + " is not a number", e);
    }
}

setupClientPort

设置客户端连接地址
根据当前节点的 id,从配置解析结果(QuorumVerifier)中取出该节点的客户端连接地址;如果静态配置中也指定了 clientPortAddress,两者必须一致(静态地址为通配地址时只比较端口),否则抛出 ConfigException。

// QuorumPeerConfig
/**
 * Resolves {@code clientPortAddress} for this server from the quorum membership.
 *
 * <p>Does nothing when no server id is set, or when this id's membership entry is missing or
 * has no client address. Otherwise the membership's client address becomes
 * {@code clientPortAddress}. Before that, any statically configured address must agree with
 * it; when the static address is a wildcard (any-local) address, only the ports are compared.
 *
 * @throws ConfigException if the static and dynamic client addresses disagree
 */
private void setupClientPort() throws ConfigException {
    if (serverId == UNSET_SERVERID) {
        return;
    }
    QuorumServer member = quorumVerifier.getAllMembers().get(serverId);
    if (member == null || member.clientAddr == null) {
        return;
    }
    if (clientPortAddress != null) {
        boolean wildcard = clientPortAddress.getAddress().isAnyLocalAddress();
        boolean mismatch = wildcard
                ? clientPortAddress.getPort() != member.clientAddr.getPort()
                : !clientPortAddress.equals(member.clientAddr);
        if (mismatch) {
            throw new ConfigException("client address for this server (id = " + serverId +
                    ") in static config file is " + clientPortAddress +
                    " is different from client address found in dynamic file: " + member.clientAddr);
        }
    }
    clientPortAddress = member.clientAddr;
}

setupPeerType

设置当前节点的角色(participant 或 observer):以 server.x 服务器列表中的配置为准,若与 peerType 配置不一致,则打印警告并采用服务器列表中的角色。

// QuorumPeerConfig
/**
 * Reconciles {@code peerType} with the role implied by the servers list.
 *
 * <p>A server whose id is among the quorum verifier's observing members is an OBSERVER; any
 * other id is treated as a PARTICIPANT. If the configured {@code peerType} disagrees, a
 * warning is logged and the servers-list role overrides it.
 */
private void setupPeerType() {
    boolean listedAsObserver = quorumVerifier.getObservingMembers().containsKey(serverId);
    LearnerType impliedRole;
    if (listedAsObserver) {
        impliedRole = LearnerType.OBSERVER;
    } else {
        impliedRole = LearnerType.PARTICIPANT;
    }
    if (impliedRole == peerType) {
        return;
    }
    // Warn about inconsistent peer type
    LOG.warn("Peer type from servers list (" + impliedRole
            + ") doesn't match peerType (" + peerType
            + "). Defaulting to servers list.");
    peerType = impliedRole;
}

checkValidity

// QuorumPeerConfig
/**
 * Validates the settings that are mandatory for a distributed (quorum) deployment.
 *
 * <p>For a distributed deployment, {@code initLimit} and {@code syncLimit} must be non-zero and
 * {@code serverId} must have been set. Nothing is checked for a non-distributed configuration.
 *
 * @throws IllegalArgumentException naming the first missing setting
 */
public void checkValidity() throws IOException, ConfigException {
    if (!isDistributed()) {
        return;
    }
    if (initLimit == 0) {
        throw new IllegalArgumentException("initLimit is not set");
    }
    if (syncLimit == 0) {
        throw new IllegalArgumentException("syncLimit is not set");
    }
    if (serverId == UNSET_SERVERID) {
        throw new IllegalArgumentException("myid file is missing");
    }
}

启动清理任务

todo

启动QuorumPeer

// QuorumPeerMain
/**
 * Starts a quorum peer from an already-parsed configuration and blocks until its thread ends.
 *
 * <p>The method first tries to register the log4j MBeans; a failure there is only logged.
 * It then:
 * <ul>
 *   <li>creates the client connection factories (plain, and secure if configured);</li>
 *   <li>copies the configuration onto the QuorumPeer returned by getQuorumPeer();</li>
 *   <li>initializes and starts the peer, then joins it.</li>
 * </ul>
 * If the calling thread is interrupted while waiting, a warning is logged, the thread's
 * interrupt status is restored and the method returns.
 *
 * @param config parsed configuration
 * @throws IOException          propagated from callees not shown here (e.g. factory
 *                              configuration or log/snapshot setup) -- confirm
 * @throws AdminServerException declared for callers; propagated from callees not shown here
 */
public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        // JMX control of log4j is optional; continue without it.
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        // ServerCnxnFactory manages the connections with clients.
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // Plain client connections: configured client address and max client connections.
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(),
                    false);
        }

        // Secure client connections, only when a secure client address is configured.
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(),
                    true);
        }

        quorumPeer = getQuorumPeer();
        // Transaction-log/snapshot store built from dataLogDir and dataDir.
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                config.getDataLogDir(),
                config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(
                config.isLocalSessionsUpgradingEnabled());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        // In-memory database backed by the transaction-log/snapshot store above.
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        // Start the QuorumPeer and block until its thread finishes.
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
        // Restore the interrupt status so callers further up can still observe it.
        Thread.currentThread().interrupt();
    }
}

/**
 * Starts this quorum peer.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>check that {@code myid} is part of the current view;</li>
 *   <li>load the database;</li>
 *   <li>start the server connection factory;</li>
 *   <li>try to start the AdminServer (a failure there is logged and printed, not thrown);</li>
 *   <li>start leader election;</li>
 *   <li>call {@code super.start()}.</li>
 * </ol>
 *
 * @throws RuntimeException if {@code myid} is not in the peer list
 */
public synchronized void start() {
    boolean knownPeer = getView().containsKey(myid);
    if (!knownPeer) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    startServerCnxnFactory();
    startAdminServerQuietly();
    startLeaderElection();
    // Per the article, this begins the main loop in which each role does its own work.
    super.start();
}

/**
 * Starts the AdminServer, which exposes information about this server node. A failure is
 * logged and printed to stdout but does not abort peer startup.
 */
private void startAdminServerQuietly() {
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
}

版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有