踏雪寻梅 发表于 2018-11-30 11:23:24

Tomcat6.0连接器源码分析

  首先看BIO模式。

Server.conf配置连接器如下:



protocol设定为"HTTP/1.1",这里指org.apache.coyote.http11.Http11Protocol,

相应的转换代码在Connector类里:




[*]public void setProtocol(String protocol) {
[*]      if (AprLifecycleListener.isAprAvailable()) {
[*]            if ("HTTP/1.1".equals(protocol)) {
[*]                setProtocolHandlerClassName
[*]                  ("org.apache.coyote.http11.Http11AprProtocol");
[*]            } else if ("AJP/1.3".equals(protocol)) {
[*]                setProtocolHandlerClassName
[*]                  ("org.apache.coyote.ajp.AjpAprProtocol");
[*]            } else if (protocol != null) {
[*]                setProtocolHandlerClassName(protocol);
[*]            } else {
[*]                setProtocolHandlerClassName
[*]                  ("org.apache.coyote.http11.Http11AprProtocol");
[*]            }
[*]      } else {
[*]            if ("HTTP/1.1".equals(protocol)) {
[*]                setProtocolHandlerClassName
[*]                  ("org.apache.coyote.http11.Http11Protocol");
[*]            } else if ("AJP/1.3".equals(protocol)) {
[*]                setProtocolHandlerClassName
[*]                  ("org.apache.jk.server.JkCoyoteHandler");
[*]            } else if (protocol != null) {
[*]                setProtocolHandlerClassName(protocol);
[*]            }
[*]      }
[*]}

我们这里没有apr 也没有ajp.所以ProtocolHandlerClassName就是org.apache.coyote.http11.Http11Protocol,Http11Protocol在init里,会初始化JioEndpoint。

以后的工作主要由JioEndpoint来处理请求连接,来看看JioEndpoint的init方法:






[*]public void init()
[*]       throws Exception {
[*]
[*]       if (initialized)
[*]         return;
[*]         
[*]       // Initialize thread count defaults for acceptor
[*]       if (acceptorThreadCount == 0) {
[*]         acceptorThreadCount = 1;
[*]       }
[*]       if (serverSocketFactory == null) {
[*]         serverSocketFactory = ServerSocketFactory.getDefault();
[*]       }
[*]       if (serverSocket == null) {
[*]         try {
[*]               if (address == null) {
[*]                   serverSocket = serverSocketFactory.createSocket(port, backlog);
[*]               } else {
[*]                   serverSocket = serverSocketFactory.createSocket(port, backlog, address);
[*]               }
[*]         } catch (BindException orig) {
[*]               String msg;
[*]               if (address == null)
[*]                   msg = orig.getMessage() + " :" + port;
[*]               else
[*]                   msg = orig.getMessage() + " " +
[*]                           address.toString() + ":" + port;
[*]               BindException be = new BindException(msg);
[*]               be.initCause(orig);
[*]               throw be;
[*]         }
[*]       }
[*]       //if( serverTimeout >= 0 )
[*]       //    serverSocket.setSoTimeout( serverTimeout );
[*]         
[*]       initialized = true;
[*]         

主要目的就是创建ServerSocket. 有了服务端的listener.



Http11Protocol启动的时候,相应的启动JioEndpoint.



JioEndpoint 的start方法:




[*]public void start()
[*]      throws Exception {
[*]      // Initialize socket if not done before
[*]      if (!initialized) {
[*]         init();
[*]      }
[*]      if (!running) {
[*]            running = true;
[*]            paused = false;
[*]   
[*]            // Create worker collection
[*]            if (executor == null) {//目前executor都为空,非空的下一节会讨论
[*]                workers = new WorkerStack(maxThreads);//①.创建工作线程。
[*]            }
[*]   
[*]            // Start acceptor threads
[*]            for (int i = 0; i < acceptorThreadCount; i++) {
[*]                Thread acceptorThread = new Thread(new Acceptor(), getName() + &quot;-Acceptor-&quot; + i);
[*]                acceptorThread.setPriority(threadPriority);
[*]                acceptorThread.setDaemon(daemon);
[*]                acceptorThread.start(); // 2.启动接收线程.
[*]            }
[*]      }
[*]    }

在①处,WorkerStack模拟一个栈,里面用数组存储工作线程(Tomcat这帮人就喜欢用数组)。用来处理请求过来的socket.

在2处,启动一个接收线程,接收请求连接。



Acceptor代码如下:




[*] /**
[*]   * Server socket acceptor thread.
[*]   */
[*]    protected class Acceptor implements Runnable {
[*]      /**
[*]         * The background thread that listens for incoming TCP/IP connections and
[*]         * hands them off to an appropriate processor.
[*]         */
[*]      public void run() {
[*]   
[*]            // Loop until we receive a shutdown command
[*]            while (running) {
[*]   
[*]                // Loop if endpoint is paused
[*]                while (paused) {
[*]                  try {
[*]                        Thread.sleep(1000);
[*]                  } catch (InterruptedException e) {
[*]                        // Ignore
[*]                  }
[*]                }
[*]   
[*]                // Accept the next incoming connection from the server socket
[*]                try {
[*]                  Socket socket = serverSocketFactory.acceptSocket(serverSocket);
[*]                  serverSocketFactory.initSocket(socket);
[*]                  // Hand this socket off to an appropriate processor
[*]                  if (!processSocket(socket)) {
[*]                        // Close socket right away
[*]                        try {
[*]                            socket.close();
[*]                        } catch (IOException e) {
[*]                            // Ignore
[*]                        }
[*]                  }
[*]                }catch ( IOException x ) {
[*]                  if ( running ) log.error(sm.getString(&quot;endpoint.accept.fail&quot;), x);
[*]                } catch (Throwable t) {
[*]                  log.error(sm.getString(&quot;endpoint.accept.fail&quot;), t);
[*]                }
[*]   
[*]                // The processor will recycle itself when it finishes
[*]   
[*]            }
[*]   
[*]      }
[*]    }
[*]serverSocketFactory.acceptSocket用init方法里创建的severSocket accept一个连接Socket。然后processSocket(socket).
[*]下面看processSocke(socket)方法:
[*]   
[*]protected boolean processSocket(Socket socket) {
[*]      try {
[*]            if (executor == null) { //目前executor都为空。
[*]                getWorkerThread().assign(socket);
[*]            } else {
[*]                executor.execute(new SocketProcessor(socket));
[*]            }
[*]      } catch (Throwable t) {
[*]            // This means we got an OOM or similar creating a thread, or that
[*]            // the pool and its queue are full
[*]            log.error(sm.getString(&quot;endpoint.process.fail&quot;), t);
[*]            return false;
[*]      }
[*]      return true;
[*]    }

getWorkerThread()方法是从刚才创建的工作线程栈WorkerStack中取得一个工作线程。

这段代码很简单,就不说了,有兴趣看一下Tomcat的源代码(Class:JioEndpoint).



我们看一下工作线程类Worker吗。


[*]protected class Worker implements Runnable {
[*]   
[*]      protected Thread thread = null;
[*]      protected boolean available = false;
[*]      protected Socket socket = null;
[*]   
[*]      /**
[*]         * Process an incoming TCP/IP connection on the specified socket. Any
[*]         * exception that occurs during processing must be logged and swallowed.
[*]         * NOTE: This method is called from our Connector's thread. We
[*]         * must assign it to our own thread so that multiple simultaneous
[*]         * requests can be handled.
[*]         *
[*]         * @param socket TCP socket to process
[*]         */
[*]      synchronized void assign(Socket socket) {
[*]   
[*]            // Wait for the Processor to get the previous Socket
[*]            while (available) {
[*]                try {
[*]                  wait();
[*]                } catch (InterruptedException e) {
[*]                }
[*]            }
[*]            // Store the newly available Socket and notify our thread
[*]            this.socket = socket;
[*]            available = true;
[*]            notifyAll();
[*]   
[*]      }
[*]   
[*]         
[*]      /**
[*]         * Await a newly assigned Socket from our Connector, or null
[*]         * if we are supposed to shut down.
[*]         */
[*]       private synchronized Socket await() {
[*]            // Wait for the Connector to provide a new Socket
[*]            while (!available) {
[*]                try {
[*]                  wait();
[*]                } catch (InterruptedException e) {
[*]                }
[*]            }
[*]   
[*]            // Notify the Connector that we have received this Socket
[*]            Socket socket = this.socket;
[*]            available = false;
[*]            notifyAll();
[*]   
[*]            return (socket);
[*]   
[*]      }
[*]      /**
[*]         * The background thread that listens for incoming TCP/IP connections and
[*]         * hands them off to an appropriate processor.
[*]         */
[*]      public void run() {
[*]   
[*]            // Process requests until we receive a shutdown signal
[*]            while (running) {
[*]   
[*]                // Wait for the next socket to be assigned
[*]                Socket socket = await();
[*]                if (socket == null)
[*]                  continue;
[*]   
[*]                // Process the request from this socket
[*]                if (!setSocketOptions(socket) || !handler.process(socket)) {
[*]                  // Close socket
[*]                  try {
[*]                        socket.close();
[*]                  } catch (IOException e) {
[*]                  }
[*]                }
[*]   
[*]                // Finish up this request
[*]                socket = null;
[*]                recycleWorkerThread(this);
[*]   
[*]            }
[*]   
[*]      }
[*]       /**
[*]         * Start the background processing thread.
[*]         */
[*]      public void start() {
[*]            thread = new Thread(this);
[*]            thread.setName(getName() + &quot;-&quot; + (++curThreads));
[*]            thread.setDaemon(true);
[*]            thread.start();
[*]      }
[*]    }

首行看一下刚刚被调用的assign方法,Worker类通过available互斥。Available可理解为是否还有现成的Socket绑定在这个工作线程上,true表示有。Assign首先判断Available,如果有可用socket,即Available为true,则wait直到被唤醒。 This method is called from our Connector's thread.告诉我们该方法由连接器线程调用。那么工作线程自己呢。看run方法,调用了await,按照上面的理解,如果没有可用的socket,即Available为false,则wait直到被唤醒。如果为true,刚马上拿走这个socket.并把Available设为false.就可以有新的Socket放进来了。



但这里有点问题,从WorkerStack栈出取出的Worker或者新建的Worker,Available肯定都为false.那么assign方法的while (available)循环就没有必要了。不清楚为什么作者这么写。



获得Socket之后交由handler去处理,这里的handler就

是Http11Protocol$Http11ConnectionHandler,处理流程,以会再讨论。



页: [1]
查看完整版本: Tomcat6.0连接器源码分析