05、ZooKeeper 角色初始化

zookeeper有如下三个角色:

1、 Leader;
2、 Follower;
3、 Observer;

在QuorumPeer的run方法中会根据当前节点的状态执行不同的操作

// QuorumPeer
public void run() {
   
     
  // 省略

  try {
   
     
    /*
             * Main loop
             */
    while (running) {
   
     
      switch (getPeerState()) {
   
     
        case LOOKING:
          LOG.info("LOOKING");

          if (Boolean.getBoolean("readonlymode.enabled")) {
   
     
            LOG.info("Attempting to start ReadOnlyZooKeeperServer");

            // Create read-only server but don't start it immediately
            final ReadOnlyZooKeeperServer roZk =
              new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

            // Instead of starting roZk immediately, wait some grace
            // period before we decide we're partitioned.
            //
            // Thread is used here because otherwise it would require
            // changes in each of election strategy classes which is
            // unnecessary code coupling.
            Thread roZkMgr = new Thread() {
   
     
              public void run() {
   
     
                try {
   
     
                  // lower-bound grace period to 2 secs
                  sleep(Math.max(2000, tickTime));
                  if (ServerState.LOOKING.equals(getPeerState())) {
   
     
                    roZk.startup();
                  }
                } catch (InterruptedException e) {
   
     
                  LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                } catch (Exception e) {
   
     
                  LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                }
              }
            };
            try {
   
     
              roZkMgr.start();
              reconfigFlagClear();
              if (shuttingDownLE) {
   
     
                shuttingDownLE = false;
                startLeaderElection();
              }
              setCurrentVote(makeLEStrategy().lookForLeader());
            } catch (Exception e) {
   
     
              LOG.warn("Unexpected exception", e);
              setPeerState(ServerState.LOOKING);
            } finally {
   
     
              // If the thread is in the the grace period, interrupt
              // to come out of waiting.
              roZkMgr.interrupt();
              roZk.shutdown();
            }
          } else {
   
     
            try {
   
     
              reconfigFlagClear();
              if (shuttingDownLE) {
   
     
                shuttingDownLE = false;
                startLeaderElection();
              }
              setCurrentVote(makeLEStrategy().lookForLeader());
            } catch (Exception e) {
   
     
              LOG.warn("Unexpected exception", e);
              setPeerState(ServerState.LOOKING);
            }                        
          }
          break;
        case OBSERVING:
          try {
   
     
            LOG.info("OBSERVING");
            setObserver(makeObserver(logFactory));
            observer.observeLeader();
          } catch (Exception e) {
   
     
            LOG.warn("Unexpected exception",e );
          } finally {
   
     
            observer.shutdown();
            setObserver(null);  
            updateServerState();
          }
          break;
        case FOLLOWING:
          try {
   
     
            LOG.info("FOLLOWING");
            setFollower(makeFollower(logFactory));
            follower.followLeader();
          } catch (Exception e) {
   
     
            LOG.warn("Unexpected exception",e);
          } finally {
   
     
            follower.shutdown();
            setFollower(null);
            updateServerState();
          }
          break;
        case LEADING:
          LOG.info("LEADING");
          try {
   
     
            setLeader(makeLeader(logFactory));
            leader.lead();
            setLeader(null);
          } catch (Exception e) {
   
     
            LOG.warn("Unexpected exception",e);
          } finally {
   
     
            if (leader != null) {
   
     
              leader.shutdown("Forcing shutdown");
              setLeader(null);
            }
            updateServerState();
          }
          break;
      }
      start_fle = Time.currentElapsedTime();
    }
  } finally {
   
     
		// 省略
  }
}

下面分别看下不同角色都做了哪些操作

Leader

case LEADING:
  LOG.info("LEADING");
  try {
   
     
    // 1. 创建Leader对象并赋值到leader属性上
    setLeader(makeLeader(logFactory));
    // 2. 运行leader
    leader.lead();
    setLeader(null);
  } catch (Exception e) {
   
     
    LOG.warn("Unexpected exception",e);
  } finally {
   
     
    // 出现异常
    // 3. 关闭leader
    if (leader != null) {
   
     
      leader.shutdown("Forcing shutdown");
      setLeader(null);
    }
    // 4. 更新服务器状态为looking,重新开始选举
    updateServerState();
  }
  break;
}

创建

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
   
     
  return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}

