bei 发表于 2015-9-6 12:35:01

zookeeper启动过程-粗略梳理

  zookeeper 3.4.6
=========
启动脚本里面
```
nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &   
```
翻译过来之后太烦了,shit
```
java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /usr/local/luping/zookeeper-3.4.6/bin/../build/classes:/usr/local/luping/zookeeper-3.4.6/bin/../build/lib/*.jar:/usr/local/luping/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/luping/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/luping/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar:/usr/local/luping/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar:/usr/local/luping/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar:/usr/local/luping/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar:/usr/local/luping/zookeeper-3.4.6/bin/../src/java/lib/*.jar:/usr/local/luping/zookeeper-3.4.6/bin/../conf:.:/data/java/jdk1.7.0_15/jre/lib/rt.jar:/data/java/jdk1.7.0_15/lib/dt.jar:/data/java/jdk1.7.0_15/lib/tools.jar -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain
/usr/local/luping/zookeeper-3.4.6/bin/../conf/zoo.cfg
```
这里看到启动入口是QuorumPeerMain,先看其main()函数关键代码
```
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
```
initializeAndRun()这种会启动单例还是集群模式的zookeeper
```
    protected void initializeAndRun(String[] args)
      throws ConfigException, IOException
    {
      QuorumPeerConfig config = new QuorumPeerConfig();
      if (args.length == 1) {
            config.parse(args);
      }
  // Start and schedule the the purge task
      DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
      purgeMgr.start();
  if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config);
      } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                  + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
      }
    }
```
这里如果是集群模式走runFromConfig,首先注册Mxbean
```
ManagedUtil.registerLog4jMBeans();
```
然后是初始化连接工厂,默认使用NIO,这里是NIOServerCnxnFactory
```
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(),
                              config.getMaxClientCnxns());
```
```
   static public ServerCnxnFactory createFactory() throws IOException {
      String serverCnxnFactoryName =
            System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
      if (serverCnxnFactoryName == null) {
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
      }
      try {
            return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                                                .newInstance();
      } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                  + serverCnxnFactoryName);
            ioe.initCause(e);
            throw ioe;
      }
    }
```
z之后初始化QuorumPeer,进行相应的配置,最后启动QuorumPeer
```
    ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(),
                              config.getMaxClientCnxns());

          quorumPeer = new QuorumPeer();
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      new File(config.getDataLogDir()),
                      new File(config.getDataDir())));
          quorumPeer.setQuorumPeers(config.getServers());
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

          quorumPeer.start();
          quorumPeer.join();
```
ServerCnxnFactory.configure()这里配置ServerSocketChannel
```
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
      configureSaslLogin();
  thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
      thread.setDaemon(true);
      maxClientCnxns = maxcc;
      this.ss = ServerSocketChannel.open();
      ss.socket().setReuseAddress(true);
      LOG.info("binding to port " + addr);
      ss.socket().bind(addr);
      ss.configureBlocking(false);
      ss.register(selector, SelectionKey.OP_ACCEPT);
    }
```
QuorumPeer.start()
```
    public synchronized void start() {
      loadDataBase();
      cnxnFactory.start();      
      startLeaderElection();
      super.start();
    }
```
1,首先加载数据 loadDataBase();
  2,启动NIO线程;
  3,启动leader选举
  首先看从本地加载数据loadDataBase();
```
    public long loadDataBase() throws IOException {
      PlayBackListener listener=new PlayBackListener(){
            public void onTxnLoaded(TxnHeader hdr,Record txn){
                Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
                        null, null);
                r.txn = txn;
                r.hdr = hdr;
                r.zxid = hdr.getZxid();
                addCommittedProposal(r);
            }
      };
      
      long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
      initialized = true;
      return zxid;
    }
  ```
