设为首页 收藏本站
查看: 1207|回复: 0

[经验分享] ZooKeeper源码分析(二)

[复制链接]

尚未签到

发表于 2017-4-19 09:14:22 | 显示全部楼层 |阅读模式
  上一节分析了ZooKeeper的部分代码,下面我们看看客户端网络连接器的部分代码

/**
这个类管理客户端的socket I/O。ClientCnxn维护一个可用服务器列表可以根据需要透明地切换服务器
*
*/
public class ClientCnxn {
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class);
private static final String ZK_SASL_CLIENT_USERNAME =
"zookeeper.sasl.client.username";
/** 客户端在会话重连接时自动复位监视器,这个操作允许客户端通过设置环境变量zookeeper.disableAutoWatchReset=true来关闭这个行为
*/
private static boolean disableAutoWatchReset;
static {
disableAutoWatchReset =
Boolean.getBoolean("zookeeper.disableAutoWatchReset");
if (LOG.isDebugEnabled()) {
LOG.debug("zookeeper.disableAutoWatchReset is "
+ disableAutoWatchReset);
}
}
static class AuthData {
AuthData(String scheme, byte data[]) {
this.scheme = scheme;
this.data = data;
}
String scheme;
byte data[];
}
private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();
/**
*哪些已经发送出去的目前正在等待响应的包
*/
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
/**
* 那些需要发送的包
*/
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
// 超时时间
private int connectTimeout;
/**
*客户端与服务器协商的超时时间,以毫秒为单位。这是真正的超时时间,而不是客户端的超时请求
*/
private volatile int negotiatedSessionTimeout;
// 读取超时时间
private int readTimeout;
// 会话超时时间
private final int sessionTimeout;
// ZooKeeper
private final ZooKeeper zooKeeper;
//客户端监视器管理器
private final ClientWatchManager watcher;
//会话ID
private long sessionId;
//会话密钥
private byte sessionPasswd[] = new byte[16];
// 是否只读
private boolean readOnly;
final String chrootPath;
// 发送线程
final SendThread sendThread;
// 事件回调线程
final EventThread eventThread;
/**
* Set to true when close is called. Latches the connection such that we
* don't attempt to re-connect to the server if in the middle of closing the
* connection (client sends session disconnect to server as part of close
* operation)
*/
private volatile boolean closing = false;
/**
一组客户端可以连接的Zk主机
*/
private final HostProvider hostProvider;
/**
* 第一次和读写服务器建立连接时设置为true,之后不再改变。
这个值用来处理客户端没有sessionId连接只读模式服务器的场景.
客户端从只读服务器收到一个假的sessionId,这个sessionId对于其他服务器是无效的。所以
当客户端寻找一个读写服务器时,它在连接握手时发送0代替假的sessionId,建立一个新的,有效的会话
如果这个属性是false(这就意味着之前没有找到过读写服务器)则表示非0的sessionId是假的否则就是有效的
*/
volatile boolean seenRwServerBefore = false;

public ZooKeeperSaslClient zooKeeperSaslClient;
public long getSessionId() {
return sessionId;
}
public byte[] getSessionPasswd() {
return sessionPasswd;
}
public int getSessionTimeout() {
return negotiatedSessionTimeout;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress();
SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress();
sb
.append("sessionid:0x").append(Long.toHexString(getSessionId()))
.append(" local:").append(local)
.append(" remoteserver:").append(remote)
.append(" lastZxid:").append(lastZxid)
.append(" xid:").append(xid)
.append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount())
.append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount())
.append(" queuedpkts:").append(outgoingQueue.size())
.append(" pendingresp:").append(pendingQueue.size())
.append(" queuedevents:").append(eventThread.waitingEvents.size());
return sb.toString();
}