// Leader
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
   
     
  this.self = self;
  this.proposalStats = new BufferStats();
  // 监听本地用来和其他节点交换数据的端口,等待其他节点连接
  // localhost:28888:3888 中用来和其他节点进行交换的端口就是28888
  try {
   
     
    if (self.shouldUsePortUnification() || self.isSslQuorum()) {
   
     
      boolean allowInsecureConnection = self.shouldUsePortUnification();
      if (self.getQuorumListenOnAllIPs()) {
   
     
        ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort());
      } else {
   
     
        ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);
      }
    } else {
   
     
      if (self.getQuorumListenOnAllIPs()) {
   
     
        ss = new ServerSocket(self.getQuorumAddress().getPort());
      } else {
   
     
        ss = new ServerSocket();
      }
    }
    ss.setReuseAddress(true);
    if (!self.getQuorumListenOnAllIPs()) {
   
     
      ss.bind(self.getQuorumAddress());
    }
  } catch (BindException e) {
   
     
    if (self.getQuorumListenOnAllIPs()) {
   
     
      LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
    } else {
   
     
      LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
    }
    throw e;
  }
  this.zk = zk;
  this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
    maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
}

运行

首先整体看下leader的运行逻辑

void lead() throws IOException, InterruptedException {
   
     
  // 1. 计算选举耗时并打印
  self.end_fle = Time.currentElapsedTime();
  long electionTimeTaken = self.end_fle - self.start_fle;
  self.setElectionTimeTaken(electionTimeTaken);
  LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
           QuorumPeer.FLE_TIME_UNIT);
  self.start_fle = 0;
  self.end_fle = 0;

  zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

  try {
   
     
    self.tick.set(0);
    // 2. 加载数据
    zk.loadData();

    leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

    // Start thread that waits for connection requests from
    // new followers.
    // 3. 开启接收线程,等待其他follower连接
    cnxAcceptor = new LearnerCnxAcceptor();
    cnxAcceptor.start();

    // 4. 等待过半follower连接,生成新的epoch
    long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

    // 5. 设置当前zxid
    zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

    synchronized(this){
   
     
      lastProposed = zk.getZxid();
    }
		
    newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                                                null, null);
    if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
   
     
      LOG.info("NEWLEADER proposal has Zxid of "
               + Long.toHexString(newLeaderProposal.packet.getZxid()));
    }

    QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
    QuorumVerifier curQV = self.getQuorumVerifier();
    if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
   
     
      // This was added in ZOOKEEPER-1783. The initial config has version 0 (not explicitly
      // specified by the user; the lack of version in a config file is interpreted as version=0). 
      // As soon as a config is established we would like to increase its version so that it
      // takes presedence over other initial configs that were not established (such as a config
      // of a server trying to join the ensemble, which may be a partial view of the system, not the full config). 
      // We chose to set the new version to the one of the NEWLEADER message. However, before we can do that
      // there must be agreement on the new version, so we can only change the version when sending/receiving UPTODATE,
      // not when sending/receiving NEWLEADER. In other words, we can't change curQV here since its the committed quorum verifier, 
      // and there's still no agreement on the new version that we'd like to use. Instead, we use 
      // lastSeenQuorumVerifier which is being sent with NEWLEADER message
      // so its a good way to let followers know about the new version. (The original reason for sending 
      // lastSeenQuorumVerifier with NEWLEADER is so that the leader completes any potentially uncommitted reconfigs
      // that it finds before starting to propose operations. Here we're reusing the same code path for 
      // reaching consensus on the new version number.)

      // It is important that this is done before the leader executes waitForEpochAck,
      // so before LearnerHandlers return from their waitForEpochAck
      // hence before they construct the NEWLEADER message containing
      // the last-seen-quorumverifier of the leader, which we change below
      try {
   
     
        QuorumVerifier newQV = self.configFromString(curQV.toString());
        newQV.setVersion(zk.getZxid());
        self.setLastSeenQuorumVerifier(newQV, true);    
      } catch (Exception e) {
   
     
        throw new IOException(e);
      }
    }

    newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
   
     
      newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    // We have to get at least a majority of servers in sync with
    // us. We do this by waiting for the NEWLEADER packet to get
    // acknowledged
		// 6. 等待过半的投票参与者响应了LEADERINFO信息,然后设置epoch
    waitForEpochAck(self.getId(), leaderStateSummary);
    self.setCurrentEpoch(epoch);    

    try {
   
     
			// 7. 等待过半的投票参与者响应了NEWLEADER
      waitForNewLeaderAck(self.getId(), zk.getZxid());
    } catch (InterruptedException e) {
   
     
      shutdown("Waiting for a quorum of followers, only synced with sids: [ "
               + newLeaderProposal.ackSetsToString() + " ]");
      HashSet<Long> followerSet = new HashSet<Long>();

      for(LearnerHandler f : getLearners()) {
   
     
        if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
   
     
          followerSet.add(f.getSid());
        }
      }    
      boolean initTicksShouldBeIncreased = true;
      for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
   
     
        if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
   
     
          initTicksShouldBeIncreased = false;
          break;
        }
      }                  
      if (initTicksShouldBeIncreased) {
   
     
        LOG.warn("Enough followers present. "+
                 "Perhaps the initTicks need to be increased.");
      }
      return;
    }

    // 8. 启动zk服务器
    startZkServer();

    /**
             * WARNING: do not use this for anything other than QA testing
             * on a real cluster. Specifically to enable verification that quorum
             * can handle the lower 32bit roll-over issue identified in
             * ZOOKEEPER-1277. Without this option it would take a very long
             * time (on order of a month say) to see the 4 billion writes
             * necessary to cause the roll-over to occur.
             *
             * This field allows you to override the zxid of the server. Typically
             * you'll want to set it to something like 0xfffffff0 and then
             * start the quorum, run some operations and see the re-election.
             */
    String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
    if (initialZxid != null) {
   
     
      long zxid = Long.parseLong(initialZxid);
      zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
    }

    if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
   
     
      self.setZooKeeperServer(zk);
    }

    self.adminServer.setZooKeeperServer(zk);

    // Everything is a go, simply start counting the ticks
    // WARNING: I couldn't find any wait statement on a synchronized
    // block that would be notified by this notifyAll() call, so
    // I commented it out
    //synchronized (this) {
   
     
    //    notifyAll();
    //}
    // We ping twice a tick, so we only update the tick every other
    // iteration
    boolean tickSkip = true;
    // If not null then shutdown this leader
    String shutdownMessage = null;
	  // 9. 向其他节点发送心跳
    // 1个tickTime中会发送两次ping
    // 每两次会判断是否有过半的节点仍然和leader保持心跳,如果没有过半,会关闭当前leader
    while (true) {
   
     
      synchronized (this) {
   
     
        long start = Time.currentElapsedTime();
        long cur = start;
        long end = start + self.tickTime / 2;
        while (cur < end) {
   
     
          wait(end - cur);
          cur = Time.currentElapsedTime();
        }

        if (!tickSkip) {
   
     
          self.tick.incrementAndGet();
        }

        // We use an instance of SyncedLearnerTracker to
        // track synced learners to make sure we still have a
        // quorum of current (and potentially next pending) view.
        SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
        syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
            .getQuorumVerifier().getVersion()) {
   
     
          syncedAckSet.addQuorumVerifier(self
                                         .getLastSeenQuorumVerifier());
        }

        syncedAckSet.addAck(self.getId());

        for (LearnerHandler f : getLearners()) {
   
     
          if (f.synced()) {
   
     
            syncedAckSet.addAck(f.getSid());
          }
        }

        // check leader running status
        if (!this.isRunning()) {
   
     
          // set shutdown flag
          shutdownMessage = "Unexpected internal error";
          break;
        }

        if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
   
     
          // Lost quorum of last committed and/or last proposed
          // config, set shutdown flag
          shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
            + syncedAckSet.ackSetsToString() + " ]";
          break;
        }
        tickSkip = !tickSkip;
      }
      for (LearnerHandler f : getLearners()) {
   
     
        f.ping();
      }
    }
    if (shutdownMessage != null) {
   
     
      shutdown(shutdownMessage);
      // leader goes in looking state
    }
  } finally {
   
     
    zk.unregisterJMX(this);
  }
}

