ZooKeeper源码(二)---ZooKeeper流程
一、选主流程zookeeper核心机制包括:恢复模式(选主流程)和广播模式(同步流程)。当服务刚启动、leader崩溃、follower不足半数时,系统就进入选主流程,此时不对外提供服务;当leader被选举出来后,系统就进入同步流程,server之间完成状态同步,此后对外提供服务。
选举策略主要基于paxos算法,一种称为LeaderElection算法,一种称为FashLeaderElection算法,系统默认使用FashLeaderElection算法完成。
1.1 LeaderElection算法
以下源码都在Leader类里
流程简述如下:
(1)进入选主流程,当前发起选举的线程(选举线程)负责对投票结果进行统计,以及决定推荐哪个server作为leader;
(2)选举线程初始化currentVote,备选leader id默认是自己,然后向所有server发起一次询问,包括自己;Vote类实际上可看成(id, zxid)这样的二元组,其中id表示备 选leader的id,zxid表示当前最新事务id。
[*]
protected QuorumPeer self;
[*]
self.setCurrentVote(new Vote(self.getId(),self.getLastLoggedZxid()));
[*]
……
[*]
int xid = epochGen.nextInt();
[*]
while (self.isRunning()) {
[*]
HashMap<InetSocketAddress, Vote> votes =new HashMap<InetSocketAddress, Vote>(self.getVotingView().size());
(3) 选举线程收到回复后,验证是否自己发起的询问,然后获取以下信息:
① 第一,获取回复server的id,存储到询问对象列表heardFrom中;
② 第二,获取回复server提议的备选leader信息(id,zxid),其中id是备选leader的id,zxid是事务id号,存储到投票记录表votes中。
[*]
s.send(requestPacket);
[*]
responsePacket.setLength(responseBytes.length);
[*]
s.receive(responsePacket);
[*]
……
[*]
int recvedXid = responseBuffer.getInt();
[*]
……
[*]
long peerId = responseBuffer.getLong();
[*]
heardFrom.add(peerId);
[*]
Vote vote = new Vote(responseBuffer.getLong(),
[*]
responseBuffer.getLong());
[*]
InetSocketAddress addr = (InetSocketAddress) responsePacket
[*]
.getSocketAddress();
[*]
votes.put(addr, vote);
(4)所有server遍历请求一次后,选举线程进行投票记录计算:countVote()。每次计算需要统计两个值:result.vote保存zxid最大的投票记录,result.count是对应得票数;result.winner记录得票数最多的投票记录,result.winningCount是对应得票数。
[*]
result.vote = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
[*]
result.winner = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
[*]
Collection<Vote> votesCast = votes.values();
[*]
HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>();
[*]
// Now do the tally
[*]
for (Vote v : votesCast){
[*]
Integer count = countTable.get(v);
[*]
if (count == null) {
[*]
count = Integer.valueOf(0);
[*]
}
[*]
countTable.put(v, count + 1);
[*]
……
[*]
}
[*]
result.winningCount = 0;
[*]
LOG.info("Election tally: ");
[*]
for (Entry<Vote, Integer> entry : countTable.entrySet()) {
[*]
if (entry.getValue() > result.winningCount) {
[*]
result.winningCount = entry.getValue();
[*]
result.winner = entry.getKey();
[*]
}
[*]
LOG.info(entry.getKey().id + "\t-> " + entry.getValue());
[*]
}
[*]
return result;
(5)对统计结果进行判断,如果result.winner对应的winningCount> n/2+1,即获得了n/2+1的投票数,表明该备选leader获胜,将根据获胜leader的相关信息设置自己的状态;否则将currentVote设置为result.vote,即选择zxid最大的那个server作为备选leader,然后重复整个过程,直到leader被选举出来。
[*]
if (votes.size() == 0) {
[*]
self.setCurrentVote(new Vote(self.getId(), self.getLastLoggedZxid()));
[*]
} else {
[*]
if (result.winner.id >= 0) {
[*]
self.setCurrentVote(result.vote);
[*]
// To do: this doesn't use a quorum verifier
[*]
if (result.winningCount > (self.getVotingView().size() / 2)) {
[*]
self.setCurrentVote(result.winner);
[*]
s.close();
[*]
Vote current = self.getCurrentVote();
[*]
LOG.info("Found leader: my type is: " + self.getLearnerType());
[*]
/* * We want to make sure we implement the state machine
[*]
* correctly. If we are a PARTICIPANT, once a leader
[*]
* is elected we can move either to LEADING or
[*]
* FOLLOWING. However if we are an OBSERVER, it is an
[*]
* error to be elected as a Leader.
[*]
*/
[*]
if (self.getLearnerType() == LearnerType.OBSERVER) {
[*]
if (current.id == self.getId()) {
[*]
// This should never happen!
[*]
LOG.error("OBSERVER elected as leader!");
[*]
Thread.sleep(100);
[*]
} else {
[*]
self.setPeerState(ServerState.OBSERVING);
[*]
Thread.sleep(100);
[*]
return current;
[*]
}
[*]
} else {
[*]
self.setPeerState((current.id == self.getId())
[*]
? ServerState.LEADING: ServerState.FOLLOWING);
[*]
if (self.getPeerState() == ServerState.FOLLOWING) {
[*]
Thread.sleep(100);
[*]
}
[*]
return current;
[*]
}
[*]
}
[*]
}
[*]
}
另外,从流程中可以分析得知:要使leader获得多数server的支持,server的总数必须是2n+1的,且存活的server数目不能少于n+1,否则选主流程无法成功,server将会一直处于该LOOKING状态。
如果某server成为leader,那么一定拥有最大的zxid,也就是数据最新。此时会触发leader向其他follower状态同步的过程,保证所有server的数据一致。该恢复过程完毕后,才能重新接收client的请求。
observer节点同样参与发送request和接收response的过程,但是ResponderThread的run()方法中并不做任何处理。也就是说,observer节点需要确定leader节点,但不参与选举。
1.2 FashLeaderElection算法
流程简述如下:
(1)选举线程将epoch+1,向所有server提议自己变成leader,广播自身(id,zxid)信息。
(2)其他server收到提议后,需要解决epoch和zxid的冲突,然后向发起提议的server回复信息。(Q1:如何解决冲突?回复的是什么?)
(3) 选举线程接收到其他server的回复,先判断自身epoch是否合法,然后更新epoch为对方推荐的zxid、epoch,并向对方发送ACK消息,然后继续等待。等待一个周期结束后,如果仍未能选出leader,则再次发送自己推荐的leader信息;否则可以结束。(Q2:过程不明?)
(4) 重复该过程,最后一定能选举出leader。(Q3:为什么?)
二、状态同步
状态同步实际上就是选举完成后,leader向follower同步数据的恢复过程,具体的流程分析可以按照不同角色进行阐述。
2.1 LEADER流程
(1) leader设置最新的epoch,即zxid的高32位++;
(2) leader构建NEWLEADER包,该包的数据是当前最大的事务id,然后广播给所有的follower告知 leader保存的事务id是多少,从而判断是否需要同步数据;
(3) leader会给每个follower创建一个线程LearnerHandler,负责接收它们的同步数据请求;
(4) leader主线程阻塞等待其他follower的回应,只有在超过半数的follower已经同步数据完毕,该过程才能结束,leader才能正式成为leader。
2.2 FOLLOWER流程
follower的同步逻辑,主要是followLeader()方法:
(1)findLeader()方法返回leader地址,然后connectToLeader()连接到leader,有重试机制,超时未连接上则退回到LOOKING状态;
(2)生成的leaderIs、leaderOs就是对应的Archive。
(3) registerLeader()方法向leader注册,需要传入pktType和sentLastZxid参数,前者是packet类型,后者是该follower最新的zxid;
(4)方法返回leader反馈的newLeaderZxid,同时leader开始向follower发送需要同步的信息,同步哪些信息是根据sentLastZxid和newLeaderZxid来确定的。
(5) 进行syncWithLeader()过程,与leader数据同步。该过程会循环收到不同类型的packet,只有当收到Leader.UPTODATE类型的packet时,才表示同步结束。
(6)循环执行processPacket(),直到服务停止为止。processPacket()主要是根据leader发送过来的packet类型进行不同处理。
① Leader.PING:执行ping()。
② Leader.SYNC:执行FollowerZookeeperServer的sync()。
③ Leader.REVALIDATE:执行revalidate()。
④ Leader.UPTODATE:不可能,报错。
⑤ Leader.COMMIT:执行FollowerZookeeperServer的commit()。
⑥ Leader.PROPOSAL:执行FollowerZookeeperServer的logRequest()。
2.3 OBSERVER流程
Observer的逻辑所在,主要是observeLeader()方法:
① 与Follower同。
② 与Follower同,不过传入的pktType不同。
③ 与Follower同。
④ processPacket()方法的处理逻辑与Follower不同,主要是:Leader.PROPOSAL、Leader.COMMIT被忽略了。
三、广播流程
经典的两阶段提交,即leader提起一个决议,由followers进行投票,leader对投票结果进行计算决定是否通过该决议,如果通过执行该决议(事务),否则什么也不做。
页:
[1]