设为首页 收藏本站
查看: 1108|回复: 0

[经验分享] 深入浅出Zookeeper之一Server启动

[复制链接]
累计签到:2 天
连续签到:1 天
发表于 2017-4-19 11:26:26 | 显示全部楼层 |阅读模式
  大名鼎鼎的Zookeeper是解决分布式问题的神器。小编最近简单阅读了代码,分享一下。有不对之处,还请大家指出。
  整篇文章将分多个系列完成,因为涉及点比较多,很难在一片文章内搞定。关于zookeeper的使用场景,大家参考http://rdc.taobao.com/team/jm/archives/1232。api使用参考官网手http://zookeeper.apache.org/doc/trunk/。这里以最新的zookeeper3.4.5为例。
  这个系列的第一篇来说说zookeeper server端的启动,以单机为例,分布式zookeeper将在后续专门分析。
  单机版启动类ZooKeeperServerMain

protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
//解析配置文件zoo.cfg
ServerConfig config = new ServerConfig();
if (args.length == 1) {
config.parse(args[0]);
} else {
config.parse(args);
}
//启动
runFromConfig(config);
}
  具体解析:

  public void parse(String path) throws ConfigException {
QuorumPeerConfig config = new QuorumPeerConfig();
config.parse(path);
// let qpconfig parse the file and then pull the stuff we are
// interested in
readFrom(config);
}
  启动

    public void runFromConfig(ServerConfig config) throws IOException {
LOG.info("Starting server");
try {
// Note that this thread isn't going to be doing anything else,
// so rather than spawning another thread, we will just call
// run() in this thread.
// create a file logger url from the command line args
ZooKeeperServer zkServer = new ZooKeeperServer();
//2个文件,log和data文件
FileTxnSnapLog ftxn = new FileTxnSnapLog(new
File(config.dataLogDir), new File(config.dataDir));
zkServer.setTxnLogFactory(ftxn);
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
//连接工厂,默认NIOServerCnxnFactory
cnxnFactory = ServerCnxnFactory.createFactory();
//初始化主线程,打开selector,并bind端口,打开NIO的ACCEPT通知
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
//并生成最新的snapshot文件,启动IO主线程,从snapshot文件和log文件中恢复内存database结构和session结构
cnxnFactory.startup(zkServer);
//启动线程等待之前启动的主线程结束
cnxnFactory.join();
if (zkServer.isRunning()) {
zkServer.shutdown();
}
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Server interrupted", e);
}
}
  具体startup流程:

    public void startup(ZooKeeperServer zks) throws IOException,
InterruptedException {
//启动IO主线程
start();
//从log和snapshot回复database和session,并重新生成一个最新的snapshot文件
zks.startdata();
//启动sessionTracker线程,初始化IO请求的处理链,并启动每个processor线程
zks.startup();
setZooKeeperServer(zks);
}
  具体恢复过程:

    public void startdata()
throws IOException, InterruptedException {
//check to see if zkDb is not null
if (zkDb == null) {
//初始化database
zkDb = new ZKDatabase(this.txnLogFactory);
}  
if (!zkDb.isInitialized()) {
loadData();
}
}
  DataTree用Map实现,key是节点名称,value是DataNode,DataNode从有parent指向父亲节点,有children指向所有孩子节点

    public DataTree() {
/* Rather than fight it, let root have an alias */
//'/','/zookeeper','/zookeeper/quota'3个系统节点初始化
nodes.put("", root);
nodes.put(rootZookeeper, root);
/** add the proc node and quota node */
root.addChild(procChildZookeeper);
nodes.put(procZookeeper, procDataNode);
procDataNode.addChild(quotaChildZookeeper);
nodes.put(quotaZookeeper, quotaDataNode);
}
  具体恢复数据

    public void loadData() throws IOException, InterruptedException {
//执行恢复,并返回最新的事务ID
setZxid(zkDb.loadDataBase());
// Clean up dead sessions
//清理session
LinkedList<Long> deadSessions = new LinkedList<Long>();
for (Long session : zkDb.getSessions()) {
if (zkDb.getSessionWithTimeOuts().get(session) == null) {
deadSessions.add(session);
}
}
zkDb.setDataTreeInit(true);
for (long session : deadSessions) {
// XXX: Is lastProcessedZxid really the best thing to use?
killSession(session, zkDb.getDataTreeLastProcessedZxid());
}
//生成最新的snapshot文件
// Make a clean snapshot
takeSnapshot();
}
  load过程:

    public long loadDataBase() throws IOException {
//load过程中,发起分布式提议,对于单机版,先不考虑
PlayBackListener listener=new PlayBackListener(){
public void onTxnLoaded(TxnHeader hdr,Record txn){
Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
null, null);
r.txn = txn;
r.hdr = hdr;
r.zxid = hdr.getZxid();
addCommittedProposal(r);
}
};
//load数据
long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
initialized = true;

   restore过程:

    public long restore(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
//从FileSnap中恢复
snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
//从snapshot中记录的最新的事务开始处理,将log中的事务merge到datatree中
while (true) {
// iterator points to
// the first valid txn when initialized
hdr = itr.getHeader();
if (hdr == null) {
//empty logs
return dt.lastProcessedZxid;
}
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
LOG.error(highestZxid + "(higestZxid) > "
+ hdr.getZxid() + "(next log) for type "
+ hdr.getType());
} else {
highestZxid = hdr.getZxid();
}
try {
processTransaction(hdr,dt,sessions, itr.getTxn());
} catch(KeeperException.NoNodeException e) {
throw new IOException("Failed to process transaction type: " +
hdr.getType() + " error: " + e.getMessage(), e);
}
listener.onTxnLoaded(hdr, itr.getTxn());
if (!itr.next())
break;
}
return highestZxid;
}
  FileSnap恢复过程:

    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