下面看下创建的LearnerCnxAcceptor做了什么

// LearnerCnxAcceptor
@Override
public void run() {
   
     
  try {
   
     
    while (!stop) {
   
     
      Socket s = null;
      boolean error = false;
      try {
   
     
        // 1. 接收连接
        s = ss.accept();

        // start with the initLimit, once the ack is processed
        // in LearnerHandler switch to the syncLimit
        // 2. 设置连接
        // 设置read操作阻塞的超时时间,如果在指定的时间内一直没有数据可以读取,会抛出异常
        s.setSoTimeout(self.tickTime * self.initLimit);
        s.setTcpNoDelay(nodelay);

        BufferedInputStream is = new BufferedInputStream(
          s.getInputStream());
        // 3. 创建LearnerHandler并启动
        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
        fh.start();
      } catch (SocketException e) {
   
     
        error = true;
        if (stop) {
   
     
          LOG.info("exception while shutting down acceptor: "
                   + e);

          // When Leader.shutdown() calls ss.close(),
          // the call to accept throws an exception.
          // We catch and set stop to true.
          stop = true;
        } else {
   
     
          throw e;
        }
      } catch (SaslException e){
   
     
        LOG.error("Exception while connecting to quorum learner", e);
        error = true;
      } catch (Exception e) {
   
     
        error = true;
        throw e;
      } finally {
   
     
        // Don't leak sockets on errors
        if (error && s != null && !s.isClosed()) {
   
     
          try {
   
     
            s.close();
          } catch (IOException e) {
   
     
            LOG.warn("Error closing socket", e);
          }
        }
      }
    }
  } catch (Exception e) {
   
     
    LOG.warn("Exception while accepting follower", e.getMessage());
    handleException(this.getName(), e);
  }
}

