设为首页 收藏本站
查看: 790|回复: 0

[经验分享] [Zookeeper学习笔记十]Zookeeper源代码分析之ClientCnxn数据序列化和反序列化

[复制链接]

尚未签到

发表于 2017-4-19 12:04:42 | 显示全部楼层 |阅读模式
  ClientCnxn是Zookeeper客户端和Zookeeper服务器端进行通信和事件通知处理的主要类,它内部包含两个类,1. SendThread 2. EventThread, SendThread负责客户端和服务器端的数据通信,也包括事件信息的传输,EventThread主要在客户端回调注册的Watchers进行通知处理
   
  ClientCnxn构造方法
   
   

    /**
* Creates a connection object. The actual network connect doesn't get
* established until needed. The start() instance method must be called
* subsequent to construction.
*
* @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
* @param hostProvider
*                the list of ZooKeeper servers to connect to
* @param sessionTimeout
*                the timeout for connections.
* @param zooKeeper
*                the zookeeper object that this connection is related to.
* @param watcher watcher for this connection
* @param clientCnxnSocket
*                the socket implementation used (e.g. NIO/Netty)
* @param sessionId session id if re-establishing session
* @param sessionPasswd session passwd if re-establishing session
* @param canBeReadOnly
*                whether the connection is allowed to go to read-only
*                mode in case of partitioning
* @throws IOException
*/
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
//如果zookeeper集群有1000台,那么会话超时时间岂不是要设置的很大?因此,zookeeper一般不会很大,3台或者5台足亦
connectTimeout = sessionTimeout / hostProvider.size();//链接超时时间是会话超时时间除以Zookeeper集群数
//读超时是会话超时的2/3
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
//读线程,在ClientCnxnd的start方法中启动
sendThread = new SendThread(clientCnxnSocket);
//会话线程,在ClientCnxnd的start方法中启动
eventThread = new EventThread();
}

    对于SendThread数据传输线程包含两方面的内容,1是基于TCP/IP的Socket的数据传输,2.传输的数据内容。首先关注传输的数据内容,在TCP/IP传输的数据都是字节,因此,在SendThread发送数据之前,需要将要传输的数据结构进行序列化成字节流,服务器端会反序列化成相应的数据结构。当客户端收到服务器返回的字节流时,客户端将其反序列化为相应的数据结构。
   
  首先看看数据的序列化和反序列化,接口定义:

package org.apache.jute //该借口不是Zookeeper原生提供的,是Apache的jute提供的
import java.io.IOException;
/**
* Interface that is implemented by generated classes.
*
*/
public interface Record {
public void serialize(OutputArchive archive, String tag) //序列化,tag在XmlInputArchive序列化器中,用作xml元素标签
throws IOException;
public void deserialize(InputArchive archive, String tag)//反序列话
throws IOException;
}

   OutputArchive接口是数据结构序列化为字节流的字节流写入器,InputArchive接口是字节流反序列化为数据结构的字节流读取器。OutputArchive和InputArchive接口有三个成对使用的实现类
  BinaryOutputArchive和BinaryInputArchive底层使用DataOutput和DataInput作为字节容器
   
  XmlOutputArchive和XmlInputArchive底层使用PrintStream,XmlInputArchive使用Xml解析的方式得到相应的数据结构
  CsvOutputArchive和CsvInputArchive底层使用PrintStream作为自己容器
   
  Zookeeper客户端向服务器端发送请求,包含请求头和请求正文两部分,每个请求的请求头的类型都是一样的,而请求正文根据请求的不同,分为多种类型。
  Zookeeper服务器端向客户端返回响应数据,包含响应头和响应正文两部分,每个响应的响应头的类型都是一样的,而响应正文根据请求的不同,分为多种类型。
   
  请求头,各种请求正文,响应头和响应正文因为要在Socket上进行数据传输,所以它们应该都是可序列化和反序列话的,因此它们都是可序列化的
   
   
  请求头:

