设为首页 收藏本站
查看: 902|回复: 0

[经验分享] zookeeper代码阅读

[复制链接]

尚未签到

发表于 2017-4-18 13:21:00 | 显示全部楼层 |阅读模式
开始读zookeeper代码,首先启动zookeeper,看到

java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /home/uniseraph/dev/zookeeper-3.3.3/bin/../build/classes:/home/uniseraph/dev/zookeeper-3.3.3/bin/../build/lib/log4j-1.2.15.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../build/lib/jline-0.9.94.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../zookeeper-3.3.3.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../lib/log4j-1.2.15.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../lib/jline-0.9.94.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../src/java/lib/ivy-2.1.0.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../src/java/lib/ant-eclipse-1.0-jvm1.2.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../conf: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /home/uniseraph/dev/zookeeper-3.3.3/bin/../conf/zoo.cfg

确认入口函数在QuorumPeerMain。
1. 去掉一些注释,找到关键初始化点
  
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
}


2. 解析输入参数,选择cluster/standalone
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}

3  runFromConfig读取配置文件,进行系统初始化
3.1 注册一个MBean
       ManagedUtil.registerLog4jMBeans();

3.2 初始化一个连接工厂,这里将是关键,为nio socket server进行一些初始化动作
NIOServerCnxn.Factory cnxnFactory =
new NIOServerCnxn.Factory(config.getClientPortAddress(),
config.getMaxClientCnxns());

     
public Factory(InetSocketAddress addr, int maxcc) throws IOException {
super("NIOServerCxn.Factory:" + addr);
setDaemon(true);
maxClientCnxns = maxcc;
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}


3.3 初始化一个QuorumPeer,将配置文件中参数赋值给它,并启动之。QuorumPeer负责处理quorum protol,选出leader。

quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
quorumPeer.setQuorumPeers(config.getServers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.start();


  
public synchronized void start() {
try {
zkDb.loadDataBase();
} catch(IOException ie) {
LOG.fatal("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
cnxnFactory.start();        
startLeaderElection();
super.start();
}


3.3.1 quorumPeer.start启动时候先加载硬盘上的zk数据;

try {
zkDb.loadDataBase();
} catch(IOException ie) {
LOG.fatal("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}


3.3.2 启动factory的nio 监听线程,开始循环

cnxnFactory.start();  


因为Factory继承自Thread,所以新起一个线程执行。

public void run() {
while (!ss.socket().isClosed()) {
try {
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select "
+ k.readyOps());
}
}
}
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
clear();
LOG.info("NIOServerCnxn factory exited run method");
}


3.3.2.1 首先在一个无限循环中,nio selector进行监听,每1秒或者有数据来就唤醒一次。

selector.select(1000);  
Set<SelectionKey> selected;  
synchronized (this) {  
selected = selector.selectedKeys();  
}  
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(  
selected);  
Collections.shuffle(selectedList);


3.3.2.2 如果是有链接请求来了,则accept之,并创建链接上下文,且在selector中注册SelectionKey.OP_READ事件

if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
}


3.3.2.3 如果是READ/WRITE事件,处理之
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
}


NIOServerCnxn处理另外分析;
3.3 启动select leader算法
startLeaderElection();

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-365990-1-1.html 上篇帖子: 【转】 ZooKeeper安装过程 下篇帖子: 【ZooKeeper Notes】系列文章
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表