可以看到LearnerCnxAcceptor做的主要就是监听端口,每接收一个连接请求,就创建一个LearnerHandler来处理这个新建立的连接

下面综合Leader和LearnerHandle来看leader和其他节点之间的通信同步过程

获取对端节点信息

从对端节点的输入流中读取数据,然后解析数据,得到对端节点的id,version,configVersion,lastAcceptedEpoch等信息

// LearnerHandler
public void run() {
   
     
	leader.addLearnerHandler(this);
  tickOfNextAckDeadline = leader.self.tick.get()
    + leader.self.initLimit + leader.self.syncLimit;

  ia = BinaryInputArchive.getArchive(bufferedInput);
  bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
  oa = BinaryOutputArchive.getArchive(bufferedOutput);

  QuorumPacket qp = new QuorumPacket();
  // 从对端读取数据
  ia.readRecord(qp, "packet");
  if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
   
     
    LOG.error("First packet " + qp.toString()
              + " is not FOLLOWERINFO or OBSERVERINFO!");
    return;
  }

  // 解析对端节点的信息
  byte learnerInfoData[] = qp.getData();
  if (learnerInfoData != null) {
   
     
    ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
    // sid
    if (learnerInfoData.length >= 8) {
   
     
      this.sid = bbsid.getLong();
    }
    // version
    if (learnerInfoData.length >= 12) {
   
     
      this.version = bbsid.getInt(); // protocolVersion
    }
    // configVersion
    if (learnerInfoData.length >= 20) {
   
     
      long configVersion = bbsid.getLong();
      if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
   
     
        throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
      }
    }
  } else {
   
     
    this.sid = leader.followerCounter.getAndDecrement();
  }

  if (leader.self.getView().containsKey(this.sid)) {
   
     
    LOG.info("Follower sid: " + this.sid + " : info : "
             + leader.self.getView().get(this.sid).toString());
  } else {
   
     
    LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
  }

  // 节点角色
  if (qp.getType() == Leader.OBSERVERINFO) {
   
     
    learnerType = LearnerType.OBSERVER;
  }

  // 最新的epoch
  long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());  
  
  // 省略
}

生成当前epoch

// Leader
void lead() {
   
     
  long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
}

// LearnerHandler
public void run() {
   
     
  long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
  long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
}

getEpochToPropose主要作用是等待过半learner和当前的leader建立了连接,通过每个learner发给leader消息中的epoch,来计算最新的epoch