public class RequestHeader implements Record {
private int xid; //请求的事务id,具体的含义和功能接下来分析
private int type; //请求类型?
public RequestHeader() {
}
public RequestHeader(
int xid,
int type) {
this.xid=xid;
this.type=type;
}
public int getXid() {
return xid;
}
public void setXid(int m_) {
xid=m_;
}
public int getType() {
return type;
}
public void setType(int m_) {
type=m_;
}
//序列化操作,将xid和type序列化到OutputArchive中,
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag); //对于最常使用的BinaryOutputArchive,此方法空实现
a_.writeInt(xid,"xid");
a_.writeInt(type,"type");
a_.endRecord(this,tag);//对于最常使用的BinaryOutputArchive,此方法空实现
}
//序列化操作,将xid和type反序列化
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);//对于最常使用的BinaryInputArchive,此方法空实现
xid=a_.readInt("xid");
type=a_.readInt("type");
a_.endRecord(tag);//对于最常使用的BinaryInputArchive,此方法空实现
}
public String toString() {
try {
java.io.ByteArrayOutputStream s =
new java.io.ByteArrayOutputStream();
CsvOutputArchive a_ =
new CsvOutputArchive(s);
a_.startRecord(this,"");//对于CsvOutputArchive,startRecord方法
a_.writeInt(xid,"xid");
a_.writeInt(type,"type");
a_.endRecord(this,"");
return new String(s.toByteArray(), "UTF-8");
} catch (Throwable ex) {
ex.printStackTrace();
}
return "ERROR";
}
public void write(java.io.DataOutput out) throws java.io.IOException {
BinaryOutputArchive archive = new BinaryOutputArchive(out);
serialize(archive, "");
}
public void readFields(java.io.DataInput in) throws java.io.IOException {
BinaryInputArchive archive = new BinaryInputArchive(in);
deserialize(archive, "");
}
public int compareTo (Object peer_) throws ClassCastException {
if (!(peer_ instanceof RequestHeader)) {
throw new ClassCastException("Comparing different types of records.");
}
RequestHeader peer = (RequestHeader) peer_;
int ret = 0;
ret = (xid == peer.xid)? 0 :((xid<peer.xid)?-1:1);
if (ret != 0) return ret;
ret = (type == peer.type)? 0 :((type<peer.type)?-1:1);
if (ret != 0) return ret;
return ret;
}
public boolean equals(Object peer_) {
if (!(peer_ instanceof RequestHeader)) {
return false;
}
if (peer_ == this) {
return true;
}
RequestHeader peer = (RequestHeader) peer_;
boolean ret = false;
ret = (xid==peer.xid);
if (!ret) return ret;
ret = (type==peer.type);
if (!ret) return ret;
return ret;
}
public int hashCode() {
int result = 17;
int ret;
ret = (int)xid;
result = 37*result + ret;
ret = (int)type;
result = 37*result + ret;
return result;
}
public static String signature() {
return "LRequestHeader(ii)";
}
}

   
  请求正文有很多,比如


  • 链接请求ConnectRequest
  • 创建znode请求CreateRequest
  • 节点是否存在请求ExistsRequest
  • 删除znode请求DeleteRequest
  • 获取child znodes请求GetChildrenRequest
  • 设置znode数据SetDataRequest
  • 事件WatcherEvent
  以CreateRequest为例进行分析
   

