08、Zookeeper Leader选举源码解析

Zookeeper启动的main方法是org.apache.zookeeper.server.quorum.QuorumPeerMain类的main方法:

public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {

 
        main.initializeAndRun(args);
    } ...
    ...
}

进入到initializeAndRun方法:

protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
{
 
     // args[0]是传入的配置文件
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {

 
        // 解析配置文件
        config.parse(args[0]);
    }

    // Start and schedule the the purge task
    // 启动一个定时清除日志的任务
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
     // 我们是集群启动Zookeeper,必须使用到配置文件,因此接下来运行到runFromConfig
    if (args.length == 1 && config.servers.size() > 0) {

 
        runFromConfig(config);
    } else {

 
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

这里传入的配置文件就是Zookeeper根目录下的conf/zoo.cfg文件(可参考启动脚本zkServer.sh),该配置文件可以配置哪些配置项可以参考QuorumPeerConfig的解析配置文件过程。接下来进入runFromConfig方法:

public void runFromConfig(QuorumPeerConfig config) throws IOException {
     
  try {

 
      ManagedUtil.registerLog4jMBeans();
  } catch (JMException e) {

 
      LOG.warn("Unable to register log4j JMX control", e);
  }

  LOG.info("Starting quorum peer");
  try {

 
      // cnxnFactory用于处理本节点上的一些IO操作,默认情况下是NIOServerCnxnFactory的对象
      ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
      // 配置本节点监听的客户端端口和连接数
      cnxnFactory.configure(config.getClientPortAddress(),
                            config.getMaxClientCnxns());
        // QuorumPeer是zk的重要类之一
      quorumPeer = getQuorumPeer();
        
        // 设置从配置文件读取到的所有配置节点(zoo.cfg中的server.1=localhost:2888:3888配置项)
      quorumPeer.setQuorumPeers(config.getServers());
      quorumPeer.setTxnFactory(new FileTxnSnapLog(
              new File(config.getDataLogDir()),
              new File(config.getDataDir())));
      // 设置使用的Leader选举算法
      quorumPeer.setElectionType(config.getElectionAlg());
      quorumPeer.setMyid(config.getServerId());
      quorumPeer.setTickTime(config.getTickTime());
      quorumPeer.setInitLimit(config.getInitLimit());
      quorumPeer.setSyncLimit(config.getSyncLimit());
      quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
      quorumPeer.setCnxnFactory(cnxnFactory);
      quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
      quorumPeer.setClientPortAddress(config.getClientPortAddress());
      quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
      quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
      quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
      quorumPeer.setLearnerType(config.getPeerType());
      quorumPeer.setSyncEnabled(config.getSyncEnabled());

      // sets quorum sasl authentication configurations
      quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
      if(quorumPeer.isQuorumSaslAuthEnabled()){

 
          quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
          quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
          quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
          quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
          quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
      }

      quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
      quorumPeer.initialize();

      quorumPeer.start();
      quorumPeer.join();
  } catch (InterruptedException e) {

 
      // warn, but generally this is ok
      LOG.warn("Quorum Peer interrupted", e);
  }
}

runFromConfig方法主要将从zoo.cfg中读取到的配置设置到QuorumPeer的对象属性中区,QuorumPeer继承了ZooKeeperThread类,而ZooKeeperThread继承了Thread类,因此QuorumPeer是一个线程类,稍后我们将分析其run方法。进入到quorumPeer.start()方法:

@Override
public synchronized void start() {

 
     // 从快照文件(dataDir目录下snapshot文件)中恢复会话和zk节点信息
    loadDataBase();
    // 处理IO事件(暂不了解这个的作用,之后的文章会对这个进行分析,现在跳过不影响我们分析Leader的选举)
    cnxnFactory.start();
    // 开始进行Leader的选举        
    startLeaderElection();
    // 开始监听本节点的状态,根据不同的状态(LOOKING、OBSERVING、FOLLOWING、LEADING)执行相应操作
    super.start();
}

版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有