// 已经和当前leader建立连接的follower集合
protected final Set<Long> connectingFollowers = new HashSet<Long>();
// Leader
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
   
     
  // 并发操作connectingFollowers
  synchronized(connectingFollowers) {
   
     
    // 如果已经计算过新的epoch,直接返回
    if (!waitingForNewEpoch) {
   
     
      return epoch;
    }
    // 接收到的epoch大于当前的epoch,更新
    if (lastAcceptedEpoch >= epoch) {
   
     
      epoch = lastAcceptedEpoch+1;
    }
    // 当前节点是选举参与者
    // 将该节点加入到并发操作connectingFollowers集合中,代表leader已经和其建立了连接
    if (isParticipant(sid)) {
   
     
      connectingFollowers.add(sid);
    }
    QuorumVerifier verifier = self.getQuorumVerifier();
    // 判断leader是否已经和过半的参与者建立了连接
    if (connectingFollowers.contains(self.getId()) &&
        verifier.containsQuorum(connectingFollowers)) {
   
     
      // 修改标志,新的epoch已经计算完毕
      waitingForNewEpoch = false;
      // 设置epoch
      self.setAcceptedEpoch(epoch);
      // 唤醒所有阻塞线程
      connectingFollowers.notifyAll();
    } else {
   
     
      long start = Time.currentElapsedTime();
      long cur = start;
      long end = start + self.getInitLimit()*self.getTickTime();
      // 阻塞等待过半follower连接
      while(waitingForNewEpoch && cur < end) {
   
     
        connectingFollowers.wait(end - cur);
        cur = Time.currentElapsedTime();
      }
      if (waitingForNewEpoch) {
   
     
        throw new InterruptedException("Timeout while waiting for epoch from quorum");
      }
    }
    return epoch;
  }
}

从上面的代码中可以看出该方法的过程如下:

1、 判断和leader建立连接的对端节点是否是参与者,如果是参与者,将其加入到connectingFollowers中;
2、 使用对端节点的epoch来计算leader的最新epoch,保证leader的最新epoch为所有接收到的epoch中的最大值+1;
3、 如果过半的参与者已经和leader建立了连接,那么此时epoch更新完毕,唤醒所有阻塞在计算最新epoch的线程,继续往下执行;
4、 如果没有过半的参与者和leader建立连接,那么会阻塞,并且等待过半的参与者和leader建立连接;

确认生成的epoch

// Leader
void lead() {
   
     
  // 等待过半参与者对生成的epoch ack
	waitForEpochAck(self.getId(), leaderStateSummary);
  // 设置当前的epoch
	self.setCurrentEpoch(epoch);      
}

// LearnerHandler
public void run() {
   
     
	byte ver[] = new byte[4];
  ByteBuffer.wrap(ver).putInt(0x10000);
  // 向对端节点发送LEADERINFO,该信息带有生成的epoch
  QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
  oa.writeRecord(newEpochPacket, "packet");
  bufferedOutput.flush();
  // 读取对端节点输入流,获取对端节点对生成的epoch的响应
  QuorumPacket ackEpochPacket = new QuorumPacket();
  ia.readRecord(ackEpochPacket, "packet");
  if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
   
     
    LOG.error(ackEpochPacket.toString()
              + " is not ACKEPOCH");
    return;
  }
  ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
  ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
  // 等待半数参与者对生成epoch
  leader.waitForEpochAck(this.getSid(), ss);  
}

当leader向每个节点发送了带有最新epoch的LEADERINFO消息后,会等待过半learner发送对该epoch的ack消息

基本流程和getEpochToPropose类似

public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
   
     
  synchronized(electingFollowers) {
   
     
    if (electionFinished) {
   
     
      return;
    }
    if (ss.getCurrentEpoch() != -1) {
   
     
      if (ss.isMoreRecentThan(leaderStateSummary)) {
   
     
        throw new IOException("Follower is ahead of the leader, leader summary: " 
                              + leaderStateSummary.getCurrentEpoch()
                              + " (current epoch), "
                              + leaderStateSummary.getLastZxid()
                              + " (last zxid)");
      }
      if (isParticipant(id)) {
   
     
        electingFollowers.add(id);
      }
    }
    QuorumVerifier verifier = self.getQuorumVerifier();
    if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
   
     
      electionFinished = true;
      electingFollowers.notifyAll();
    } else {
   
     
      long start = Time.currentElapsedTime();
      long cur = start;
      long end = start + self.getInitLimit()*self.getTickTime();
      while(!electionFinished && cur < end) {
   
     
        electingFollowers.wait(end - cur);
        cur = Time.currentElapsedTime();
      }
      if (!electionFinished) {
   
     
        throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
      }
    }
  }
}

同步数据

todo


boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
            