public class CreateRequest implements Record {
private String path; //创建znode节点的path
private byte[] data; //创建znode节点时的节点数据
private java.util.List<org.apache.zookeeper.data.ACL> acl; //创建znode节点时的ACL
private int flags;//这个参数干啥的?
public CreateRequest() {
}
public CreateRequest(
String path,
byte[] data,
java.util.List<org.apache.zookeeper.data.ACL> acl,
int flags) {
this.path=path;
this.data=data;
this.acl=acl;
this.flags=flags;
}
public String getPath() {
return path;
}
public void setPath(String m_) {
path=m_;
}
public byte[] getData() {
return data;
}
public void setData(byte[] m_) {
data=m_;
}
public java.util.List<org.apache.zookeeper.data.ACL> getAcl() {
return acl;
}
public void setAcl(java.util.List<org.apache.zookeeper.data.ACL> m_) {
acl=m_;
}
public int getFlags() {
return flags;
}
public void setFlags(int m_) {
flags=m_;
}
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeString(path,"path");//写入path
a_.writeBuffer(data,"data");//写入data字节数组
{
a_.startVector(acl,"acl");//写入acl,acl是List类型
if (acl!= null) {        
int len1 = acl.size();
for(int vidx1 = 0; vidx1<len1; vidx1++) {
org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL) acl.get(vidx1);
a_.writeRecord(e1,"e1");//ACL也是一个Record
}
}
a_.endVector(acl,"acl");
}
a_.writeInt(flags,"flags");//写入flags
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
path=a_.readString("path");
data=a_.readBuffer("data");
{
Index vidx1 = a_.startVector("acl");
if (vidx1!= null) {          acl=new java.util.ArrayList<org.apache.zookeeper.data.ACL>();
for (; !vidx1.done(); vidx1.incr()) {
org.apache.zookeeper.data.ACL e1;
e1= new org.apache.zookeeper.data.ACL();
a_.readRecord(e1,"e1");
acl.add(e1);
}
}
a_.endVector("acl");
}
flags=a_.readInt("flags");
a_.endRecord(tag);
}
public String toString() {
try {
java.io.ByteArrayOutputStream s =
new java.io.ByteArrayOutputStream();
CsvOutputArchive a_ =
new CsvOutputArchive(s);
a_.startRecord(this,"");
a_.writeString(path,"path");
a_.writeBuffer(data,"data");
{
a_.startVector(acl,"acl");
if (acl!= null) {          int len1 = acl.size();
for(int vidx1 = 0; vidx1<len1; vidx1++) {
org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL) acl.get(vidx1);
a_.writeRecord(e1,"e1");
}
}
a_.endVector(acl,"acl");
}
a_.writeInt(flags,"flags");
a_.endRecord(this,"");
return new String(s.toByteArray(), "UTF-8");
} catch (Throwable ex) {
ex.printStackTrace();
}
return "ERROR";
}
public void write(java.io.DataOutput out) throws java.io.IOException {
BinaryOutputArchive archive = new BinaryOutputArchive(out);
serialize(archive, "");
}
public void readFields(java.io.DataInput in) throws java.io.IOException {
BinaryInputArchive archive = new BinaryInputArchive(in);
deserialize(archive, "");
}
public int compareTo (Object peer_) throws ClassCastException {
throw new UnsupportedOperationException("comparing CreateRequest is unimplemented");
}
public boolean equals(Object peer_) {
if (!(peer_ instanceof CreateRequest)) {
return false;
}
if (peer_ == this) {
return true;
}
CreateRequest peer = (CreateRequest) peer_;
boolean ret = false;
ret = path.equals(peer.path);
if (!ret) return ret;
ret = org.apache.jute.Utils.bufEquals(data,peer.data);
if (!ret) return ret;
ret = acl.equals(peer.acl);
if (!ret) return ret;
ret = (flags==peer.flags);
if (!ret) return ret;
return ret;
}
public int hashCode() {
int result = 17;
int ret;
ret = path.hashCode();
result = 37*result + ret;
ret = java.util.Arrays.toString(data).hashCode();
result = 37*result + ret;
ret = acl.hashCode();
result = 37*result + ret;
ret = (int)flags;
result = 37*result + ret;
return result;
}
public static String signature() {
return "LCreateRequest(sB[LACL(iLId(ss))]i)";
}
}

    
   ConnectRequest的请求数据:
    private int protocolVersion;
  private long lastZxidSeen; //客户端保存的Zxid最近时间,zxid有什么用呢?
  private int timeOut;//会话超时时间
  private long sessionId;
  private byte[] passwd;

  ClientCnxn的内部类Packet类封装了请求头,响应头,请求正文和响应征正文

   static class Packet {
RequestHeader requestHeader;//请求头
ReplyHeader replyHeader; //响应头
Record request;//请求正文
Record response; //响应正文
ByteBuffer bb;//上面四部分序列化的字节流
/** Client's view of the path (may differ due to chroot) **/
String clientPath;
/** Servers's view of the path (may differ due to chroot) **/
String serverPath;
boolean finished;
AsyncCallback cb;//异步请求的响应Callback
Object ctx;
WatchRegistration watchRegistration;
public boolean readOnly;
/** Convenience ctor */
Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
Record request, Record response,
WatchRegistration watchRegistration) {
this(requestHeader, replyHeader, request, response,
watchRegistration, false);
}
Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
Record request, Record response,
WatchRegistration watchRegistration, boolean readOnly) {
this.requestHeader = requestHeader;
this.replyHeader = replyHeader;
this.request = request;
this.response = response;
this.readOnly = readOnly;
this.watchRegistration = watchRegistration;
}
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);//序列化字节流容器
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());//将字节流容器中的字节流复制给bb
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("clientPath:" + clientPath);
sb.append(" serverPath:" + serverPath);
sb.append(" finished:" + finished);
sb.append(" header:: " + requestHeader);
sb.append(" replyHeader:: " + replyHeader);
sb.append(" request:: " + request);
sb.append(" response:: " + response);
// jute toString is horrible, remove unnecessary newlines
return sb.toString().replaceAll("\r*\n+", " ");
}
}
  ClientCnxn类包含两个队列(LinkedList),队列中的元素都是Packet类型,pengdingQueue表示请求已经发送,等待响应结果;outgoingQueue表示等待发送请求的请求序列

/**
* These are the packets that have been sent and are waiting for a response.
*/
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
/**
* These are the packets that need to be sent.
*/
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
  ClientCnxn的Socket的数据传输,将另外一篇进行单独分析
   
   
   
   
   
   
   
   
   
   
   
   
   
   

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366391-1-1.html 上篇帖子: SolrCloud之分布式索引及与Zookeeper的集成 下篇帖子: 【分布式数据一致性二】Zookeeper数据读写一致性
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表