从zk的事务日志snapLog中恢复
```
long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
```
之后会回调PlayBackListener的onTxnLoaded来onTxnLoaded提交Proposal(addCommittedProposal())
```
public long restore(DataTree dt, Map<Long, Integer> sessions,
            PlayBackListener listener) throws IOException {
      snapLog.deserialize(dt, sessions);
      FileTxnLog txnLog = new FileTxnLog(dataDir);
      TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
      long highestZxid = dt.lastProcessedZxid;
      TxnHeader hdr;
      try {
            while (true) {
                // iterator points to
                // the first valid txn when initialized
                hdr = itr.getHeader();
                if (hdr == null) {
                  //empty logs
                  return dt.lastProcessedZxid;
                }
                if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                  LOG.error("{}(higestZxid) > {}(next log) for type {}",
                            new Object[] { highestZxid, hdr.getZxid(),
                                    hdr.getType() });
                } else {
                  highestZxid = hdr.getZxid();
                }
                try {
                  processTransaction(hdr,dt,sessions, itr.getTxn());
                } catch(KeeperException.NoNodeException e) {
                   throw new IOException("Failed to process transaction type: " +
                         hdr.getType() + " error: " + e.getMessage(), e);
                }
                listener.onTxnLoaded(hdr, itr.getTxn());
                if (!itr.next())
                  break;
            }
      } finally {
            if (itr != null) {
                itr.close();
            }
      }
      return highestZxid;
    }
```
  启动NIOServerCnxnFactory的start()