/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
   
     
  boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
  LearnerSnapshot snapshot = 
    leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
  try {
   
     
    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
    bufferedOutput.flush();

    LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
             + "send zxid of db as 0x{}, {} concurrent snapshots, " 
             + "snapshot was {} from throttle",
             Long.toHexString(peerLastZxid), 
             Long.toHexString(leaderLastZxid),
             Long.toHexString(zxidToSend), 
             snapshot.getConcurrentSnapshotNumber(),
             snapshot.isEssential() ? "exempt" : "not exempt");
    // Dump data to peer
    leader.zk.getZKDatabase().serializeSnapshot(oa);
    oa.writeString("BenWasHere", "signature");
    bufferedOutput.flush();
  } finally {
   
     
    snapshot.close();
  }
}

确认新leader

// LearnerHandler
public void run() {
   
     
  // 生成NEWLEADER信息并发送给对端
  QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
                                .toString().getBytes(), null);
  queuedPackets.add(newLeaderQP);
  bufferedOutput.flush();

  // Start thread that blast packets in the queue to learner
  startSendingPackets();
  
  // 从对端输入流中读取数据
  qp = new QuorumPacket();
  ia.readRecord(qp, "packet");
  if(qp.getType() != Leader.ACK){
   
     
    LOG.error("Next packet was supposed to be an ACK,"
              + " but received packet: {}", packetToString(qp));
    return;
  }

  if(LOG.isDebugEnabled()){
   
     
    LOG.debug("Received NEWLEADER-ACK message from " + sid);   
  }
  // 等待过半参与者对新leader的确认
  leader.waitForNewLeaderAck(getSid(), qp.getZxid());
}

void lead() {
   
     
  waitForNewLeaderAck(self.getId(), zk.getZxid());
}

当leader向其他learner发送了NEWLEADER消息后,会判断是否有过半的节点发送了对新领导者的确认信息

public void waitForNewLeaderAck(long sid, long zxid)
            throws InterruptedException {
   
     

  synchronized (newLeaderProposal.qvAcksetPairs) {
   
     

    if (quorumFormed) {
   
     
      return;
    }

    long currentZxid = newLeaderProposal.packet.getZxid();
    if (zxid != currentZxid) {
   
     
      LOG.error("NEWLEADER ACK from sid: " + sid
                + " is from a different epoch - current 0x"
                + Long.toHexString(currentZxid) + " receieved 0x"
                + Long.toHexString(zxid));
      return;
    }

    /*
             * Note that addAck already checks that the learner
             * is a PARTICIPANT.
             */
    newLeaderProposal.addAck(sid);

    if (newLeaderProposal.hasAllQuorums()) {
   
     
      quorumFormed = true;
      newLeaderProposal.qvAcksetPairs.notifyAll();
    } else {
   
     
      long start = Time.currentElapsedTime();
      long cur = start;
      long end = start + self.getInitLimit() * self.getTickTime();
      while (!quorumFormed && cur < end) {
   
     
        newLeaderProposal.qvAcksetPairs.wait(end - cur);
        cur = Time.currentElapsedTime();
      }
      if (!quorumFormed) {
   
     
        throw new InterruptedException(
          "Timeout while waiting for NEWLEADER to be acked by quorum");
      }
    }
  }
}

启动ZookeeperServer

等待Leader启动ZookeeperServe

// Leader
void lead() {
   
     
  startZkServer();
}

// LearnerHandler
public void run() {
   
     
  synchronized(leader.zk){
   
     
    while(!leader.zk.isRunning() && !this.isInterrupted()){
   
     
      leader.zk.wait(20);
    }
  }
}

心跳

