
[Experience Sharing] ZooKeeper Demystified, Part 7: Distributed CREATE Transaction Processing

  The previous articles covered how the follower and the leader perform election and initialization. This article uses the CREATE request described earlier as an example to walk through how a transaction is processed in a cluster environment.
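  For orientation, the client-side call that produces such a CREATE request looks roughly like the sketch below (the connection string and node path are placeholder values, not taken from the original article):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateExample {
    public static void main(String[] args) throws Exception {
        // Connect to any server of the ensemble; in this walkthrough it happens to be a follower.
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        // This call is what arrives at the follower as a CREATE request.
        String path = zk.create("/demo", "hello".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("created " + path);
        zk.close();
    }
}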
  The interaction between the client and the ZooKeeper server was covered in earlier articles and will not be repeated here. Assume the client has established a connection to some follower and has sent a CREATE request. On the follower side, the IO thread picks up the request and runs it through the processor chain. The follower's processor chain is shown below.
[Figure: follower processor chain — FollowerRequestProcessor → CommitProcessor → FinalRequestProcessor, with SyncRequestProcessor → SendAckRequestProcessor on the side]

  The initialization code:

    protected void setupRequestProcessors() {
        // Main chain: FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true);
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        // Side chain used for PROPOSAL packets: SyncRequestProcessor -> SendAckRequestProcessor
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner) getFollower()));
        syncProcessor.start();
    }
  The first processor is FollowerRequestProcessor, which works as follows:

        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                        'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }
            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            // Hand the request to CommitProcessor first; once the proposal is accepted,
            // CommitProcessor.commit() will finally commit the transaction.
            nextProcessor.processRequest(request);

            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this follower has pending, so we
            // add it to pendingSyncs.
            // Only transactional requests are forwarded to the leader for voting.
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getFollower().request(request);
                break;
            case OpCode.create:
            case OpCode.delete:
            case OpCode.setData:
            case OpCode.setACL:
            case OpCode.createSession:
            case OpCode.closeSession:
            case OpCode.multi:
                zks.getFollower().request(request);
                break;
            }
        }
  Forwarding the transactional request to the leader:

    void request(Request request) throws IOException {
        // Serialize the request into a byte array
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream oa = new DataOutputStream(baos);
        oa.writeLong(request.sessionId);
        oa.writeInt(request.cxid);
        oa.writeInt(request.type);
        if (request.request != null) {
            request.request.rewind();
            int len = request.request.remaining();
            byte b[] = new byte[len];
            request.request.get(b);
            request.request.rewind();
            oa.write(b);
        }
        oa.close();
        // Wrap it in a REQUEST quorum packet and send it to the leader
        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
                .toByteArray(), request.authInfo);
        writePacket(qp, true);
    }
  The CommitProcessor mainly buffers the request and waits for it to be committed:

        while (!finished) {
            int len = toProcess.size();
            // Final processing of committed requests is handed to FinalRequestProcessor
            for (int i = 0; i < len; i++) {
                nextProcessor.processRequest(toProcess.get(i));
            }
            toProcess.clear();
            synchronized (this) {
                // If there is nothing to commit yet, wait until commit() wakes us up
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() == 0) {
                    wait();
                    continue;
                }
                // First check and see if the commit came in for the pending
                // request
                // A committed request has arrived: add it to toProcess for the next iteration
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() > 0) {
                    Request r = committedRequests.remove();
                    /*
                     * We match with nextPending so that we can move to the
                     * next request when it is committed. We also want to
                     * use nextPending because it has the cnxn member set
                     * properly.
                     */
                    // If the commit matches our own pending request, reuse the original Request,
                    // because it carries the client connection and a response can be written back
                    if (nextPending != null
                            && nextPending.sessionId == r.sessionId
                            && nextPending.cxid == r.cxid) {
                        // we want to send our version of the request.
                        // the pointer to the connection in the request
                        nextPending.hdr = r.hdr;
                        nextPending.txn = r.txn;
                        nextPending.zxid = r.zxid;
                        toProcess.add(nextPending);
                        nextPending = null;
                    }
                    // If the request originated from another server, use the committed Request;
                    // it carries no connection info, so no response can be sent
                    else {
                        // this request came from someone else so just
                        // send the commit packet
                        toProcess.add(r);
                    }
                }
            }

            // We haven't matched the pending requests, so go back to
            // waiting
            // There is still a pending request that has not been committed, so keep waiting
            if (nextPending != null) {
                continue;
            }

            // Pull the next requests to process from queuedRequests
            synchronized (this) {
                // Process the next requests in the queuedRequests
                while (nextPending == null && queuedRequests.size() > 0) {
                    Request request = queuedRequests.remove();
                    switch (request.type) {
                    case OpCode.create:
                    case OpCode.delete:
                    case OpCode.setData:
                    case OpCode.multi:
                    case OpCode.setACL:
                    case OpCode.createSession:
                    case OpCode.closeSession:
                        nextPending = request;
                        break;
                    case OpCode.sync:
                        if (matchSyncs) {
                            nextPending = request;
                        } else {
                            toProcess.add(request);
                        }
                        break;
                    default:
                        toProcess.add(request);
                    }
                }
            }
        }
  In this scenario the CREATE request first lands in queuedRequests, then nextPending is set to point at it. Since it has not been committed yet, CommitProcessor waits until the proposal is accepted and the request gets committed.
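  For reference, the commit side that wakes this loop up is roughly the following (a minimal sketch based on the 3.4-era CommitProcessor; details may vary between versions):

    // Called on the follower when a COMMIT packet arrives (or on the leader once a
    // quorum is reached): enqueue the committed request and wake up the run() loop above.
    synchronized public void commit(Request request) {
        if (!finished) {
            if (request == null) {
                LOG.warn("Committed a null!", new Exception("committing a null!"));
                return;
            }
            committedRequests.add(request);
            notifyAll();
        }
    }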
  Meanwhile the leader receives the forwarded request, handled in LearnerHandler:

            case Leader.REQUEST:
                // Deserialize the forwarded request
                bb = ByteBuffer.wrap(qp.getData());
                sessionId = bb.getLong();
                cxid = bb.getInt();
                type = bb.getInt();
                bb = bb.slice();
                Request si;
                if (type == OpCode.sync) {
                    si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                } else {
                    si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                }
                si.setOwner(this);
                // Submit it to the leader's processor chain
                leader.zk.submitRequest(si);
                break;
  The leader's processor chain is shown below.
[Figure: leader processor chain — PrepRequestProcessor → ProposalRequestProcessor → CommitProcessor → … → FinalRequestProcessor, with SyncRequestProcessor → AckRequestProcessor handling proposal logging]

  PrepRequestProcessor was analyzed in an earlier article; it assembles a different request body depending on the request type, in this case a CreateRequest.
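  As a quick reminder of what that preparation produces for a CREATE, here is a simplified sketch (the ACL checks, path validation and parent ChangeRecord bookkeeping of the real pRequest()/pRequest2Txn() are omitted; the timestamp call is a stand-in for the server's own clock helper, and the ephemeral flag and parentCVersion are placeholder values):

    // Deserialize the client's CreateRequest from the request buffer
    CreateRequest createRequest = new CreateRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);

    // Attach a transaction header (with a freshly assigned zxid) and a CreateTxn body;
    // these are what get proposed, logged and finally applied to the in-memory database.
    request.hdr = new TxnHeader(request.sessionId, request.cxid,
            zks.getNextZxid(), System.currentTimeMillis(), OpCode.create);
    request.txn = new CreateTxn(createRequest.getPath(), createRequest.getData(),
            createRequest.getAcl(), /* ephemeral */ false, /* parentCVersion */ 0);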
  Next, ProposalRequestProcessor runs; its main job is to issue the proposal (the vote):

    public void processRequest(Request request) throws RequestProcessorException {
        ......
        /* In the following IF-THEN-ELSE block, we process syncs on the leader.
         * If the sync is coming from a follower, then the follower
         * handler adds it to syncHandler. Otherwise, if it is a client of
         * the leader that issued the sync command, then syncHandler won't
         * contain the handler. In this case, we add it to syncHandler, and
         * call processRequest on the next processor.
         */
        if (request instanceof LearnerSyncRequest) {
            zks.getLeader().processSync((LearnerSyncRequest) request);
        } else {
            // Pass to CommitProcessor first; the request has not been committed yet
            nextProcessor.processRequest(request);
            if (request.hdr != null) {
                // We need to sync and get consensus on any transactions
                try {
                    // Issue a proposal (start the vote)
                    zks.getLeader().propose(request);
                } catch (XidRolloverException e) {
                    throw new RequestProcessorException(e.getMessage(), e);
                }
                // Write the transaction log first
                syncProcessor.processRequest(request);
            }
        }
    }
  The leader issues the proposal:

    public Proposal propose(Request request) throws XidRolloverException {
        .......
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        try {
            request.hdr.serialize(boa, "hdr");
            if (request.txn != null) {
                request.txn.serialize(boa, "txn");
            }
            baos.close();
        } catch (IOException e) {
            LOG.warn("This really should be impossible", e);
        }
        // The PROPOSAL packet
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
                baos.toByteArray(), null);
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        synchronized (this) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Proposing:: " + request);
            }
            lastProposed = p.packet.getZxid();
            // Record it in outstandingProposals; when ACKs come in, the leader
            // checks whether this proposal has gathered enough votes
            outstandingProposals.put(lastProposed, p);
            // Send the PROPOSAL packet to every follower so they can vote
            sendPacket(pp);
        }
        return p;
    }
  After sending the proposal, the leader writes the transaction to its own log file through SyncRequestProcessor; once the local write succeeds, the leader's own vote counts.
  SyncRequestProcessor was analyzed in an earlier article; it appends transactions to the log file in order (a rough sketch of that loop follows the AckRequestProcessor code below). The interesting part here is the AckRequestProcessor that comes after it:

    public void processRequest(Request request) {
        QuorumPeer self = leader.self;
        if (self != null)
            // Once the local log write succeeds, the leader counts its own ACK
            leader.processAck(self.getId(), request.zxid, null);
        else
            LOG.error("Null QuorumPeer");
    }
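  For context, the SyncRequestProcessor that feeds this AckRequestProcessor batches transactions to disk before handing them on; a heavily simplified sketch of that idea follows (the real class also handles snapshots, flush thresholds and shutdown; this is not the actual code):

    // Simplified batching loop: append each txn to the on-disk log, then flush a batch
    // and only afterwards pass the requests to the next processor (AckRequestProcessor
    // on the leader, SendAckRequestProcessor on a follower).
    while (running) {
        Request si = queuedRequests.take();
        if (zks.getZKDatabase().append(si)) {
            toFlush.add(si);
        }
        if (queuedRequests.isEmpty() || toFlush.size() > 1000) {
            zks.getZKDatabase().commit();                 // fsync the transaction log
            for (Request flushed : toFlush) {
                nextProcessor.processRequest(flushed);    // ACK only after the log is durable
            }
            toFlush.clear();
        }
    }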
  The leader's processAck method is the crucial one; it was analyzed before, but it is worth revisiting:

    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
        .......
        // When an ACK arrives, look up the outstanding proposal it belongs to
        Proposal p = outstandingProposals.get(zxid);
        if (p == null) {
            LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                    Long.toHexString(zxid), followerAddr);
            return;
        }
        // Record the vote
        p.ackSet.add(sid);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Count for zxid: 0x{} is {}",
                    Long.toHexString(zxid), p.ackSet.size());
        }
        // If the quorum condition is met (by default a majority of servers have agreed),
        // commit the transaction
        if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
            if (zxid != lastCommitted + 1) {
                LOG.warn("Commiting zxid 0x{} from {} not first!",
                        Long.toHexString(zxid), followerAddr);
                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
            }
            outstandingProposals.remove(zxid);
            if (p.request != null) {
                // Add it to the to-be-applied queue first
                toBeApplied.add(p);
            }
            // We don't commit the new leader proposal
            if ((zxid & 0xffffffffL) != 0) {
                if (p.request == null) {
                    LOG.warn("Going to commmit null request for proposal: {}", p);
                }
                // Commit the transaction: tell the followers to commit it
                commit(zxid);
                // Inform the observers
                inform(p);
                // Commit on the leader itself (hands the request to its CommitProcessor)
                zk.commitProcessor.commit(p.request);
                ......
            }
        }
    }
  Telling the followers to commit the transaction:

    public void commit(long zxid) {
        synchronized (this) {
            lastCommitted = zxid;
        }
        // Broadcast the COMMIT packet
        QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
        sendPacket(qp);
    }
  Meanwhile the follower receives the PROPOSAL packet and handles the vote:

            case Leader.PROPOSAL:
                TxnHeader hdr = new TxnHeader();
                Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
                if (hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"
                            + Long.toHexString(hdr.getZxid())
                            + " expected 0x"
                            + Long.toHexString(lastQueued + 1));
                }
                lastQueued = hdr.getZxid();
                // Write the transaction log; once the write succeeds an ACK packet is sent
                fzk.logRequest(hdr, txn);
                break;

    public void logRequest(TxnHeader hdr, Record txn) {
        Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
                hdr.getType(), null, null);
        request.hdr = hdr;
        request.txn = txn;
        request.zxid = hdr.getZxid();
        if ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
        // Again, SyncRequestProcessor writes the transaction to the local log file,
        // after which the ACK packet is sent
        syncProcessor.processRequest(request);
    }
  After the log write succeeds, SendAckRequestProcessor sends the ACK packet:

    public void processRequest(Request si) {
        if (si.type != OpCode.sync) {
            // Build the ACK packet
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
                    null);
            try {
                // Send it to the leader
                learner.writePacket(qp, false);
            } catch (IOException e) {
                LOG.warn("Closing connection to leader, exception during packet send", e);
                try {
                    if (!learner.sock.isClosed()) {
                        learner.sock.close();
                    }
                } catch (IOException e1) {
                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
                    LOG.debug("Ignoring error closing the connection", e1);
                }
            }
        }
    }
  The leader then receives the ACK packet, in the LearnerHandler thread:

            case Leader.ACK:
                if (this.learnerType == LearnerType.OBSERVER) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received ACK from Observer  " + this.sid);
                    }
                }
                leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                break;
  This again calls processAck. Since the leader has already counted its own vote, the follower's ACK adds a second one; in a 3-server ensemble that is already a quorum, so the proposal passes and the leader starts the commit phase by sending a COMMIT packet to the followers.
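  The quorum check itself is just majority counting; for the default majority verifier it boils down to something like this sketch (assuming the QuorumMaj-style verifier; hierarchical quorums work differently, and the exact signature varies across versions):

    // 'half' is the number of voting servers divided by two, so for a 3-server
    // ensemble half == 1 and two ACKs (the leader's own plus one follower's) suffice.
    public boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > half;
    }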
  The follower receives the COMMIT packet:

            case Leader.COMMIT:
                fzk.commit(qp.getZxid());
                break;

    // FollowerZooKeeperServer.commit(zxid):
    public void commit(long zxid) {
        if (pendingTxns.size() == 0) {
            LOG.warn("Committing " + Long.toHexString(zxid)
                    + " without seeing txn");
            return;
        }
        long firstElementZxid = pendingTxns.element().zxid;
        if (firstElementZxid != zxid) {
            LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x"
                    + Long.toHexString(firstElementZxid));
            System.exit(12);
        }
        // Take the request to commit from the pending queue
        Request request = pendingTxns.remove();
        // Commit the request; it will then be handled by FinalRequestProcessor
        commitProcessor.commit(request);
    }
  After the commit, the request is handed to FinalRequestProcessor, which applies the change to the in-memory database. If the request originated from this server, a response is written back to the client; otherwise no response is sent.
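  The reason only the originating server replies is that only its Request still carries the client connection (the cnxn that CommitProcessor preserved via nextPending). A simplified sketch of the tail end of FinalRequestProcessor (error handling and the many per-opcode response branches are omitted; the zero error code is a placeholder):

    // Apply the committed transaction to the in-memory database.
    ProcessTxnResult rc = zks.processTxn(request.hdr, request.txn);

    if (request.cnxn == null) {
        // The request was forwarded from another server (the CommitProcessor branch above
        // enqueued the committed Request without connection info), so nothing to send back.
        return;
    }
    // Local request: build the reply header and the per-opcode response
    // (for CREATE, a CreateResponse carrying the created path) and write it
    // back on the client connection.
    Record rsp = new CreateResponse(rc.path);
    ReplyHeader replyHeader = new ReplyHeader(request.cxid, rc.zxid, 0);
    request.cnxn.sendResponse(replyHeader, rsp, "response");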