// we run through 100 snapshots (not all of them)
// if we cannot get it running within 100 snapshots
// we should  give up
//找前100个snapshot文件,降序,最新的文件在最前面
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
//从最新的文件开始恢复,如果反序列化ok而且checksum也ok,则恢复结束
File snap = null;
boolean foundValid = false;
for (int i = 0; i < snapList.size(); i++) {
snap = snapList.get(i);
InputStream snapIS = null;
CheckedInputStream crcIn = null;
try {
LOG.info("Reading snapshot " + snap);
snapIS = new BufferedInputStream(new FileInputStream(snap));
crcIn = new CheckedInputStream(snapIS, new Adler32());
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
deserialize(dt,sessions, ia);
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot :  " + snap);
}
foundValid = true;
break;
} catch(IOException e) {
LOG.warn("problem reading snap file " + snap, e);
} finally {
if (snapIS != null)
snapIS.close();
if (crcIn != null)
crcIn.close();
}
}
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
//snapshot文件名就记录着最新的zxid
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
return dt.lastProcessedZxid;
}
  单个事务处理:

   public void processTransaction(TxnHeader hdr,DataTree dt,
Map<Long, Integer> sessions, Record txn)
throws KeeperException.NoNodeException {
ProcessTxnResult rc;
switch (hdr.getType()) {
//创建session
case OpCode.createSession:
sessions.put(hdr.getClientId(),
((CreateSessionTxn) txn).getTimeOut());
......
// give dataTree a chance to sync its lastProcessedZxid
rc = dt.processTxn(hdr, txn);
break;
case OpCode.closeSession:
sessions.remove(hdr.getClientId());
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
"playLog --- close session in log: 0x"
+ Long.toHexString(hdr.getClientId()));
}
rc = dt.processTxn(hdr, txn);
break;
default:
rc = dt.processTxn(hdr, txn);
}
......
}
  DataTree处理单个事务

public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
createNode(
createTxn.getPath(),
createTxn.getData(),
createTxn.getAcl(),
createTxn.getEphemeral() ? header.getClientId() : 0,
createTxn.getParentCVersion(),
header.getZxid(), header.getTime());
break;
case OpCode.delete:
DeleteTxn deleteTxn = (DeleteTxn) txn;
rc.path = deleteTxn.getPath();
deleteNode(deleteTxn.getPath(), header.getZxid());
break;
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(setDataTxn.getPath(), setDataTxn
.getData(), setDataTxn.getVersion(), header
.getZxid(), header.getTime());
break;
》......
/*
* A snapshot might be in progress while we are modifying the data
* tree. If we set lastProcessedZxid prior to making corresponding
* change to the tree, then the zxid associated with the snapshot
* file will be ahead of its contents. Thus, while restoring from
* the snapshot, the restore method will not apply the transaction
* for zxid associated with the snapshot file, since the restore
* method assumes that transaction to be present in the snapshot.
*
* To avoid this, we first apply the transaction and then modify
* lastProcessedZxid.  During restore, we correctly handle the
* case where the snapshot contains data ahead of the zxid associated
* with the file.
*/
//处理完事务后,再修改最新Zxid,如果是先修改Zxid再处理事务,修改完Zxid后,正好异步线程flush datatree
//此时由于事务并没有被处理,导致snapshot中的zxid比content新,而restore的时候是从最新zxid+1开始恢复的,从而
//导致丢数据
if (rc.zxid > lastProcessedZxid) {
lastProcessedZxid = rc.zxid;
}
......
return rc;
}
  以上就完成了server的数据恢复过程,LSM的精华所在。
  接下来server启动sessionTracker线程和请求处理链

protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
  核心IO线程
  


public void run() {
while (!ss.socket().isClosed()) {
try {
//select过程
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
//打乱顺序
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
//新连接进来,accept之
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
//校验同个client连接数是否超过限制
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
//异步模式
sc.configureBlocking(false);
//监听read事件
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);
//创建内部连接
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
//添加到连接表,方便后续统计
addCnxn(cnxn);
}
}
//如果是read和write事件,则处理之
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select "
+ k.readyOps());
}
}
}
//准备下次IO
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}
  具体io处理过程,将在后续结合实例来讲解。
  至此server启动完成,就等待client去连接了。server启动核心功能就是从snapshot和log文件中恢复datatree,其核心就是zxid,典型的LSM应用。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366341-1-1.html 上篇帖子: zookeeper 启动报错 : KeeperErrorCode = NoNode for /XXX 下篇帖子: (转)Zookeeper全解析——Paxos作为灵魂
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表