// Leader
void lead() {
   
     
  // 一个tickTime中Leader会向其他节点发送两次ping信息
  // 因此每两次循环会执行过半检查和递增时钟操作
  while (true) {
   
     
    synchronized (this) {
   
     
      long start = Time.currentElapsedTime();
      long cur = start;
      // 半个tickTime
      long end = start + self.tickTime / 2;
      while (cur < end) {
   
     
        wait(end - cur);
        cur = Time.currentElapsedTime();
      }

      // 每两次循环递增时钟
      if (!tickSkip) {
   
     
        self.tick.incrementAndGet();
      }

      // We use an instance of SyncedLearnerTracker to
      // track synced learners to make sure we still have a
      // quorum of current (and potentially next pending) view.
      SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
      syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
      if (self.getLastSeenQuorumVerifier() != null
          && self.getLastSeenQuorumVerifier().getVersion() > self
          .getQuorumVerifier().getVersion()) {
   
     
        syncedAckSet.addQuorumVerifier(self
                                       .getLastSeenQuorumVerifier());
      }

      syncedAckSet.addAck(self.getId());

      for (LearnerHandler f : getLearners()) {
   
     
        // 判断当前时钟是否在tickOfNextAckDeadline之前,即最近接收过对端节点的信息
        if (f.synced()) {
   
     
          syncedAckSet.addAck(f.getSid());
        }
      }

      // check leader running status
      if (!this.isRunning()) {
   
     
        // set shutdown flag
        shutdownMessage = "Unexpected internal error";
        break;
      }

      // 每两次循环,判断一下是否有过半节点仍然和当前节点保持着心跳
      if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
   
     
        // Lost quorum of last committed and/or last proposed
        // config, set shutdown flag
        shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
          + syncedAckSet.ackSetsToString() + " ]";
        break;
      }
      tickSkip = !tickSkip;
    }
    // 发送ping信息
    for (LearnerHandler f : getLearners()) {
   
     
      f.ping();
    }
  }
  // 当没有过半节点和leader保持心跳时,会关闭leader,进入looking,重新开始选举
  if (shutdownMessage != null) {
   
     
    shutdown(shutdownMessage);
    // leader goes in looking state
  }  
}
// LearnerHandler
public void ping() {
   
     
  // If learner hasn't sync properly yet, don't send ping packet
  // otherwise, the learner will crash
  if (!sendingThreadStarted) {
   
     
    return;
  }
  long id;
  if (syncLimitCheck.check(System.nanoTime())) {
   
     
    synchronized(leader) {
   
     
      id = leader.lastProposed;
    }
    QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
    queuePacket(ping);
  } else {
   
     
    LOG.warn("Closing connection to peer due to transaction timeout.");
    shutdown();
  }
}

// SyncLimitCheck
public synchronized boolean check(long time) {
   
     
  if (currentTime == 0) {
   
     
    return true;
  } else {
   
     
    long msDelay = (time - currentTime) / 1000000;
    return (msDelay < (leader.self.tickTime * leader.self.syncLimit));
  }
}

Follower

// QuorumPeer
public void run() {
   
     
  case FOLLOWING:
    try {
   
     
      LOG.info("FOLLOWING");
      setFollower(makeFollower(logFactory));
      follower.followLeader();
    } catch (Exception e) {
   
     
      LOG.warn("Unexpected exception",e);
    } finally {
   
     
      follower.shutdown();
      setFollower(null);
      updateServerState();
    }
    break;  
}

创建

// QuorumPeer
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
   
     
  return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}
Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
   
     
  this.self = self;
  this.zk=zk;
  this.fzk = zk;
}

运行

// Follower
void followLeader() throws InterruptedException {
   
     
  self.end_fle = Time.currentElapsedTime();
  long electionTimeTaken = self.end_fle - self.start_fle;
  self.setElectionTimeTaken(electionTimeTaken);
  LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
           QuorumPeer.FLE_TIME_UNIT);
  self.start_fle = 0;
  self.end_fle = 0;
  fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
  try {
   
     
    // 1. 通过投票结果获取当前leader的信息
    QuorumServer leaderServer = findLeader();            
    try {
   
     
      // 2. 连接leader
      connectToLeader(leaderServer.addr, leaderServer.hostname);
      // 3. 注册到leader
      long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
      if (self.isReconfigStateChange())
        throw new Exception("learned about role change");
      //check to see if the leader zxid is lower than ours
      //this should never happen but is just a safety check
      long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
      if (newEpoch < self.getAcceptedEpoch()) {
   
     
        LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                  + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
        throw new IOException("Error: Epoch of leader is lower");
      }
      // 4. 同步leader
      syncWithLeader(newEpochZxid);                
      QuorumPacket qp = new QuorumPacket();
      while (this.isRunning()) {
   
     
       	// 从leader输入流中读取数据
        readPacket(qp);
        // 处理读取到的数据
        processPacket(qp);
      }
    } catch (Exception e) {
   
     
      LOG.warn("Exception when following the leader", e);
      try {
   
     
        sock.close();
      } catch (IOException e1) {
   
     
        e1.printStackTrace();
      }

      // clear pending revalidations
      pendingRevalidations.clear();
    }
  } finally {
   
     
    zk.unregisterJMX((Learner)this);
  }
}

连接leader

