设为首页 收藏本站
查看: 1267|回复: 0

[经验分享] ZooKeeper源码(二)---ZooKeeper流程

[复制链接]

尚未签到

发表于 2015-9-6 09:49:27 | 显示全部楼层 |阅读模式
一、选主流程
  zookeeper核心机制包括:恢复模式(选主流程)和广播模式(同步流程)。当服务刚启动、leader崩溃、follower不足半数时,系统就进入选主流程,此时不对外提供服务;当leader被选举出来后,系统就进入同步流程,server之间完成状态同步,此后对外提供服务。
  选举策略主要基于paxos算法,一种称为LeaderElection算法,一种称为FashLeaderElection算法,系统默认使用FashLeaderElection算法完成。
1.1 LeaderElection算法
  以下源码都在Leader类里
  流程简述如下:
  (1)进入选主流程,当前发起选举的线程(选举线程)负责对投票结果进行统计,以及决定推荐哪个server作为leader;
  (2)选举线程初始化currentVote,备选leader id默认是自己,然后向所有server发起一次询问,包括自己;Vote类实际上可看成(id, zxid)这样的二元组,其中id表示备 选leader的id,zxid表示当前最新事务id。


  • protected QuorumPeer self;

  •    self.setCurrentVote(new Vote(self.getId(),self.getLastLoggedZxid()));

  • ……

  •    int xid = epochGen.nextInt();

  •    while (self.isRunning()) {

  •    HashMap<InetSocketAddress, Vote> votes =new HashMap<InetSocketAddress, Vote>(self.getVotingView().size());
  (3) 选举线程收到回复后,验证是否自己发起的询问,然后获取以下信息:
   第一,获取回复server的id,存储到询问对象列表heardFrom中;
  第二,获取回复server提议的备选leader信息(id,zxid),其中id是备选leader的id,zxid是事务id号,存储到投票记录表votes中。


  • s.send(requestPacket);

  • responsePacket.setLength(responseBytes.length);

  • s.receive(responsePacket);

  • ……

  • int recvedXid = responseBuffer.getInt();

  • ……

  • long peerId = responseBuffer.getLong();

  • heardFrom.add(peerId);

  • Vote vote = new Vote(responseBuffer.getLong(),

  • responseBuffer.getLong());

  • InetSocketAddress addr = (InetSocketAddress) responsePacket

  • .getSocketAddress();

  • votes.put(addr, vote);
  (4)所有server遍历请求一次后,选举线程进行投票记录计算:countVote()。每次计算需要统计两个值:result.vote保存zxid最大的投票记录,result.count是对应得票数;result.winner记录得票数最多的投票记录,result.winningCount是对应得票数


  • result.vote = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);

  • result.winner = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);

  • Collection<Vote> votesCast = votes.values();

  • HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>();

  • // Now do the tally

  • for (Vote v : votesCast){

  •    Integer count = countTable.get(v);

  •    if (count == null) {

  •    count = Integer.valueOf(0);

  •    }

  •    countTable.put(v, count + 1);

  •    ……

  • }

  • result.winningCount = 0;

  • LOG.info("Election tally: ");

  • for (Entry<Vote, Integer> entry : countTable.entrySet()) {

  •    if (entry.getValue() > result.winningCount) {

  •       result.winningCount = entry.getValue();

  •       result.winner = entry.getKey();

  •    }

  •    LOG.info(entry.getKey().id + "\t-> " + entry.getValue());

  • }

  • return result;
  (5)对统计结果进行判断,如果result.winner对应的winningCount> n/2+1,即获得了n/2+1的投票数,表明该备选leader获胜,将根据获胜leader的相关信息设置自己的状态;否则将currentVote设置为result.vote,即选择zxid最大的那个server作为备选leader,然后重复整个过程,直到leader被选举出来。


  • if (votes.size() == 0) {

  •    self.setCurrentVote(new Vote(self.getId(), self.getLastLoggedZxid()));

  •    } else {

  •       if (result.winner.id >= 0) {

  •          self.setCurrentVote(result.vote);

  •          // To do: this doesn't use a quorum verifier

  •          if (result.winningCount > (self.getVotingView().size() / 2)) {

  •             self.setCurrentVote(result.winner);

  •             s.close();

  •             Vote current = self.getCurrentVote();

  •             LOG.info("Found leader: my type is: " + self.getLearnerType());

  •             /* * We want to make sure we implement the state machine

  •              * correctly. If we are a PARTICIPANT, once a leader

  •              * is elected we can move either to LEADING or

  •              * FOLLOWING. However if we are an OBSERVER, it is an

  •              * error to be elected as a Leader.

  •              */

  •             if (self.getLearnerType() == LearnerType.OBSERVER) {

  •                if (current.id == self.getId()) {

  •                   // This should never happen!

  •                   LOG.error("OBSERVER elected as leader!");

  •                   Thread.sleep(100);

  •                } else {

  •                   self.setPeerState(ServerState.OBSERVING);

  •                   Thread.sleep(100);

  •                   return current;

  •                }

  •             } else {

  •                self.setPeerState((current.id == self.getId())

  •                      ? ServerState.LEADING: ServerState.FOLLOWING);

  •                if (self.getPeerState() == ServerState.FOLLOWING) {

  •                Thread.sleep(100);

  •                }

  •                return current;

  •             }

  •          }

  •       }

  •    }
  另外,从流程中可以分析得知:要使leader获得多数server的支持,server的总数必须是2n+1的,且存活的server数目不能少于n+1,否则选主流程无法成功,server将会一直处于该LOOKING状态。
  如果某server成为leader,那么一定拥有最大的zxid,也就是数据最新。此时会触发leader向其他follower状态同步的过程,保证所有server的数据一致。该恢复过程完毕后,才能重新接收client的请求。
  observer节点同样参与发送request和接收response的过程,但是ResponderThread的run()方法中并不做任何处理。也就是说,observer节点需要确定leader节点,但不参与选举。