/**
* 创建一个连接对象。真正的网路连接直到需要的时候才建立。start()方法在执行构造方法后一定要调用
* 这个构造方法在ZooKeeper的初始化时调用,用于初始化一个客户端网路管理器
*/
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
throws IOException {
this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
clientCnxnSocket, 0, new byte[16], canBeReadOnly);
}

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
//客户端实例
this.zooKeeper = zooKeeper;
// 客户端Watcher管理器
this.watcher = watcher;
//sessionId
this.sessionId = sessionId;
//会话密钥
this.sessionPasswd = sessionPasswd;
//会话超时时间
this.sessionTimeout = sessionTimeout;
//服务器地址列表管理器
this.hostProvider = hostProvider;
//根路径
this.chrootPath = chrootPath;
// 连接超时时间是会话超时时间和服务器数量的比值
connectTimeout = sessionTimeout / hostProvider.size();
// 读超时时间是会话超时时间的2/3
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
//创建发送和事件处理线程
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
}

public static boolean getDisableAutoResetWatch() {
return disableAutoWatchReset;
}
public static void setDisableAutoResetWatch(boolean b) {
disableAutoWatchReset = b;
}
//启动发送和事件处理线程
public void start() {
sendThread.start();
eventThread.start();
}


// 事件处理线程
class EventThread extends Thread {
//等待处理的事件
private final LinkedBlockingQueue<Object> waitingEvents =
new LinkedBlockingQueue<Object>();
/**
*这个是真正的排队会话的状态,知道事件处理线程真正处理事件并将其返回给监视器。
**/
private volatile KeeperState sessionState = KeeperState.Disconnected;
private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
EventThread() {
// 构造一个线程名
super(makeThreadName("-EventThread"));
setUncaughtExceptionHandler(uncaughtExceptionHandler);
// 设置为守护线程
setDaemon(true);
}
//
public void queueEvent(WatchedEvent event) {
// 如果WatchedEvent的类型是None状态是sessionStat的值则不处理
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
// 获取事件的状态
sessionState = event.getState();
// 构建一个基于事件的监视器
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// 排队pair,稍后处理
waitingEvents.add(pair);
}
// 排队Packet
public void queuePacket(Packet packet) {
if (wasKilled) {
synchronized (waitingEvents) {
if (isRunning) waitingEvents.add(packet);
else processEvent(packet);
}
} else {
waitingEvents.add(packet);
}
}
public void queueEventOfDeath() {
waitingEvents.add(eventOfDeath);
}
@Override
public void run() {
try {
isRunning = true;
while (true) {
//从等待处理的事件队列中获取事件
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down");
}
// 真正处理事件的入口,主要是回调处理
private void processEvent(Object event) {
try {
// 如果事件是WatcherSetEventPair
if (event instanceof WatcherSetEventPair) {
//每个监视器都会处理这个事件
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else {
Packet p = (Packet) event;
int rc = 0;
// 获取客户端路径
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
// 获取回调对象
StatCallback cb = (StatCallback) p.cb;
// 如果处理成功回调方法会传入响应状态,否则响应状态为null
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
cb.processResult(rc, clientPath, p.ctx,
((ExistsResponse) p.response)
.getStat());
} else if (p.response instanceof SetDataResponse) {
cb.processResult(rc, clientPath, p.ctx,
((SetDataResponse) p.response)
.getStat());
} else if (p.response instanceof SetACLResponse) {
cb.processResult(rc, clientPath, p.ctx,
((SetACLResponse) p.response)
.getStat());
}
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetDataResponse) {
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getData(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null,
null);
}
} else if (p.response instanceof GetACLResponse) {
ACLCallback cb = (ACLCallback) p.cb;
GetACLResponse rsp = (GetACLResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getAcl(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null,
null);
}
} else if (p.response instanceof GetChildrenResponse) {
ChildrenCallback cb = (ChildrenCallback) p.cb;
GetChildrenResponse rsp = (GetChildrenResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getChildren());
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetChildren2Response) {
Children2Callback cb = (Children2Callback) p.cb;
GetChildren2Response rsp = (GetChildren2Response) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getChildren(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof CreateResponse) {
StringCallback cb = (StringCallback) p.cb;
CreateResponse rsp = (CreateResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath()
.substring(chrootPath.length())));
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cb = (VoidCallback) p.cb;
cb.processResult(rc, clientPath, p.ctx);
}
}
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366165-1-1.html 上篇帖子: 监听zookeeper事件变化 下篇帖子: Zookeeper客户端库ZkClient
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表