一、选主流程
zookeeper核心机制包括:恢复模式(选主流程 )和广播模式(同步流程 )。当服务刚启动、leader崩溃、follower不足半数时,系统就进入选主流程,此时不对外提供服务;当leader被选举出来后,系统就进入同步流程,server之间完成状态同步,此后对外提供服务。
选举策略主要基于paxos算法,一种称为LeaderElection算法,一种称为FashLeaderElection算法,系统默认使用FashLeaderElection算法完成。
1.1 LeaderElection算法
以下源码都在Leader类里
流程简述如下:
(1) 进入选主流程,当前发起选举的线程(选举线程)负责对投票结果进行统计,以及决定推荐哪个server作为leader;
(2) 选举线程初始化currentVote,备选leader id默认是自己,然后向所有server发起一次询问,包括自己;Vote类实际上可看成(id, zxid)这样的二元组,其中id表示备 选leader的id,zxid表示当前最新事务id。
protected QuorumPeer self;
self.setCurrentVote( new Vote(self.getId(),self.getLastLoggedZxid())) ;
……
int xid = epochGen.nextInt();
while (self.isRunning()) {
HashMap<InetSocketAddress, Vote> votes =new HashMap<InetSocketAddress, Vote>(self.getVotingView().size());
(3) 选举线程收到回复后,验证 是否自己发起的询问,然后获取 以下信息:
① 第一,获取回复server的id,存储到询问对象列表heardFrom中;
② 第二,获取回复server提议的备选leader信息(id,zxid),其中id是备选leader的id,zxid是事务id号,存储到投票记录表votes中。
s.send(requestPacket);
responsePacket.setLength(responseBytes.length);
s.receive(responsePacket);
……
int recvedXid = responseBuffer.getInt();
……
long peerId = responseBuffer.getLong();
heardFrom.add(peerId);
Vote vote = new Vote(responseBuffer.getLong(),
responseBuffer.getLong());
InetSocketAddress addr = (InetSocketAddress) responsePacket
.getSocketAddress();
votes.put(addr, vote);
(4) 所有server遍历请求一次后,选举线程进行投票记录计算:countVote()。每次计算需要统计两个值:result.vote保存zxid最大的投票记录 ,result.count是对应得票数 ;result.winner记录得票数最多的投票记录 ,result.winningCount是对应得票数 。
result.vote = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
result.winner = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
Collection<Vote> votesCast = votes.values();
HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>();
// Now do the tally
for (Vote v : votesCast){
Integer count = countTable.get(v);
if (count == null) {
count = Integer.valueOf(0);
}
countTable.put(v, count + 1);
……
}
result.winningCount = 0;
LOG.info("Election tally: ");
for (Entry<Vote, Integer> entry : countTable.entrySet()) {
if (entry.getValue() > result.winningCount) {
result.winningCount = entry.getValue();
result.winner = entry.getKey();
}
LOG.info(entry.getKey().id + "\t-> " + entry.getValue());
}
return result;
(5) 对统计结果进行判断,如果result.winner对应的winningCount> n/2+1,即获得了n/2+1的投票数,表明该备选leader获胜,将根据获胜leader的相关信息设置自己的状态;否则将currentVote设置为result.vote,即选择zxid最大的那个server作为备选leader,然后重复整个过程,直到leader被选举出来。
if (votes.size() == 0) {
self.setCurrentVote(new Vote(self.getId(), self.getLastLoggedZxid()));
} else {
if (result.winner.id >= 0) {
self.setCurrentVote(result.vote);
// To do: this doesn't use a quorum verifier
if (result.winningCount > (self.getVotingView().size() / 2)) {
self.setCurrentVote(result.winner);
s.close();
Vote current = self.getCurrentVote();
LOG.info("Found leader: my type is: " + self.getLearnerType());
/* * We want to make sure we implement the state machine
* correctly. If we are a PARTICIPANT, once a leader
* is elected we can move either to LEADING or
* FOLLOWING. However if we are an OBSERVER, it is an
* error to be elected as a Leader.
*/
if (self.getLearnerType() == LearnerType.OBSERVER) {
if (current.id == self.getId()) {
// This should never happen!
LOG.error("OBSERVER elected as leader!");
Thread.sleep(100);
} else {
self.setPeerState(ServerState.OBSERVING);
Thread.sleep(100);
return current;
}
} else {
self.setPeerState((current.id == self.getId())
? ServerState.LEADING: ServerState.FOLLOWING);
if (self.getPeerState() == ServerState.FOLLOWING) {
Thread.sleep(100);
}
return current;
}
}
}
}
另外,从流程中可以分析得知:要使leader获得多数server的支持,server的总数必须是2n+1的,且存活的server数目不能少于n+1,否则选主流程无法成功,server将会一直处于该LOOKING状态。
如果某server成为leader,那么一定拥有最大的zxid,也就是数据最新。此时会触发leader向其他follower状态同步的过程,保证所有server的数据一致。该恢复过程完毕后,才能重新接收client的请求。
observer节点同样参与发送request和接收response的过程,但是ResponderThread的run()方法中并不做任何处理。也就是说,observer节点需要确定leader节点,但不参与选举。
1.2 FashLeaderElection算法
流程简述如下:
(1) 选举线程将epoch+1,向所有server提议自己变成leader,广播自身(id,zxid)信息。
(2) 其他server收到提议后,需要解决epoch和zxid的冲突,然后向发起提议的server回复信息。(Q1:如何解决冲突?回复的是什么?)
(3) 选举线程接收到其他server的回复,先判断自身epoch是否合法,然后更新epoch为对方推荐的zxid、epoch,并向对方发送ACK消息,然后继续等待。等待一个周期结束后,如果仍未能选出leader,则再次发送自己推荐的leader信息;否则可以结束。(Q2:过程不明?)
(4) 重复该过程,最后一定能选举出leader。(Q3:为什么?)
二、状态同步
状态同步实际上就是选举完成后,leader向follower同步数据的恢复过程,具体的流程分析可以按照不同角色进行阐述。
2.1 LEADER流程
(1) leader设置最新的epoch,即zxid的高32位++;
(2) leader构建NEWLEADER包,该包的数据是当前最大的事务id,然后广播给所有的follower告知 leader保存的事务id是多少,从而判断是否需要同步数据;
(3) leader会给每个follower创建一个线程LearnerHandler,负责接收它们的同步数据请求;
(4) leader主线程阻塞等待其他follower的回应,只有在超过半数的follower已经同步数据完毕,该过程才能结束,leader才能正式成为leader。
2.2 FOLLOWER流程
follower的同步逻辑,主要是followLeader()方法:
(1)findLeader()方法返回leader地址,然后connectToLeader()连接到leader,有重试机制,超时未连接上则退回到LOOKING状态;
(2)生成的leaderIs、leaderOs就是对应的Archive。
(3) registerLeader()方法向leader注册,需要传入pktType和sentLastZxid参数,前者是packet类型,后者是该follower最新的zxid;
(4)方法返回leader反馈的newLeaderZxid,同时leader开始向follower发送需要同步的信息,同步哪些信息是根据sentLastZxid和newLeaderZxid来确定的。
(5) 进行syncWithLeader()过程,与leader数据同步。该过程会循环收到不同类型的packet,只有当收到Leader.UPTODATE类型的packet时,才表示同步结束。
(6)循环执行processPacket(),直到服务停止为止。processPacket()主要是根据leader发送过来的packet类型进行不同处理。
① Leader.PING:执行ping()。
② Leader.SYNC:执行FollowerZookeeperServer的sync()。
③ Leader.REVALIDATE:执行revalidate()。
④ Leader.UPTODATE:不可能,报错。
⑤ Leader.COMMIT:执行FollowerZookeeperServer的commit()。
⑥ Leader.PROPOSAL:执行FollowerZookeeperServer的logRequest()。
2.3 OBSERVER流程
Observer的逻辑所在,主要是observeLeader()方法:
① 与Follower同。
② 与Follower同,不过传入的pktType不同。
③ 与Follower同。
④ processPacket()方法的处理逻辑与Follower不同,主要是:Leader.PROPOSAL、Leader.COMMIT被忽略了。
三、广播流程
经典的两阶段提交,即leader提起一个决议,由followers进行投票,leader对投票结果进行计算决定是否通过该决议,如果通过执行该决议(事务),否则什么也不做。
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com