g87616758 发表于 2015-9-6 09:42:56

ZooKeeper源码阅读(二):客户端

  源代码:

http://svn.apache.org/repos/asf/zookeeper/trunk/
  
  导入eclipse:
  在包含build.xml目录下执行ant eclipse将产生.classpath文件
  
  目录结构:
  src/recipes:提供了各种Zookeeper应用例子
  src/c:提供了c版客户端。zookeeper_st,zookeeper_mt两个library
  src/contrib:别人贡献的代码?
  src/generated:由jute生成的java实体类
  
  客户端入口:org.apache.zookeeper.ZooKeeperMain
  
  
  //读取命令行输入,用MyCommandOptions解析。
  //内部类MyCommandOptions包含成员命令名command、参数列表cmdArgs
  -option value –option value command cmdArgs
  
  //根据以上解析的ip、端口,连接到ZooKeeper
      zk = newZooKeeper(host,
                Integer.parseInt(cl.getOption("timeout")),
               newMyWatcher(), readOnly);
  
  //执行命令,在ZooKeeperMain.run()
  //ZooKeeperMain只是一个外壳,使用jline实现了命令提示功能。
  //commandMapCli将提供的命令命令名与执行体CliCommand关联
      //execute from commandMap
      CliCommandcliCmd = commandMapCli.get(cmd);
      if(cliCmd!=null) {
         cliCmd.setZk(zk);
            watch =cliCmd.parse(args).exec();
                   }
  
  //最终转到调用ZooKeeper方法
  //提供的命令:
  quit:Zk.close()关闭zk连接,调用cnxn.close()
  history:列出历史记录
  redo index:重新执行历史记录
  printwatches :查看/设置watche开关状态
  connect:connectToZK(host)连接zk
  

  //ZooKeeper内部连接
      cnxn = newClientCnxn(connectStringParser.getChrootPath(),
                hostProvider,sessionTimeout,this,watchManager,
                getClientCnxnSocket(),canBeReadOnly);
      cnxn.start();
  
  
  ClientCnxn包含SendThread和EventThread两个线程
  SendThread将事件添加到waitEvents队列中,EventThread线程消费该队列。
  
  //下面以ls命令为例
  
  //调用zk.getChildren
    publicboolean exec() throwsKeeperException, InterruptedException {
      String path= args;
      boolean watch =cl.hasOption("w");
       List<String> children = zk.getChildren(path, watch);
      out.println(children);
      return watch;
    }
  
  
  //getChildren生成request
       RequestHeader h = newRequestHeader();
       h.setType(ZooDefs.OpCode.getChildren);
       GetChildrenRequest request = newGetChildrenRequest();
       request.setPath(serverPath);
       request.setWatch(watcher != null);
       GetChildrenResponse response = newGetChildrenResponse();
      ReplyHeader r =cnxn.submitRequest(h, request,response, wcb);
  
  
  //submitRequest调用queuePacket
    publicReplyHeadersubmitRequest(RequestHeaderh, Record request,
            Recordresponse, WatchRegistration watchRegistration)
            throwsInterruptedException {
      ReplyHeaderr = new ReplyHeader();
      Packetpacket = queuePacket(h,r, request, response,null,null,null,
                   null, watchRegistration);
      synchronized(packet) {
            while (!packet.finished) {
               packet.wait();
            }
      }
      return r;
  }
  
  
  //queuePacket将Packet添加到outgoingQueue队列中
            packet= new Packet(h, r, request, response,watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath =clientPath;
            packet.serverPath =serverPath;

                outgoingQueue.add(packet);

         //然后唤醒selector
      sendThread.getClientCnxnSocket().wakeupCnxn();
  
  

  
  //sendThread.run消费outgoingQueue
      clientCnxnSocket.doTransport(to,pendingQueue,outgoingQueue,ClientCnxn.this);
  
  
  //selector判断读/写事件
  //doTransport调用doIO,doIO解析Response
  //读事件
      int rc =sock.read(incomingBuffer);
      sendThread.readResponse(incomingBuffer);
  //写事件
       sock.write(p.bb);
  
  

  
  //readResponse在finally块中调用finishPacket,finishPacket将设置packet.finish,
  //此时submitRequest返回response。
    try {
      packet.replyHeader.setXid(replyHdr.getXid());
      packet.replyHeader.setErr(replyHdr.getErr());
      packet.replyHeader.setZxid(replyHdr.getZxid());
      if(replyHdr.getZxid() > 0) {
            lastZxid =replyHdr.getZxid();
      }
      if(packet.response !=null&& replyHdr.getErr() == 0) {
            packet.response.deserialize(bbia,"response");
      }
    } finally {
      finishPacket(packet);
  }
  

  以下图片转自:http://www.spnguru.com/2010/08/zookeeper%E5%85%A8%E8%A7%A3%E6%9E%90%E2%80%94%E2%80%94client%E7%AB%AF/



  
页: [1]
查看完整版本: ZooKeeper源码阅读(二):客户端