设为首页 收藏本站
查看: 865|回复: 0

[经验分享] zookeeper存储之实现分析

[复制链接]

尚未签到

发表于 2015-11-21 14:30:28 | 显示全部楼层 |阅读模式
  

  zookeeper 存储基本都是在SyncRequestProcessor 单个线程完成的
  

1) 初始化
  

1.1)DataTree初始化
public DataTree() {
/* Rather than fight it, let root have an alias */
nodes.put("", root);
nodes.put(rootZookeeper, root);
/** add the proc node and quota node */
root.addChild(procChildZookeeper);
nodes.put(procZookeeper, procDataNode);
procDataNode.addChild(quotaChildZookeeper);
nodes.put(quotaZookeeper, quotaDataNode);
}
  会有3个节点 /、/zookeeper、/zookeeper/quota 内容为空

  

  注意DataTree维护了两个数据结构

  •   一个是ConcurrentHashMap<String, DataNode> nodes,所有的节点都可以在这查到
  •   一个是DataNode,DataNode包含了Set<String> children,含有父子关系
  也就是说通过DataNode可以遍历到子node的路径(索引),然后通过索引去nodes查到node实例
  

1.2)加载snapshot 和 committedlog中的事务到内存树
  【详见ZKDatabase.loadDataBase--->FileTxnSnapLog.restore】

  •   首先会加载多个snapshot到内存数中
  怎么会有多个snapshot不理解?


  •   加载committedlog
  系统会产生多个committedlog,以事务的序号作为后缀名,比如1-50个事务放在log.1, 51-100放在log.51,......

             文件名中后缀序号就是文件中第一个序号

  

              由于snapshot和committedlog并非完全同步,通常情况下,committedlog信息会多于snapshot,比如snapshot记录到了第80条事务,
              但committedlog 可能记录了150条事务,因此在加载的时候,就应该联合snapshot和committedlog

  

              如何联合?举个例子,就拿上面的例子来说,假设共有150条事务,并产生了3个日志文件,log.1,log.51,log.100
              snapshot记录到了第80条,那么就还应该需要log.51和log.100来做合并,程序会从log.51中第81条事务开始加载
              【详见TxnIterator】

  

              
1.3)处理过期session
  【详见ZooKeeperServer.killSession-->zkDb.killSession】

  zkDb.killSession为什么没有产生binlog?
  

  

1.4 ) 产生快照见ZooKeeperServer.takeSnapshot
  

2)运行过程
  一个专门的线程SyncRequestProcessor不断处理储存请求【详见SyncRequestProcessor.run】
  

  以一个典型产生node的场景为例
zk.create(&quot;/root&quot;, &quot;mydata&quot;.getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

  

2.1)获取请求并决定是否应该flush到磁盘

if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
  如果toFlush列表中没有需要flush的请求,那么就阻塞等待请求
  否则已非阻塞的方式从请求队列去请求,一旦没有请求就flush
  

  可见zookeeper选择在空闲的时候flush,这是flush的时机之一
  flush的具体功能【详见SyncRequestProcessor.flush】
  

  toFlush 见3.0)节

  

2.2)线程是否应该结束
if (si == requestOfDeath) {
break;
}
  如果请求是特殊的请求Request.requestOfDeath,那么线程结束,典型的队列哨兵模式
  

2.3)写log
  【详见FileTxnLog.append】
  将修改类的请求写入log,类似mysql的binlog
  

  如果写入到2.4) 否则到2.5

  

2.4)是否应该roll log
  如果是事务型的请求,那么zks.getZKDatabase().append(si)会返回true
  是否是事务型的请求?
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn(&quot;Too busy to snap, skipping&quot;);
} else {
snapInProcess = new Thread(&quot;Snapshot Thread&quot;) {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn(&quot;Unexpected exception&quot;, e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
}
  每写入一条日志,logCount++,  加到一定数量,开始roll log,
  如果此时并没有在产生快照,为了不阻塞线程,会起一个临时线程产生快照,然后将logCount清0
  

  每次产生快照都会以snap.${Long.toHexString(dataTree.lastProcessedZxid)}来命名
  

2.5) 是否应该直接响应
else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
continue;
}
  查看此时toFlush(响应队列)是否为空,如果为空,说明近一段时间读多写少,直接就响应了

  

2.6)最后的处理
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
}
  如果没有直接响应,那么就将请求加入toFlush(响应队列),如果此时toFlush队列长度超过1000,就flush了
  

  到此为止,toFlush(响应队列)看上去逻辑混乱,下面会专门讲,另外flush的用途也会接下来讲
  

  

3)FAQ
  

3.0)toFlush到底是什么?
  

  toFlush队列用于存储请求,可能是读也可能是写

  zookeeper专门使用了一个线程SyncRequestProcessor来处理请求,所以这个线程必须合理的工作,否则就会对整体的性能造成伤害

  如果都是读请求就没必要toFlush了,但如果是写请求,就必须把请求写入log,这个写入未必能保证真的同步到磁盘,但如果每次写请求都同步,
  性能会有问题,所以从程序的设计应该能看到作者应该是处于这个考虑选择了两个时机来做这件事情

  •   如果没有请求的时候(即较空闲的时候)
  •   toFlush队列到了一定数量(1000),就会批量同步
  可以看到的一些问题

  •   由于要选择合适的时机flush,客户端的响应会受到影响,为什么不考虑分离磁盘同步和响应客户端?为了更严谨?
  •   如果写多读少,写会干扰读,因为所有的写都会加入到toFlush队列,而如果toFlush队列不为空,读也会放进去,正如上面提到的,toFlush并不会立即响应

  

