设为首页 收藏本站
查看: 823|回复: 0

[经验分享] zookeeper 选举过程

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-9-6 11:39:07 | 显示全部楼层 |阅读模式
synchronized public void startLeaderElection() {
try {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
for (QuorumServer p : getView().values()) {
if (p.id == myid) {
myQuorumAddr = p.addr;
break;
}
}
if (myQuorumAddr == null) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
this.electionAlg = createElectionAlgorithm(electionType);
}
  有 几种类型electionType 选举 算法,默认electionType=3,即快速选举fast paxos算法



    protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = new QuorumCnxManager(this);
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
  electionType=0 就是普通的paxos选举,该选举算法采用udp,开辟了一个ResponsderThread线程:



  @Deprecated
class ResponderThread extends Thread {
ResponderThread() {
super("ResponderThread");
}
volatile boolean running = true;
@Override
public void run() {
try {
byte b[] = new byte[36];
ByteBuffer responseBuffer = ByteBuffer.wrap(b);
DatagramPacket packet = new DatagramPacket(b, b.length);
while (running) {
udpSocket.receive(packet);
if (packet.getLength() != 4) {
LOG.warn("Got more than just an xid! Len = "
+ packet.getLength());
} else {
responseBuffer.clear();
responseBuffer.getInt(); // Skip the xid
                        responseBuffer.putLong(myid);
Vote current = getCurrentVote();
switch (getPeerState()) {
case LOOKING:
responseBuffer.putLong(current.getId());
responseBuffer.putLong(current.getZxid());
break;
case LEADING:
responseBuffer.putLong(myid);
try {
long proposed;
synchronized(leader) {
proposed = leader.lastProposed;
}
responseBuffer.putLong(proposed);
} catch (NullPointerException npe) {
// This can happen in state transitions,
// just ignore the request
                            }
break;
case FOLLOWING:
responseBuffer.putLong(current.getId());
try {
responseBuffer.putLong(follower.getZxid());
} catch (NullPointerException npe) {
// This can happen in state transitions,
// just ignore the request
                            }
break;
case OBSERVING:
// Do nothing, Observers keep themselves to
// themselves.
break;
}
packet.setData(b);
udpSocket.send(packet);
}
packet.setLength(b.length);
}
} catch (RuntimeException e) {
LOG.warn("Unexpected runtime exception in ResponderThread",e);
} catch (IOException e) {
LOG.warn("Unexpected IO exception in ResponderThread",e);
} finally {
LOG.warn("QuorumPeer responder thread exited");
}
}
}
  可以看到@Deprecated,所以说LeaderElection 方式将会被弃用,只采用fast paxos方法。
  responsder线程会阻塞在receive处,等待其他server投回的结果,我们同时看看QuorumPeer线程的逻辑:



public void run() {
setName("QuorumPeer" + "[myid=" + getId() + "]" +
cnxnFactory.getLocalAddress());
LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
p = new RemotePeerBean(s);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
logFactory, this,
new ZooKeeperServer.BasicDataTreeBuilder(),
this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
                            roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );                        
} finally {
observer.shutdown();
setObserver(null);
setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
}
}
} finally {
LOG.warn("QuorumPeer main thread exited");
try {
MBeanRegistry.getInstance().unregisterAll();
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
jmxQuorumBean = null;
jmxLocalPeerBean = null;
}
}
  QuorumPeer线程 发现自己处于LOOKING状态,就向所有server包括自己发送投票(addr -> electionAddr 发。 反向回投票结果),内容是xid 一个随机的4字节整数, 然后等待结果。 所有server包括自身的responder线程这时候会收到这个xid,并回送这个xid(用于给QuornumPeer线程辨别是否自己发出的),以及myid,currentId, currentZxid,(启动过程currentId,其实就是自身myid,即首先推举自己)。
  然后统计countVotes所有投票结果: 首先过滤无效的投票,即要求被投的id,也在heardFrom里(被投的server,自己也参与了投票,否则算没被投);找出被个被投的id所拥有的最大zxid;投给同一个id的投票中,如果某投票的zxid小于那个zxid(投同一个id的投票中最大的zxid),则zxid设成那个最大的(Make all zxids for a given vote id equal to the largest zxid seen for that id); 统计出投各个id的票数,以及拥有最大zxid的被投id的票数; 返回的result,vote和count成员是指向最大zxid的投票,而winner和winningcount指向的是投票数最多的那个; 这就是countVotes函数。
  countVotes函数返回后,如果winningCout 过半,则设置currentVote为winner(并设置自己为leading或者following等状态),否则设置为vote(最大zxid那个),继续投。
  
  
  *****************************************************************
  
  对于fast paxos 算法: FastLeaderElection 与QuorumCnxManager
  QuorumCnxManager 类里 有个Listener 线程,采用TCP的,而不再是udp方式的。 监听eletionAddr 端口。
  FastLeaderElection 类里也存在一个类似于LeaderElection类里的lookForLeader方法(同样在QuornumPeer线程里被调用):
  
  FastLeaderElection 类里 有sendqueue 和 recvqueue; Messenger类对象(由QuorumCnxManager 对象构造),该类对象构造的时候就开了两个线程:
  WorkerSender 和 WorkerReceiver。
  WorkSender线程 从 sendqueue.poll 等发送内容,调用QuorumCnxManager 对象的toSend方法;
  WorkerReceiver线程调用QuorumCnxManager 对象的pollRecvQueue方法。。。 总之多个线程又是同名的用于多处,复杂!垃圾代码。
  
  首先 updateProposal:
  proposedLeader = leader; // self.id
        proposedZxid = zxid;      // zxid
        proposedEpoch = epoch;  // epoch
  
  大概比较算法是这样:
  1. 先比较epoch,谁大认谁
  2. 比较zxid,谁大认谁
  3. 比较serverid,谁大认谁
  另外有个logicalclock表示某轮选举,好比十七大,十八大。
  在集群刚启动是前两条都是一样大,通过第三点再定夺(在机器世界里,有些机器也是天生的不一样,没法比),当然最后还得超过半数认同才行,虽然你天生地有被提名权,但也得经过人大走个过场是不是。
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-110092-1-1.html 上篇帖子: zookeeper 实现分布式锁 下篇帖子: zookeeper watches
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表