synchronized public void startLeaderElection() {
    // Start by voting for ourselves: (myid, last logged zxid, current epoch)
    try {
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // Look up our own quorum address in the configured view
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Only the legacy UDP algorithm (electionType 0) needs the responder thread
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}
Several election algorithms are available, selected by electionType. The default is electionType=3, i.e. fast leader election (the fast paxos algorithm):
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        // Legacy UDP-based LeaderElection (deprecated)
        le = new LeaderElection(this);
        break;
    case 1:
        // UDP-based fast leader election, no authentication
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        // UDP-based fast leader election, authenticated
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // Default: TCP-based FastLeaderElection; start the connection
        // manager's listener before creating the algorithm
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
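The TODO above suggests replacing the switch with a factory. The following is a minimal sketch of one way to do that, assuming a lookup table keyed by electionType. `ElectionFactorySketch`, the nested `Election` interface and `named()` are hypothetical stand-ins, not ZooKeeper classes. The real case 3 also starts the QuorumCnxManager listener, which this sketch leaves out:

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical table-driven replacement for the switch in createElectionAlgorithm.
// Election and named() are stand-ins, not ZooKeeper's real classes.
public class ElectionFactorySketch {

    interface Election {
        String describe();
    }

    // Stand-in "constructor": a real factory would build LeaderElection etc.
    static Election named(String description) {
        return () -> description;
    }

    // One entry per electionType code, mirroring the cases of the switch.
    static final Map<Integer, Supplier<Election>> FACTORIES = Map.of(
            0, () -> named("LeaderElection (UDP, deprecated)"),
            1, () -> named("AuthFastLeaderElection (UDP, no authentication)"),
            2, () -> named("AuthFastLeaderElection (UDP, authenticated)"),
            3, () -> named("FastLeaderElection (TCP)"));

    static Election create(int electionType) {
        Supplier<Election> factory = FACTORIES.get(electionType);
        if (factory == null) {
            // The original hits `assert false` and returns null instead.
            throw new IllegalArgumentException("Unknown electionType: " + electionType);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        System.out.println(create(3).describe()); // prints FastLeaderElection (TCP)
    }
}
```

For an unknown type, the original code reaches `assert false` and returns null, which is a no-op when assertions are disabled. This sketch throws instead. That is a design choice of the sketch, not ZooKeeper's behavior.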
electionType=0 is the plain Paxos-style election (LeaderElection). It communicates over UDP and starts a dedicated ResponderThread:
@Deprecated
class ResponderThread extends Thread {
    ResponderThread() {
        super("ResponderThread");
    }
    volatile boolean running = true;
    @Override
    public void run() {
        try {
            byte b[] = new byte[36];
            ByteBuffer responseBuffer = ByteBuffer.wrap(b);
            DatagramPacket packet = new DatagramPacket(b, b.length);
            while (running) {
                udpSocket.receive(packet);
                // A request carries nothing but a 4-byte xid
                if (packet.getLength() != 4) {
                    LOG.warn("Got more than just an xid! Len = "
                            + packet.getLength());
                } else {
                    // receive() wrote the xid into b, so it stays at the
                    // front of the reply: xid | myid | vote id | zxid
                    responseBuffer.clear();
                    responseBuffer.getInt(); // Skip the xid
                    responseBuffer.putLong(myid);
                    Vote current = getCurrentVote();
                    switch (getPeerState()) {
                    case LOOKING:
                        responseBuffer.putLong(current.getId());
                        responseBuffer.putLong(current.getZxid());
                        break;
                    case LEADING:
                        responseBuffer.putLong(myid);
                        try {
                            long proposed;
                            synchronized(leader) {
                                proposed = leader.lastProposed;
                            }
                            responseBuffer.putLong(proposed);
                        } catch (NullPointerException npe) {
                            // This can happen in state transitions,
                            // just ignore the request
                        }
                        break;
                    case FOLLOWING:
                        responseBuffer.putLong(current.getId());
                        try {
                            responseBuffer.putLong(follower.getZxid());
                        } catch (NullPointerException npe) {
                            // This can happen in state transitions,
                            // just ignore the request
                        }
                        break;
                    case OBSERVING:
                        // Do nothing, Observers keep themselves to
                        // themselves.
                        break;
                    }
                    packet.setData(b);
                    udpSocket.send(packet);
                }
                // Reset the length so the next receive() can fill the whole buffer
                packet.setLength(b.length);
            }
        } catch (RuntimeException e) {
            LOG.warn("Unexpected runtime exception in ResponderThread",e);
        } catch (IOException e) {
            LOG.warn("Unexpected IO exception in ResponderThread",e);
        } finally {
            LOG.warn("QuorumPeer responder thread exited");
        }
    }
}
Note the @Deprecated annotation: LeaderElection is on its way out, leaving fast paxos (FastLeaderElection) as the only method.
The responder thread blocks in udpSocket.receive(), waiting for vote requests from other servers. Meanwhile, here is the logic of the QuorumPeer thread itself:
public void run() {
    setName("QuorumPeer" + "[myid=" + getId() + "]" +
            cnxnFactory.getLocalAddress());
    LOG.debug("Starting quorum peer");
    try {
        jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(jmxQuorumBean, null);
        for(QuorumServer s: getView().values()){
            ZKMBeanInfo p;
            if (getId() == s.id) {
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {
                p = new RemotePeerBean(s);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                LOG.info("LOOKING");
                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                            logFactory, this,
                            new ZooKeeperServer.BasicDataTreeBuilder(),
                            this.zkDb);
                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        try {
            MBeanRegistry.getInstance().unregisterAll();
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
    }
}
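With JMX registration and read-only mode stripped away, the main loop above is a small state machine. LOOKING runs an election that picks the next role, and every other role resets the peer to LOOKING in its finally block, however it ends. The following hypothetical simulation models only that control flow. `PeerLoopSketch`, `simulate()` and `serve()` are illustrative names, not ZooKeeper APIs, and `serve()` always fails so that it mimics a peer losing contact with the quorum:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of the control flow of QuorumPeer.run(); no networking,
// JMX or read-only mode. Names here are illustrative, not ZooKeeper APIs.
public class PeerLoopSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    // Stand-in for followLeader()/lead()/observeLeader(): always fails,
    // as if the peer lost contact with the rest of the quorum.
    static void serve(ServerState role) {
        throw new IllegalStateException(role + " ended: lost contact with quorum");
    }

    // Runs the loop for a fixed number of iterations and records each state visited.
    // electionOutcomes plays the role of successive lookForLeader() results.
    static List<ServerState> simulate(List<ServerState> electionOutcomes, int rounds) {
        Iterator<ServerState> outcomes = electionOutcomes.iterator();
        ServerState state = ServerState.LOOKING;
        List<ServerState> trace = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            trace.add(state);
            if (state == ServerState.LOOKING) {
                // The election decides which role the peer takes next.
                state = outcomes.next();
            } else {
                try {
                    serve(state);
                } catch (IllegalStateException e) {
                    System.err.println("Unexpected exception: " + e.getMessage());
                } finally {
                    // As in run(): however the role ends, go back to LOOKING.
                    state = ServerState.LOOKING;
                }
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        List<ServerState> trace = simulate(List.of(ServerState.FOLLOWING, ServerState.LEADING), 4);
        System.out.println(trace); // prints [LOOKING, FOLLOWING, LOOKING, LEADING]
    }
}
```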
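When the QuorumPeer thread finds itself in the LOOKING state, it calls makeLEStrategy().lookForLeader() and sends a vote request to every server, including itself (the request goes out from addr to electionAddr, and the votes come back the reverse way). The request payload is an xid, a random 4-byte integer, and the thread then waits for the replies. The responder thread on every server, this one included, receives the xid and sends it back (so the QuorumPeer thread can tell whether a reply answers its own request), together with myid, currentId and currentZxid. During startup, currentId is simply the server's own myid: every server first nominates itself.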
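The request/reply format described above can be sketched with java.nio.ByteBuffer. This is a hypothetical illustration, not ZooKeeper code: `VoteWireSketch` and its methods are invented names. The reply is sized at the 28 bytes the responder actually fills (xid, myid, voted-for id, zxid), although ResponderThread allocates a 36-byte buffer:

```java
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Random;

// Hypothetical sketch of the legacy UDP vote exchange (not ZooKeeper code).
// Request: xid (4 bytes). Reply: xid | myid | voted-for id | zxid = 4 + 8 + 8 + 8 bytes.
public class VoteWireSketch {
    static final int REPLY_LENGTH = 28;

    record Reply(int xid, long senderId, long voteId, long zxid) {}

    static byte[] encodeRequest(int xid) {
        return ByteBuffer.allocate(4).putInt(xid).array();
    }

    // Like ResponderThread: echo the request's xid, then append this server's id
    // and the (id, zxid) of the vote it currently holds.
    static byte[] encodeReply(byte[] request, long myid, long voteId, long zxid) {
        int xid = ByteBuffer.wrap(request).getInt();
        return ByteBuffer.allocate(REPLY_LENGTH)
                .putInt(xid)
                .putLong(myid)
                .putLong(voteId)
                .putLong(zxid)
                .array();
    }

    // The requester ignores replies whose xid does not match the request it sent.
    static Optional<Reply> decodeReply(byte[] reply, int expectedXid) {
        ByteBuffer buf = ByteBuffer.wrap(reply);
        int xid = buf.getInt();
        if (xid != expectedXid) {
            return Optional.empty();
        }
        // Arguments are evaluated left to right, matching the field order on the wire.
        return Optional.of(new Reply(xid, buf.getLong(), buf.getLong(), buf.getLong()));
    }

    public static void main(String[] args) {
        int xid = new Random().nextInt(); // random 4-byte request id
        byte[] request = encodeRequest(xid);
        // At startup server 2 votes for itself, so voteId == myid.
        byte[] reply = encodeReply(request, 2L, 2L, 0x100000005L);
        System.out.println(decodeReply(reply, xid));
    }
}
```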
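Next, countVotes tallies all the replies. First, it filters out invalid votes: the voted-for id must also appear in heardFrom, i.e. the candidate server must itself have answered this round, otherwise the vote does not count. Second, for each voted-for id it finds the largest zxid among the votes for that id. Third, any vote for that id with a smaller zxid is raised to that maximum (as the source comment puts it: "Make all zxids for a given vote id equal to the largest zxid seen for that id"). Finally, it counts the votes for each id, as well as the votes for the id holding the largest zxid. In the returned result, the vote and count members describe the vote with the largest zxid, while winner and winningCount describe the id that received the most votes. That is the whole of countVotes.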
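The counting steps above can be re-implemented in simplified form under a few stated assumptions. `Vote` and `Result` are stand-in records, not ZooKeeper's Vote and ElectionResult. Replies are keyed by a plain string instead of an InetSocketAddress. Ties are broken in ways this sketch chooses: the larger server id wins the largest-zxid comparison, and the lower id wins a tied vote count:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified, hypothetical re-implementation of the countVotes steps described above.
// Vote and Result are stand-ins for ZooKeeper's Vote and ElectionResult.
public class CountVotesSketch {
    record Vote(long id, long zxid) {}

    // vote/count: the candidate with the largest zxid and its vote count;
    // winner/winningCount: the candidate with the most votes.
    record Result(Vote vote, int count, Vote winner, int winningCount, int numValidVotes) {}

    static Result countVotes(Map<String, Vote> votes, Set<Long> heardFrom) {
        // Drop votes for servers we have not heard from, track the largest
        // zxid seen for each candidate id, and count votes per id.
        Map<Long, Long> maxZxids = new TreeMap<>();
        Map<Long, Integer> counts = new TreeMap<>();
        int numValidVotes = 0;
        for (Vote v : votes.values()) {
            if (!heardFrom.contains(v.id())) {
                continue;
            }
            numValidVotes++;
            maxZxids.merge(v.id(), v.zxid(), Long::max);
            counts.merge(v.id(), 1, Integer::sum);
        }
        // Every vote for an id now carries that id's largest zxid, so each
        // candidate is represented by a single Vote. Null if no valid votes.
        Vote best = null;
        int bestCount = 0;
        Vote winner = null;
        int winningCount = 0;
        for (Map.Entry<Long, Long> e : maxZxids.entrySet()) {
            Vote candidate = new Vote(e.getKey(), e.getValue());
            int n = counts.get(e.getKey());
            // Largest zxid wins; equal zxids fall back to the larger id (assumption).
            if (best == null || candidate.zxid() > best.zxid()
                    || (candidate.zxid() == best.zxid() && candidate.id() > best.id())) {
                best = candidate;
                bestCount = n;
            }
            // Most votes wins; TreeMap order makes ties go to the lower id (assumption).
            if (n > winningCount) {
                winner = candidate;
                winningCount = n;
            }
        }
        return new Result(best, bestCount, winner, winningCount, numValidVotes);
    }

    public static void main(String[] args) {
        // Five replies; server 6 is down, so s5's vote for it is discarded.
        Map<String, Vote> votes = Map.of(
                "s1", new Vote(3, 0x10),
                "s2", new Vote(3, 0x12),
                "s3", new Vote(3, 0x12),
                "s4", new Vote(4, 0x20),
                "s5", new Vote(6, 0x30));
        Result r = countVotes(votes, Set.of(1L, 2L, 3L, 4L, 5L));
        // vote = (4, 0x20) with count 1; winner = (3, 0x12) with winningCount 3
        System.out.println(r);
    }
}
```

The example shows why vote and winner are kept separately: server 3 has the most votes, but server 4 has seen the most recent transaction.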
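After countVotes returns, if winningCount is more than half of the voting servers, currentVote is set to winner (and the server switches itself to LEADING or FOLLOWING accordingly). Otherwise, currentVote is set to vote (the candidate with the largest zxid) and another round of voting begins.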
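This decision step can be sketched as a pure function. The sketch is hypothetical: `DecideVoteSketch`, `decide()` and `Decision` are invented names. The real code also has extra handling, for example for observers, which this sketch leaves out. "More than half" is computed with integer division, so 3 of 6 is not enough but 3 of 5 is:

```java
// Hypothetical sketch of the post-countVotes decision (not ZooKeeper code).
public class DecideVoteSketch {
    record Vote(long id, long zxid) {}

    enum ServerState { LOOKING, LEADING, FOLLOWING }

    record Decision(Vote currentVote, ServerState state) {}

    // Adopt the winner if it has a strict majority of the voting view;
    // otherwise keep looking, now backing the largest-zxid vote.
    static Decision decide(long myid, int votingViewSize, Vote vote, Vote winner, int winningCount) {
        if (winningCount > votingViewSize / 2) {
            ServerState next = (winner.id() == myid) ? ServerState.LEADING : ServerState.FOLLOWING;
            return new Decision(winner, next);
        }
        return new Decision(vote, ServerState.LOOKING);
    }

    public static void main(String[] args) {
        Vote vote = new Vote(4, 0x20);
        Vote winner = new Vote(3, 0x12);
        // 3 of 6 is not a strict majority: server 1 keeps LOOKING and now backs server 4.
        System.out.println(decide(1, 6, vote, winner, 3));
        // 4 of 6 is a majority: server 3 is the winner, so it becomes LEADING.
        System.out.println(decide(3, 6, vote, winner, 4));
    }
}
```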