3.1 )flush干了什么?怎么实现的?
private void flush(LinkedList<Request> toFlush) throws IOException {
if (toFlush.isEmpty())
return;
zks.getZKDatabase().commit();
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
nextProcessor.processRequest(i);
}
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}

  •   同步磁盘zks.getZKDatabase().commit()
  •   调用nextProcessor的方法响应网络请求,
       通常情况下nextProcessorFinalRequestProcessor且与Flushable没有关系
       所以只需关注FinalRequestProcessor.processRequest方法,下面会提到
  

3.2)zks.getZKDatabase().commit() 干了什么?
  zks.getZKDatabase().commit()实际调用了FileTxnLog.commit,代码如下

public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) {
log.flush();
if (forceSync) {
long startSyncNS = System.nanoTime();
log.getChannel().force(false);
long syncElapsedMS =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
LOG.warn(&quot;fsync-ing the write ahead log in &quot;
+ Thread.currentThread().getName()
+ &quot; took &quot; + syncElapsedMS
+ &quot;ms which will adversely effect operation latency. &quot;
+ &quot;See the ZooKeeper troubleshooting guide&quot;);
}
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.removeFirst().close();
}
}
  


  •   将当前流logStream flush
  •   将之前append的流streamsToFlush逐个flush;forceSync默认为true,因此会调用log.getChannel().force同步到磁盘
  •   完成之后,会将streamsToFlush中的流删除
  上面的flush和getChannel().force的差别?
  一个是应用级一个是磁盘级,当调用flush会将应用数据缓冲区中的全部提交给磁盘驱动去调度,但此时也未必全部同步到磁盘
  磁盘写一般是异步的,所以后者会保证全部同步到磁盘,类似操作系统的API fsync
  

3)FinalRequestProcessor 如何实现的?
  

3.3.1)处理outstandingChanges
synchronized (zks.outstandingChanges) {
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid <= request.zxid) {
ChangeRecord cr = zks.outstandingChanges.remove(0);
if (cr.zxid < request.zxid) {
LOG.warn(&quot;Zxid outstanding &quot;
+ cr.zxid
+ &quot; is less than current &quot; + request.zxid);
}
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
rc = zks.processTxn(hdr, txn);
}
// do not add non quorum packets to the queue.
if (Request.isQuorum(request.type)) {
zks.getZKDatabase().addCommittedProposal(request);
}
}

  •   outstandingChanges是一个事务型请求的冗余队列,一旦处理完相关的事物请求,需要将outstandingChanges的相关item删除
  •   处理事务型请求,主要是涉及datatree的写操作
  •   最后会将请求放入ZKDatabase的committedLog,便于集群中的其他机器快速同步
3.3.2)构造响应

  接下来开始一长段的switch/case根据请求类型进行处理,基本都会构造响应
  以一个事务型请求为例
case OpCode.setData: {
lastOp = &quot;SETD&quot;;
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
  再看看一个非事务型请求的例子
case OpCode.exists: {
lastOp = &quot;EXIS&quot;;
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
existsRequest);
String path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest
.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
break;
}
  由于在3.1中已完成对事务型请求的处理,所以本阶段中事务型请求无需再处理
  如果是非事务型需要处理,通常就是操作一下datatree获取数据
  

3.3.3)网络响应
ReplyHeader hdr =
new ReplyHeader(request.cxid, request.zxid, err.intValue());
zks.serverStats().updateLatency(request.createTime);
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, System.currentTimeMillis());
try {
cnxn.sendResponse(hdr, rsp, &quot;response&quot;);
if (closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error(&quot;FIXMSG&quot;,e);
}
  构造一个ReplyHeader,并且响应见cnxn.sendResponse
  

3.4)snapshot产生时机及命名规则?

  

3.4.1)3个时机

  •   加载的时候,用于merge快照和事务日志
  •   正如上面提到的SyncRequestProcessor线程当事务日志多于一定数量且较空闲
  •   follower线程接受到leader线程相关指令
3.4.2)命名规则
public static String makeSnapshotName(long zxid) {
return &quot;snapshot.&quot; + Long.toHexString(zxid);
}
  

4)小结

  •   服务器包含两个重要日志:快照以及事务日志

  •   服务器使用单线程来处理所有请求(和存储相关)
  •   服务器选择了合适的时机产生快照以及roll事务日志,避免阻塞
  •   和存储(写磁盘)相关的方法都是同步的(synchronized),虽然一个线程在操作
  •   如果服务器崩溃了,日志还没有flush掉,数据会丢失(至少单机的时候是这样)
  •   所有的读请求直接从内存走和磁盘无关,但响应速度会受一些影响见上面的3.0)
  •   接下来的几个问题还需跟踪

    •   为什么会同时加载多个snapshot? 1个最近的snapshot不就够了?
    •   sessiontrack的机制?ZKDatabase中的sessionsWithTimeouts与sessiontrack的关系
    •   集群环境下又会有什么变化?


  

  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-141874-1-1.html 上篇帖子: hadoop2.6伪分布+pig0.15+zookeeper3.4.6安装 下篇帖子: hadoop+hbase+zookeeper环境搭建过程总结
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表