// Follower
protected void connectToLeader(InetSocketAddress addr, String hostname)
            throws IOException, InterruptedException, X509Exception {
   
     
  // 创建socket连接
  this.sock = createSocket();
	
  // 建立连接的最长时间
  int initLimitTime = self.tickTime * self.initLimit;
  int remainingInitLimitTime = initLimitTime;
  long startNanoTime = nanoTime();

  // 重试连接
  // 如果重试5次失败或者在initLimitTime内没有建立连接,连接失败
  for (int tries = 0; tries < 5; tries++) {
   
     
    try {
   
     
      // recalculate the init limit time because retries sleep for 1000 milliseconds
      remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
      if (remainingInitLimitTime <= 0) {
   
     
        LOG.error("initLimit exceeded on retries.");
        throw new IOException("initLimit exceeded on retries.");
      }

      // 连接leader
      sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
      if (self.isSslQuorum())  {
   
     
        ((SSLSocket) sock).startHandshake();
      }
      sock.setTcpNoDelay(nodelay);
      break;
    } catch (IOException e) {
   
     
      // 连接失败,更新剩余时间
      remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);

      if (remainingInitLimitTime <= 1000) {
   
     
        LOG.error("Unexpected exception, initLimit exceeded. tries=" + tries +
                  ", remaining init limit=" + remainingInitLimitTime +
                  ", connecting to " + addr,e);
        throw e;
      } else if (tries >= 4) {
   
     
        LOG.error("Unexpected exception, retries exceeded. tries=" + tries +
                  ", remaining init limit=" + remainingInitLimitTime +
                  ", connecting to " + addr,e);
        throw e;
      } else {
   
     
        LOG.warn("Unexpected exception, tries=" + tries +
                 ", remaining init limit=" + remainingInitLimitTime +
                 ", connecting to " + addr,e);
        this.sock = createSocket();
      }
    }
    Thread.sleep(1000);
  }

  self.authLearner.authenticate(sock, hostname);

  // 建立和leader的输入和输出流
  leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
    sock.getInputStream()));
  bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
  leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}

注册信息

protected long registerWithLeader(int pktType) throws IOException{
   
     
  // 1. 生成当前节点信息,id,协议版本,config版本
  /*
         * Send follower info, including last zxid and sid
         */
  long lastLoggedZxid = self.getLastLoggedZxid();
  QuorumPacket qp = new QuorumPacket();                
  qp.setType(pktType);
  qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

  /*
         * Add sid to payload
         */
  LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
  ByteArrayOutputStream bsid = new ByteArrayOutputStream();
  BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
  boa.writeRecord(li, "LearnerInfo");
  qp.setData(bsid.toByteArray());
  // 2. 发送给leader 
  writePacket(qp, true);
  // 3. 接收leader发来的epoch通知请求
  readPacket(qp);        
  // 从接收数据中解析出最新的epoch
  final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
  if (qp.getType() == Leader.LEADERINFO) {
   
     
    // we are connected to a 1.0 server so accept the new epoch and read the next packet
    leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
    byte epochBytes[] = new byte[4];
    final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
   	// 使用新的epoch设置当前epoch
    if (newEpoch > self.getAcceptedEpoch()) {
   
     
      wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
      self.setAcceptedEpoch(newEpoch);
    } else if (newEpoch == self.getAcceptedEpoch()) {
   
     
      // since we have already acked an epoch equal to the leaders, we cannot ack
      // again, but we still need to send our lastZxid to the leader so that we can
      // sync with it if it does assume leadership of the epoch.
      // the -1 indicates that this reply should not count as an ack for the new epoch
      wrappedEpochBytes.putInt(-1);
    } else {
   
     
      throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
    }
    
		// 4. 发送对epoch的响应信息
    QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
    writePacket(ackNewEpoch, true);
    return ZxidUtils.makeZxid(newEpoch, 0);
  } else {
   
     
    if (newEpoch > self.getAcceptedEpoch()) {
   
     
      self.setAcceptedEpoch(newEpoch);
    }
    if (qp.getType() != Leader.NEWLEADER) {
   
     
      LOG.error("First packet should have been NEWLEADER");
      throw new IOException("First packet should have been NEWLEADER");
    }
    return qp.getZxid();
  }
} 

同步数据

todo

循环处理请求

// Follower
public void followLeader() {
   
     
	QuorumPacket qp = new QuorumPacket();
  while (this.isRunning()) {
   
     
    readPacket(qp);
    processPacket(qp);
  }  
}

版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有