1.2 FashLeaderElection算法
  流程简述如下:
  (1)选举线程将epoch+1,向所有server提议自己变成leader,广播自身(id,zxid)信息。
  (2)其他server收到提议后,需要解决epoch和zxid的冲突,然后向发起提议的server回复信息。(Q1:如何解决冲突?回复的是什么?)
  (3) 选举线程接收到其他server的回复,先判断自身epoch是否合法,然后更新epoch为对方推荐的zxid、epoch,并向对方发送ACK消息,然后继续等待。等待一个周期结束后,如果仍未能选出leader,则再次发送自己推荐的leader信息;否则可以结束。(Q2:过程不明?)
  (4) 重复该过程,最后一定能选举出leader。(Q3:为什么?)
二、状态同步
  状态同步实际上就是选举完成后,leader向follower同步数据的恢复过程,具体的流程分析可以按照不同角色进行阐述。
2.1 LEADER流程
  (1) leader设置最新的epoch,即zxid的高32位++;
  (2) leader构建NEWLEADER包,该包的数据是当前最大的事务id,然后广播给所有的follower告知 leader保存的事务id是多少,从而判断是否需要同步数据;
  (3) leader会给每个follower创建一个线程LearnerHandler,负责接收它们的同步数据请求;
  (4) leader主线程阻塞等待其他follower的回应,只有在超过半数的follower已经同步数据完毕,该过程才能结束,leader才能正式成为leader。
2.2 FOLLOWER流程
  follower的同步逻辑,主要是followLeader()方法:
  (1)findLeader()方法返回leader地址,然后connectToLeader()连接到leader,有重试机制,超时未连接上则退回到LOOKING状态;
  (2)生成的leaderIs、leaderOs就是对应的Archive。
  (3) registerLeader()方法向leader注册,需要传入pktType和sentLastZxid参数,前者是packet类型,后者是该follower最新的zxid;
  (4)方法返回leader反馈的newLeaderZxid,同时leader开始向follower发送需要同步的信息,同步哪些信息是根据sentLastZxid和newLeaderZxid来确定的。
  (5) 进行syncWithLeader()过程,与leader数据同步。该过程会循环收到不同类型的packet,只有当收到Leader.UPTODATE类型的packet时,才表示同步结束。
  (6)循环执行processPacket(),直到服务停止为止。processPacket()主要是根据leader发送过来的packet类型进行不同处理。
   Leader.PING:执行ping()。
   Leader.SYNC:执行FollowerZookeeperServer的sync()。
   Leader.REVALIDATE:执行revalidate()。
   Leader.UPTODATE:不可能,报错。
   Leader.COMMIT:执行FollowerZookeeperServer的commit()。
   Leader.PROPOSAL:执行FollowerZookeeperServer的logRequest()。
2.3 OBSERVER流程
  Observer的逻辑所在,主要是observeLeader()方法:
   与Follower同。
  与Follower同,不过传入的pktType不同。
   与Follower同。
   processPacket()方法的处理逻辑与Follower不同,主要是:Leader.PROPOSAL、Leader.COMMIT被忽略了。
三、广播流程
  经典的两阶段提交,即leader提起一个决议,由followers进行投票,leader对投票结果进行计算决定是否通过该决议,如果通过执行该决议(事务),否则什么也不做。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-110004-1-1.html 上篇帖子: [zz]ZooKeeper 典型的应用场景 下篇帖子: 基于zookeeper的远程方法调用(RMI)的实现
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表