设为首页 收藏本站
查看: 919|回复: 0

[经验分享] ZooKeeper之FastLeaderElection算法详解

[复制链接]

尚未签到

发表于 2015-9-6 13:32:27 | 显示全部楼层 |阅读模式
  当我们把zookeeper服务启动时,首先需要做的一件事就是leader选举,zookeeper中leader选举的算法有3种,包括LeaderElection算法、AuthFastLeaderElection算法以及FastLeaderElection算法,其中FastLeadElection算法是默认的,当然,我们也可以在配置文件中修改配置项:electionAlg。
  1、当zookeeper服务启动时,在类QuorumPeerMain中的入口函数main,主线程启动:
  

public class QuorumPeerMain {
private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);
private static final String USAGE = "Usage: QuorumPeerMain configfile";
protected QuorumPeer quorumPeer;
/**
* To start the replicated server specify the configuration file name on
* the command line.
* @param args path to the configfile
*/
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
  
  2、然后便是QuorumPeer重写Thread.start方法,启动:
  

          quorumPeer.start();
quorumPeer.join();
  
  在类QuorumPeer中

   @Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
loadDataBase();
cnxnFactory.start();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
startLeaderElection();
super.start();
}3、可以从上面的源码中看到,quorumPeer线程启动后,首先做的是数据恢复,它会读取保存在磁盘中的数据:  

private void loadDataBase() {
try {
//从本地文件中恢复db
zkDb.loadDataBase();
// load the epochs
/*
从最新的zxid恢复epoch变量
其中zxid为long型,前32位代表epoch值,后32位代表zxid值,
这个zxid(ZooKeeper Transaction Id),即事务id,zookeeper每次更,zxid都会增大
因此越大代表数据越新
*/
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
//....

4、然后便是初始化选举,一开始选举自己,默认使用的算法是FastLeaderElection:  
  

synchronized public void startLeaderElection() {
try {
/*
先投自己
*/
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
// if (!getView().containsKey(myid)) {
//      throw new RuntimeException("My id " + myid + " not in the peer list");
//}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
this.electionAlg = createElectionAlgorithm(electionType);
}
5、然后便是绑定选举端口,FastLeaderElection初始化:  
  

protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = new QuorumCnxManager(this);
/*
绑定选举端口,等待集群其它机器连接
*/
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
//基于TCP的选举算法
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}

6、QuorumPeer线程启动:  
  

private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
/*
业务层发送队列,业务对象ToSend
业务层接收队列,业务对象Notification
*/
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}在FastLeaderElection.java文件中:

Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws,
&quot;WorkerSender[myid=&quot; + self.getId() + &quot;]&quot;);
this.wsThread.setDaemon(true);
this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr,
&quot;WorkerReceiver[myid=&quot; + self.getId() + &quot;]&quot;);
this.wrThread.setDaemon(true);
}7、在进行选举的过程中,每台zookeeper server服务器有以下四种状态:LOOKING、FOLLOWING、LEADING、OBSERVING,其中出于OBSERVING状态的server不参加投票过程,只有出于LOOKING状态的机子才参加投票过程,一旦投票结束,server的状态就会变成FOLLOWER或者LEADER。  
  下面先说一下leader选举过程:
  步骤1:对于处于LOOKING状态的server来说,首先判断一个被称为逻辑时钟&#20540;(logicalclock),如果收到的logicalclock的&#20540;大于当前server自身的logicalclock&#20540;,说明这是更新的一次选举,此时需要更新自身server的logicalclock&#20540;,并且将之前收到的来自其他server的投票结果清空,然后判断是否需要更新自身的投票,判断的标准是先看epoch&#20540;的大小,然后再判断zxid的大小,最后再看server id的大小(当然,针对这种情况,server肯定会更新自身的投票,因为当前server的epoch&#20540;小于收到的epoch&#20540;嘛),然后将自身的投票广播给其他server。
  在FastLeaderElection.java文件中:
  

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug(&quot;id: &quot; + newId + &quot;, proposed id: &quot; + curId + &quot;, zxid: 0x&quot; +
Long.toHexString(newZxid) + &quot;, proposed zxid: 0x&quot; + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
*  as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
步骤2:如果是自身的logicalclock&#20540;大于接收的logicalclock&#20540;,那么就直接break;如果刚好相等, 就根据epoch、zxid以及server id来判断是否需要更新,然后再把自己的投票广播给其他server,最后要把收到投票加入到当前server接收的投票队伍中。  
  

HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

  
  在FastLeaderElection.java文件的lookForLeader函数中:
  

case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
//清空之前收到的投票结果
recvset.clear();
//判断是否需要更新自身投票
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug(
&quot;Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x&quot;
+ Long.toHexString(n.electionEpoch)
+ &quot;, logicalclock=0x&quot; + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
//广播
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug(&quot;Adding vote: from=&quot; + n.sid +
&quot;, proposed leader=&quot; + n.leader +
&quot;, proposed zxid=0x&quot; + Long.toHexString(n.zxid) +
&quot;, proposed election epoch=0x&quot; + Long.toHexString(n.electionEpoch));
}
//加入投票队伍
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

步骤3:服务器判断投票是否结束,结束的条件是:是否某个leader得到了半数以上的server的支持,如果是,则尝试再等一会儿(200ms)看是否收到更新数据,如果没有收到,则设置自身的角色(follower Or leader),然后退出选举流程,否则继续。  
  FastLeaderElection.java文件中;
  

//判断投票是否结束
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
return voteSet.hasAllQuorums();
}

在lookForLeader函数中:  
  

//判读投票是否结束
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
//再等一会儿,看是否有新的投票
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
//如果没有发生新的投票,则结束选举过程
//设置自身状态
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}

步骤4:以上我们讨论的是数据发送server的状态是LOOKING状态,如果数据发送方的状态是FOLLOWING或是LEADING状态,那么如果logicalclock相同,则将数据保存到recvset中,如果对方server自称是leader的话,那么就判断是否有半数以上的server支持它,如果是,则设置自身选举状态并且退出选举;  
  

case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
//当前server与发送方server的logicalclock相同
if(n.electionEpoch == logicalclock.get()){
//加入到recvset中
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}

步骤5:如果收到的数据的logicalclock&#20540;与当前server的logicalclock不相等,那么说明在另外一个选举中已经有了选举结果,于是加入outofelection集合中,并且在outofelection集合中判断时候支持过半,如果是,则更新自身的投票,并且设置自身的状态:  
  

outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}

总结:这就是zookeeper的FastLeaderElection选举的大致过程。  
  参考博客:
  http://blog.iyunv.com/xhh198781/article/details/6619203
  http://iwinit.iteye.com/blog/1773531

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-110176-1-1.html 上篇帖子: zookeeper in mesos 下篇帖子: ZooKeeper 配置参数说明
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表