
[Experience Sharing] ZooKeeper Source Code Analysis: Leader Election


Posted on 2017-4-19 10:26:50
For the election algorithm itself, see: http://blog.csdn.net/xhh198781/article/details/10949697
 
Assume the configuration contains two servers:
server.1=localhost:2888:3888
server.2=localhost:2889:3888
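
To make the layout of these lines concrete, here is a minimal sketch that splits a `server.<id>=<host>:<port>:<port>` line into its parts. `ServerLineParser`, `ServerEntry`, and the field names are hypothetical names made up for illustration; this is not ZooKeeper's own config parser. It assumes the first port is the one `myQuorumAddr.getPort()` returns in the code below and the second is the election port.

```java
// Hypothetical helper for illustration only; not ZooKeeper's own parser.
// It handles just the "server.<id>=<host>:<quorumPort>:<electionPort>" form.
class ServerLineParser {
    static final class ServerEntry {
        final long id;
        final String host;
        final int quorumPort;   // first port (assumed to be the quorum address port)
        final int electionPort; // second port (assumed to be the election port)

        ServerEntry(long id, String host, int quorumPort, int electionPort) {
            this.id = id;
            this.host = host;
            this.quorumPort = quorumPort;
            this.electionPort = electionPort;
        }
    }

    static ServerEntry parse(String line) {
        String[] kv = line.trim().split("=", 2);
        if (kv.length != 2 || !kv[0].startsWith("server.")) {
            throw new IllegalArgumentException("Not a server line: " + line);
        }
        long id = Long.parseLong(kv[0].substring("server.".length()));
        String[] parts = kv[1].split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected host:port:port in: " + line);
        }
        return new ServerEntry(id, parts[0],
                Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
    }

    public static void main(String[] args) {
        ServerEntry e = parse("server.1=localhost:2888:3888");
        System.out.println("id=" + e.id + " host=" + e.host
                + " quorumPort=" + e.quorumPort + " electionPort=" + e.electionPort);
    }
}
```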
 
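As the previous post showed, ZooKeeper calls the code below before it starts electing a leader. It first sets currentVote to myid, so every server initially votes for itself. If electionType = 0 and myid = 1, the Responder thread listens on UDP port 2888 and handles requests from the other nodes.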

synchronized public void startLeaderElection() {
    // Initially vote for ourselves
    currentVote = new Vote(myid, getLastLoggedZxid());
    // Look up our own address in the configured peer list
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // electionType 0 (LeaderElection) answers vote requests over UDP
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}
 
 
LeaderElection
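If electionAlg=0, the LeaderElection algorithm is used. LeaderElection calls lookForLeader, which first asks every peer which leader it is voting for, then calls countVotes to see which node is ahead and sets that node as currentVote. If more than half of the voting members chose the same node, the election succeeds.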

// Abridged excerpt: the declarations of votes, s, the packets and buffers,
// as well as buffer resets and I/O error handling, are omitted.
public Vote lookForLeader() throws InterruptedException {
    // Start by voting for ourselves
    self.setCurrentVote(new Vote(self.getId(),
            self.getLastLoggedZxid()));
    int xid = epochGen.nextInt();
    while (self.isRunning()) {
        requestBuffer.putInt(xid);
        requestPacket.setLength(4);
        HashSet<Long> heardFrom = new HashSet<Long>();
        // Ask every voting peer which leader it currently votes for
        for (QuorumServer server : self.getVotingView().values()) {
            LOG.info("Server address: " + server.addr);
            requestPacket.setSocketAddress(server.addr);
            s.send(requestPacket);
            responsePacket.setLength(responseBytes.length);
            s.receive(responsePacket);
            long peerId = responseBuffer.getLong();
            heardFrom.add(peerId);
            Vote vote = new Vote(responseBuffer.getLong(),
                    responseBuffer.getLong());
            InetSocketAddress addr =
                    (InetSocketAddress) responsePacket
                            .getSocketAddress();
            votes.put(addr, vote);
        }
        ElectionResult result = countVotes(votes, heardFrom);
        // ZOOKEEPER-569:
        // If no votes are received for live peers, reset to voting
        // for ourselves as otherwise we may hang on to a vote
        // for a dead peer
        if (votes.size() == 0) {
            self.setCurrentVote(new Vote(self.getId(),
                    self.getLastLoggedZxid()));
        } else {
            if (result.winner.id >= 0) {
                self.setCurrentVote(result.vote);
                // To do: this doesn't use a quorum verifier
                if (result.winningCount > (self.getVotingView().size() / 2)) {
                    self.setCurrentVote(result.winner);
                    // The excerpt used "current" without declaring it
                    Vote current = self.getCurrentVote();
                    self.setPeerState((current.id == self.getId())
                            ? ServerState.LEADING: ServerState.FOLLOWING);
                    if (self.getPeerState() == ServerState.FOLLOWING) {
                        Thread.sleep(100);
                    }
                    return current;
                }
            }
        }
    }
    // The peer stopped running before a leader was found
    return null;
}
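
The request and response above travel as small fixed-layout UDP payloads. Below is a rough sketch of how such payloads could be packed and unpacked with `ByteBuffer`. The class `VotePacketSketch` and its methods are hypothetical. The layout (a 4-byte xid followed by peer id, vote id, and vote zxid as three longs) is an assumption: the excerpt only shows the 4-byte request and the three `getLong()` reads, and the xid echo in the response does not appear in it.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only. Assumed response layout (not confirmed by the
// excerpt above): [int xid][long peerId][long voteId][long voteZxid].
class VotePacketSketch {
    static final int RESPONSE_LENGTH = 4 + 8 + 8 + 8;

    // The request carries nothing but the 4-byte xid.
    static byte[] encodeRequest(int xid) {
        return ByteBuffer.allocate(4).putInt(xid).array();
    }

    static byte[] encodeResponse(int xid, long peerId, long voteId, long voteZxid) {
        return ByteBuffer.allocate(RESPONSE_LENGTH)
                .putInt(xid)
                .putLong(peerId)
                .putLong(voteId)
                .putLong(voteZxid)
                .array();
    }

    // Returns {peerId, voteId, voteZxid}, or null when the payload is too
    // short or answers a different xid.
    static long[] decodeResponse(byte[] data, int expectedXid) {
        if (data.length < RESPONSE_LENGTH) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(data);
        if (buf.getInt() != expectedXid) {
            return null;
        }
        long peerId = buf.getLong();
        long voteId = buf.getLong();
        long voteZxid = buf.getLong();
        return new long[] { peerId, voteId, voteZxid };
    }

    public static void main(String[] args) {
        byte[] response = encodeResponse(7, 2L, 2L, 100L);
        long[] decoded = decodeResponse(response, 7);
        System.out.println("peer=" + decoded[0] + " vote=" + decoded[1]
                + " zxid=" + decoded[2]);
    }
}
```

Checking the xid lets the caller drop replies that belong to an earlier round instead of counting them as fresh votes.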
 

protected ElectionResult countVotes(HashMap<InetSocketAddress, Vote> votes, HashSet<Long> heardFrom) {
    ElectionResult result = new ElectionResult();
    // Initialize with null vote
    result.vote = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
    result.winner = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
    Collection<Vote> votesCast = votes.values();
    // First make the views consistent. Sometimes peers will have
    // different zxids for a server depending on timing.
    for (Iterator<Vote> i = votesCast.iterator(); i.hasNext();) {
        Vote v = i.next();
        if (!heardFrom.contains(v.id)) {
            // Discard votes for machines that we didn't hear from
            i.remove();
            continue;
        }
        for (Vote w : votesCast) {
            if (v.id == w.id) {
                if (v.zxid < w.zxid) {
                    v.zxid = w.zxid;
                }
            }
        }
    }
    HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>();
    // Now do the tally: pick the vote with the largest zxid (ties broken by
    // the largest id) as the candidate for the next round
    for (Vote v : votesCast) {
        Integer count = countTable.get(v);
        if (count == null) {
            count = Integer.valueOf(0);
        }
        countTable.put(v, count + 1);
        if (v.id == result.vote.id) {
            result.count++;
        } else if (v.zxid > result.vote.zxid
                || (v.zxid == result.vote.zxid && v.id > result.vote.id)) {
            result.vote = v;
            result.count = 1;
        }
    }
    result.winningCount = 0;
    LOG.info("Election tally: ");
    // Pick the node that received the most votes as the winner
    for (Entry<Vote, Integer> entry : countTable.entrySet()) {
        if (entry.getValue() > result.winningCount) {
            result.winningCount = entry.getValue();
            result.winner = entry.getKey();
        }
        LOG.info(entry.getKey().id + "\t-> " + entry.getValue());
    }
    return result;
}
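
The tally plus the "more than half" check from lookForLeader can be reduced to a few lines. The sketch below is a simplified stand-in, not the ZooKeeper code: `MajorityTallySketch` and `electedLeader` are hypothetical names, votes are plain candidate ids keyed by voter id (no zxid reconciliation), and -1 is used as a "no leader yet" marker on the assumption that server ids are non-negative.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration of "count the votes, then require a strict majority".
class MajorityTallySketch {
    // votesByVoter maps voter id -> candidate id the voter chose.
    // Returns the winning candidate id, or -1 if nobody holds a strict
    // majority of clusterSize (assumes real server ids are non-negative).
    static long electedLeader(Map<Long, Long> votesByVoter, int clusterSize) {
        Map<Long, Integer> countTable = new HashMap<>();
        long winner = -1;
        int winningCount = 0;
        for (long candidate : votesByVoter.values()) {
            // merge() returns the updated count for this candidate
            int count = countTable.merge(candidate, 1, Integer::sum);
            if (count > winningCount) {
                winningCount = count;
                winner = candidate;
            }
        }
        return winningCount > clusterSize / 2 ? winner : -1;
    }

    public static void main(String[] args) {
        Map<Long, Long> votes = new HashMap<>();
        votes.put(1L, 3L);
        votes.put(2L, 3L);
        votes.put(3L, 1L);
        System.out.println("leader = " + electedLeader(votes, 3));
    }
}
```

With three voting members, two matching votes are enough; with only one vote per candidate the function reports no leader and another round is needed.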
 
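FastLeaderElection (see also http://blog.csdn.net/xhh198781/article/details/6619203)
FastLeaderElection is often described as a standard fast paxos implementation. A server first proposes to all servers that it should become leader. When another server receives the proposal, it resolves any conflict in epoch and zxid, accepts the proposal, and then sends the proposer a message confirming that the proposal was accepted.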
 
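FastLeaderElection collects votes from the other nodes through asynchronous communication, and when it analyzes a vote it handles it differently depending on the voter's current state, which speeds up the election.
Each server has a receive thread pool and a send thread pool. While no election is in progress, both pools are blocked; they unblock and process a message only when one arrives. Each server also has an election thread (the thread that is able to initiate an election takes this role).
1). Active election side (election thread)
It first increments its own logicalclock by 1, then generates notification messages and puts them on the send queue. One message is generated per server configured in the system, so that every server receives it. While the current server's state is LOOKING, it keeps looping to check the receive queue, and when a message arrives it handles it according to the sender's state carried in the message.
2). Active sending side (send thread pool)
It converts each outgoing message from a Notification message into a ToSend message, sends it to the peer, and waits for the peer's reply.
3). Passive receiving side (receive thread pool)
It converts each received message into a Notification message and puts it on the receive queue. If the sending server's epoch is smaller than logicalclock, it sends that server a message (so it can update its epoch). If the sending server is in the LOOKING state while this server is FOLLOWING or LEADING, it also sends a message (a leader already exists, so the sender can converge quickly).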
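
The three roles above hand messages to each other through blocking queues. Here is a small sketch of that hand-off, assuming two `LinkedBlockingQueue`s in the spirit of the sendqueue and recvqueue used by the election thread. `ElectionQueuesSketch`, `Message`, and `shouldReply` are hypothetical names, and no real network threads are started.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; class and method names are hypothetical.
class ElectionQueuesSketch {
    enum State { LOOKING, FOLLOWING, LEADING }

    static final class Message {
        final long sid;    // destination (outgoing) or sender (incoming)
        final long leader; // proposed leader id
        final long zxid;
        final long epoch;

        Message(long sid, long leader, long zxid, long epoch) {
            this.sid = sid;
            this.leader = leader;
            this.zxid = zxid;
            this.epoch = epoch;
        }
    }

    final LinkedBlockingQueue<Message> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Message> recvqueue = new LinkedBlockingQueue<>();

    // Election thread: queue one notification per configured server.
    void sendNotifications(long[] serverIds, long leader, long zxid, long epoch) {
        for (long sid : serverIds) {
            sendqueue.offer(new Message(sid, leader, zxid, epoch));
        }
    }

    // Election thread: wait up to timeoutMs for the next notification.
    // Returns null on timeout or if the thread is interrupted.
    Message nextNotification(long timeoutMs) {
        try {
            return recvqueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Receive side: reply when the sender is behind, or when it is still
    // LOOKING although this server already follows or leads.
    static boolean shouldReply(long senderEpoch, State senderState,
                               long logicalclock, State selfState) {
        boolean senderIsBehind = senderEpoch < logicalclock;
        boolean leaderAlreadyKnown =
                senderState == State.LOOKING && selfState != State.LOOKING;
        return senderIsBehind || leaderAlreadyKnown;
    }
}
```

Polling with a timeout rather than blocking forever is what lets the election thread notice that nothing arrived and decide what to do next.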

/**
* Starts a new round of leader election. Whenever our QuorumPeer
* changes its state to LOOKING, this method is invoked, and it
* sends notifications to all other peers.
*/
public Vote lookForLeader() throws InterruptedException {
    HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
    // Notify every node of our current vote; initially we vote for ourselves
    sendNotifications();
    /*
     * Loop in which we exchange notifications until we find a leader
     */
    while ((self.getPeerState() == ServerState.LOOKING) &&
            (!stop)){
        // The receiver threads started by this node collect the other
        // nodes' votes and store them in recvqueue
        Notification n = recvqueue.poll(notTimeout,
                TimeUnit.MILLISECONDS);
        // (abridged: handling of a null n after a timeout is omitted)
        switch (n.state) {
        case LOOKING:
            // If notification > current, replace and send messages out
            if (n.epoch > logicalclock) {
                logicalclock = n.epoch;
                recvset.clear();
                //Check if a pair (server id, zxid) succeeds our current vote.
                if(totalOrderPredicate(n.leader, n.zxid,
                        getInitId(), getInitLastLoggedZxid()))
                    updateProposal(n.leader, n.zxid);
                else
                    updateProposal(getInitId(),
                            getInitLastLoggedZxid());
                sendNotifications();
            } else if (n.epoch < logicalclock) {
                if(LOG.isDebugEnabled()){
                    LOG.debug("Notification epoch is smaller than logicalclock. n.epoch = " + n.epoch
                            + ", Logical clock" + logicalclock);
                }
                break;
            // If n.zxid is greater than proposedZxid, or the two are equal and
            // n.leader is greater than proposedLeader, vote for n.leader
            } else if (totalOrderPredicate(n.leader, n.zxid,
                    proposedLeader, proposedZxid)) {
                LOG.info("Updating proposal");
                updateProposal(n.leader, n.zxid);
                sendNotifications();
            }
            /*
             * Only proceed if the vote comes from a replica in the
             * voting view.
             */
            if(self.getVotingView().containsKey(n.sid)){
                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
                //If have received from all nodes, then terminate
                if ((self.getVotingView().size() == recvset.size()) &&
                        (self.getQuorumVerifier().getWeight(proposedLeader) != 0)){
                    self.setPeerState((proposedLeader == self.getId()) ?
                            ServerState.LEADING: learningState());
                    leaveInstance();
                    return new Vote(proposedLeader, proposedZxid);
                // Once the termination condition holds, enter the final wait phase
                } else if (termPredicate(recvset,
                        new Vote(proposedLeader, proposedZxid,
                                logicalclock))) {
                    // Verify if there is any change in the proposed leader
                    while((n = recvqueue.poll(finalizeWait,
                            TimeUnit.MILLISECONDS)) != null){
                        if(totalOrderPredicate(n.leader, n.zxid,
                                proposedLeader, proposedZxid)){
                            recvqueue.put(n);
                            break;
                        }
                    }
                    /*
                     * This predicate is true once we don't read any new
                     * relevant message from the reception queue
                     */
                    if (n == null) {
                        self.setPeerState((proposedLeader == self.getId()) ?
                                ServerState.LEADING: learningState());
                        if(LOG.isDebugEnabled()){
                            LOG.debug("About to leave FLE instance: Leader= "
                                    + proposedLeader + ", Zxid = " +
                                    proposedZxid + ", My id = " + self.getId()
                                    + ", My state = " + self.getPeerState());
                        }
                        leaveInstance();
                        return new Vote(proposedLeader,
                                proposedZxid);
                    }
                }
            }
            break;
        // (abridged: the FOLLOWING and LEADING cases are omitted)
        }
    }
    // Stopped, or no longer LOOKING, before a leader was decided
    return null;
}
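
Two predicates carry most of the decision logic in the loop above. The sketch below restates them in isolation. `totalOrderPredicate` follows the rule spelled out in the code comment (larger zxid wins, ties go to the larger server id). `hasMajority` is a simplified stand-in for termPredicate that assumes a plain strict-majority quorum over voter-to-leader mappings; it is not the actual ZooKeeper implementation, and the class name `FlePredicatesSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; simplified from the logic described in the post.
class FlePredicatesSketch {
    // True when the (newId, newZxid) proposal beats the current one:
    // a larger zxid wins, and equal zxids are decided by the larger id.
    static boolean totalOrderPredicate(long newId, long newZxid,
                                       long curId, long curZxid) {
        return newZxid > curZxid || (newZxid == curZxid && newId > curId);
    }

    // Simplified stand-in for termPredicate: recvset maps voter id to the
    // leader that voter proposed; true when a strict majority of
    // votingMembers agrees on proposedLeader (assumes unweighted votes).
    static boolean hasMajority(Map<Long, Long> recvset,
                               long proposedLeader, int votingMembers) {
        int agree = 0;
        for (long leader : recvset.values()) {
            if (leader == proposedLeader) {
                agree++;
            }
        }
        return agree > votingMembers / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 2L);
        recvset.put(2L, 2L);
        System.out.println(totalOrderPredicate(2L, 100L, 1L, 100L));
        System.out.println(hasMajority(recvset, 2L, 3));
    }
}
```

Because every server applies the same ordering, proposals tend to converge on the server with the most recent zxid, with the server id only used to break ties.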
   