```
    public void start() {
      // ensure thread is started once and only once
      if (thread.getState() == Thread.State.NEW) {
            thread.start();
      }
    }
```
这里的thread其实就是NIOServerCnxnFactory
```
thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
thread.setDaemon(true);
```
然后来到run()循环
```
    public void run() {
      while (!ss.socket().isClosed()) {
            try {
                selector.select(1000);
                Set<SelectionKey> selected;
                synchronized (this) {
                  selected = selector.selectedKeys();
                }
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                        selected);
                Collections.shuffle(selectedList);
                for (SelectionKey k : selectedList) {
                  if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        SocketChannel sc = ((ServerSocketChannel) k
                              .channel()).accept();
                        InetAddress ia = sc.socket().getInetAddress();
                        int cnxncount = getClientCnxnCount(ia);
                        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                            LOG.warn("Too many connections from " + ia
                                     + " - max is " + maxClientCnxns );
                            sc.close();
                        } else {
                            LOG.info("Accepted socket connection from "
                                     + sc.socket().getRemoteSocketAddress());
                            sc.configureBlocking(false);
                            SelectionKey sk = sc.register(selector,
                                    SelectionKey.OP_READ);
                            NIOServerCnxn cnxn = createConnection(sc, sk);
                            sk.attach(cnxn);
                            addCnxn(cnxn);
                        }
                  } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);
                  } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Unexpected ops in select "
                                    + k.readyOps());
                        }
                  }
                }
                selected.clear();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring exception", e);
            }
      }
      closeAll();
      LOG.info("NIOServerCnxn factory exited run method");
    }
```
监听IO,阻塞等待数据到来(1s)
```
selector.select(1000);
                Set<SelectionKey> selected;
                synchronized (this) {
                  selected = selector.selectedKeys();
                }
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                        selected);
                Collections.shuffle(selectedList);
```
如果是连接,则accept,生成则在selector中注册SelectionKey.OP_READ事件 ,在没有达到maxClientCnxns时会创建一个连接createConnection,保存在set中
```
      LOG.info("Accepted socket connection from "
                                     + sc.socket().getRemoteSocketAddress());
                            sc.configureBlocking(false);
                            SelectionKey sk = sc.register(selector,
                                    SelectionKey.OP_READ);
                            NIOServerCnxn cnxn = createConnection(sc, sk);
                            sk.attach(cnxn);
                            addCnxn(cnxn);
```
处理OP_READ,OP_WRITE 读写时间
```
if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);
                  }
```
```
/**
   * Handles read/write IO on connection.
   */
    void doIO(SelectionKey k) throws InterruptedException {
      try {
            if (isSocketOpen() == false) {
                LOG.warn("trying to do i/o on a null socket for session:0x"
                         + Long.toHexString(sessionId));
  return;
            }
            if (k.isReadable()) {
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                  throw new EndOfStreamException(
                            "Unable to read additional data from client sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely client has closed socket");
                }
                if (incomingBuffer.remaining() == 0) {
                  boolean isPayload;
                  if (incomingBuffer == lenBuffer) { // start of next request
                        incomingBuffer.flip();
                        isPayload = readLength(k);
                        incomingBuffer.clear();
                  } else {
                        // continuation
                        isPayload = true;
                  }
                  if (isPayload) { // not the case for 4letterword
                        readPayload();
                  }
                  else {
                        // four letter words take care
                        // need not do anything else
                        return;
                  }
                }
            }
            if (k.isWritable()) {
                // ZooLog.logTraceMessage(LOG,
                // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
                // "outgoingBuffers.size() = " +
                // outgoingBuffers.size());
                if (outgoingBuffers.size() > 0) {
                  // ZooLog.logTraceMessage(LOG,
                  // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
                  // "sk " + k + " is valid: " +
                  // k.isValid());
  /*
                     * This is going to reset the buffer position to 0 and the
                     * limit to the size of the buffer, so that we can fill it
                     * with data from the non-direct buffers that we need to
                     * send.
                     */
                  ByteBuffer directBuffer = factory.directBuffer;
                  directBuffer.clear();
  for (ByteBuffer b : outgoingBuffers) {
                        if (directBuffer.remaining() < b.remaining()) {
                            /*
                           * When we call put later, if the directBuffer is to
                           * small to hold everything, nothing will be copied,
                           * so we've got to slice the buffer if it's too big.
                           */
                            b = (ByteBuffer) b.slice().limit(
                                    directBuffer.remaining());
                        }
                        /*
                         * put() is going to modify the positions of both
                         * buffers, put we don't want to change the position of
                         * the source buffers (we'll do that after the send, if
                         * needed), so we save and reset the position after the
                         * copy
                         */
                        int p = b.position();
                        directBuffer.put(b);
                        b.position(p);
                        if (directBuffer.remaining() == 0) {
                            break;
                        }
                  }
                  /*
                     * Do the flip: limit becomes position, position gets set to
                     * 0. This sets us up for the write.
                     */
                  directBuffer.flip();
  int sent = sock.write(directBuffer);
                  ByteBuffer bb;
  // Remove the buffers that we have sent
                  while (outgoingBuffers.size() > 0) {
                        bb = outgoingBuffers.peek();
                        if (bb == ServerCnxnFactory.closeConn) {
                            throw new CloseRequestException("close requested");
                        }
                        int left = bb.remaining() - sent;
                        if (left > 0) {
                            /*
                           * We only partially sent this buffer, so we update
                           * the position and exit the loop.
                           */
                            bb.position(bb.position() + sent);
                            break;
                        }
                        packetSent();
                        /* We've sent the whole buffer, so drop the buffer */
                        sent -= bb.remaining();
                        outgoingBuffers.remove();
                  }
                  // ZooLog.logTraceMessage(LOG,
                  // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
                  // outgoingBuffers.size() = " + outgoingBuffers.size());
                }
  synchronized(this.factory){
                  if (outgoingBuffers.size() == 0) {
                        if (!initialized
                              && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
                            throw new CloseRequestException("responded to info probe");
                        }
                        sk.interestOps(sk.interestOps()
                              & (~SelectionKey.OP_WRITE));
                  } else {
                        sk.interestOps(sk.interestOps()
                              | SelectionKey.OP_WRITE);
                  }
                }
            }
      } catch (CancelledKeyException e) {
            LOG.warn("Exception causing close of session 0x"
                  + Long.toHexString(sessionId)
                  + " due to " + e);
            if (LOG.isDebugEnabled()) {
                LOG.debug("CancelledKeyException stack trace", e);
            }
            close();
      } catch (CloseRequestException e) {
            // expecting close to log session closure
            close();
      } catch (EndOfStreamException e) {
            LOG.warn("caught end of stream exception",e); // tell user why
  // expecting close to log session closure
            close();
      } catch (IOException e) {
            LOG.warn("Exception causing close of session 0x"
                  + Long.toHexString(sessionId)
                  + " due to " + e);
            if (LOG.isDebugEnabled()) {
                LOG.debug("IOException stack trace", e);
            }
            close();
      }
    }
```
页: [1]
查看完整版本: zookeeper启动过程-粗略梳理