Tomcat6.0连接器源码分析
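首先看BIO模式。conf/server.xml中配置连接器如下(以Tomcat 6默认配置为例):
<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />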
protocol设定为"HTTP/1.1",这里指org.apache.coyote.http11.Http11Protocol,
相应的转换代码在Connector类里:
[*]public void setProtocol(String protocol) { // picks the protocol handler class name from the configured protocol string
[*] if (AprLifecycleListener.isAprAvailable()) { // APR available: use the *Apr* handler classes
[*] if ("HTTP/1.1".equals(protocol)) {
[*] setProtocolHandlerClassName
[*] ("org.apache.coyote.http11.Http11AprProtocol");
[*] } else if ("AJP/1.3".equals(protocol)) {
[*] setProtocolHandlerClassName
[*] ("org.apache.coyote.ajp.AjpAprProtocol");
[*] } else if (protocol != null) { // any other non-null value is passed through as the handler class name
[*] setProtocolHandlerClassName(protocol);
[*] } else { // null protocol: default to the APR HTTP/1.1 handler
[*] setProtocolHandlerClassName
[*] ("org.apache.coyote.http11.Http11AprProtocol");
[*] }
[*] } else { // APR not available
[*] if ("HTTP/1.1".equals(protocol)) {
[*] setProtocolHandlerClassName
[*] ("org.apache.coyote.http11.Http11Protocol");
[*] } else if ("AJP/1.3".equals(protocol)) {
[*] setProtocolHandlerClassName
[*] ("org.apache.jk.server.JkCoyoteHandler");
[*] } else if (protocol != null) { // any other non-null value is passed through as the handler class name
[*] setProtocolHandlerClassName(protocol);
[*] } // no else here: a null protocol makes no setProtocolHandlerClassName call in this branch
[*] }
[*]}
我们这里既没有APR,也没有使用AJP,所以ProtocolHandlerClassName就是org.apache.coyote.http11.Http11Protocol。Http11Protocol在init里会初始化JioEndpoint。
以后的工作主要由JioEndpoint来处理请求连接,来看看JioEndpoint的init方法:
[*]public void init()
[*] throws Exception {
[*]
[*] if (initialized) // already initialized: nothing to do
[*] return;
[*]
[*] // Initialize thread count defaults for acceptor
[*] if (acceptorThreadCount == 0) {
[*] acceptorThreadCount = 1;
[*] }
[*] if (serverSocketFactory == null) { // no factory set: fall back to the default one
[*] serverSocketFactory = ServerSocketFactory.getDefault();
[*] }
[*] if (serverSocket == null) { // create the listening socket only if none exists yet
[*] try {
[*] if (address == null) { // no address set: create the socket from port and backlog only
[*] serverSocket = serverSocketFactory.createSocket(port, backlog);
[*] } else {
[*] serverSocket = serverSocketFactory.createSocket(port, backlog, address);
[*] }
[*] } catch (BindException orig) { // rethrow with the address/port appended to the message
[*] String msg;
[*] if (address == null)
[*] msg = orig.getMessage() + " :" + port;
[*] else
[*] msg = orig.getMessage() + " " +
[*] address.toString() + ":" + port;
[*] BindException be = new BindException(msg);
[*] be.initCause(orig); // keep the original exception as the cause
[*] throw be;
[*] }
[*] }
[*] //if( serverTimeout >= 0 )
[*] // serverSocket.setSoTimeout( serverTimeout );
[*]
[*] initialized = true; // the excerpt ends here; the method's closing brace is not part of this listing
[*]
init的主要目的就是创建ServerSocket,这样就有了服务端的监听socket(listener)。
Http11Protocol启动的时候,相应的启动JioEndpoint.
JioEndpoint 的start方法:
[*]public void start()
[*] throws Exception {
[*] // Initialize socket if not done before
[*] if (!initialized) {
[*] init();
[*] }
[*] if (!running) { // a second call while already running does nothing further
[*] running = true;
[*] paused = false;
[*]
[*] // Create worker collection
[*] if (executor == null) {// executor is always null for now; the non-null case is discussed in the next section
[*] workers = new WorkerStack(maxThreads);// ①. create the worker threads (the WorkerStack sized by maxThreads).
[*] }
[*]
[*] // Start acceptor threads
[*] for (int i = 0; i < acceptorThreadCount; i++) {
[*] Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
[*] acceptorThread.setPriority(threadPriority);
[*] acceptorThread.setDaemon(daemon);
[*] acceptorThread.start(); // 2. start the acceptor thread.
[*] }
[*] }
[*] }
在①处,WorkerStack模拟一个栈,里面用数组存储工作线程(Tomcat这帮人就喜欢用数组)。用来处理请求过来的socket.
在2处,启动acceptorThreadCount个(init中默认设为1个)接收线程,用来接收请求连接。
Acceptor代码如下:
[*] /**
[*] * Server socket acceptor thread.
[*] */
[*] protected class Acceptor implements Runnable {
[*] /**
[*] * The background thread that listens for incoming TCP/IP connections and
[*] * hands them off to an appropriate processor. Runs until running becomes
[*] * false; while paused is true it sleeps instead of accepting.
[*] */
[*] public void run() {
[*]
[*] // Loop until we receive a shutdown command
[*] while (running) {
[*]
[*] // Loop if endpoint is paused
[*] while (paused) {
[*] try {
[*] Thread.sleep(1000);
[*] } catch (InterruptedException e) {
[*] // Ignore: the loop simply re-checks paused
[*] }
[*] }
[*]
[*] // Accept the next incoming connection from the server socket
[*] try {
[*] Socket socket = serverSocketFactory.acceptSocket(serverSocket);
[*] serverSocketFactory.initSocket(socket);
[*] // Hand this socket off to an appropriate processor
[*] if (!processSocket(socket)) {
[*] // Close socket right away
[*] try {
[*] socket.close();
[*] } catch (IOException e) {
[*] // Ignore
[*] }
[*] }
[*] }catch ( IOException x ) {
[*] if ( running ) log.error(sm.getString("endpoint.accept.fail"), x); // I/O failures are logged only while still running
[*] } catch (Throwable t) { // anything else is logged unconditionally; the accept loop keeps going
[*] log.error(sm.getString("endpoint.accept.fail"), t);
[*] }
[*]
[*] // The processor will recycle itself when it finishes
[*]
[*] }
[*]
[*] }
[*] }
serverSocketFactory.acceptSocket用init方法里创建的serverSocket accept一个连接Socket,然后调用processSocket(socket)。
下面看processSocket(socket)方法:
[*]
[*]protected boolean processSocket(Socket socket) { // returns false when the hand-off throws; true otherwise
[*] try {
[*] if (executor == null) { //executor is always null for now.
[*] getWorkerThread().assign(socket); // hand the socket to the worker returned by getWorkerThread()
[*] } else {
[*] executor.execute(new SocketProcessor(socket)); // otherwise submit a SocketProcessor task to the executor
[*] }
[*] } catch (Throwable t) {
[*] // This means we got an OOM or similar creating a thread, or that
[*] // the pool and its queue are full
[*] log.error(sm.getString("endpoint.process.fail"), t);
[*] return false;
[*] }
[*] return true;
[*] }
getWorkerThread()方法是从刚才创建的工作线程栈WorkerStack中取得一个工作线程。
这段代码很简单,就不说了,有兴趣看一下Tomcat的源代码(Class:JioEndpoint).
我们看一下工作线程类Worker吧。
[*]protected class Worker implements Runnable {
[*]
[*] protected Thread thread = null; // thread running this worker; created in start()
[*] protected boolean available = false; // true while a socket stored by assign() has not yet been taken by await()
[*] protected Socket socket = null; // socket handed over by assign()
[*]
[*] /**
[*] * Process an incoming TCP/IP connection on the specified socket. Any
[*] * exception that occurs during processing must be logged and swallowed.
[*] * NOTE: This method is called from our Connector's thread. We
[*] * must assign it to our own thread so that multiple simultaneous
[*] * requests can be handled.
[*] *
[*] * @param socket TCP socket to process
[*] */
[*] synchronized void assign(Socket socket) {
[*]
[*] // Wait for the Processor to get the previous Socket
[*] while (available) {
[*] try {
[*] wait();
[*] } catch (InterruptedException e) { // interruption is ignored; the loop re-checks available
[*] }
[*] }
[*] // Store the newly available Socket and notify our thread
[*] this.socket = socket;
[*] available = true;
[*] notifyAll();
[*]
[*] }
[*]
[*]
[*] /**
[*] * Await a newly assigned Socket from our Connector, or null
[*] * if we are supposed to shut down.
[*] * NOTE(review): in the code shown here the returned value is whatever
[*] * assign() stored, so null comes back only if null was assigned -
[*] * confirm how shutdown is actually signalled.
[*] */
[*] private synchronized Socket await() {
[*] // Wait for the Connector to provide a new Socket
[*] while (!available) {
[*] try {
[*] wait();
[*] } catch (InterruptedException e) { // interruption is ignored; the loop re-checks available
[*] }
[*] }
[*]
[*] // Notify the Connector that we have received this Socket
[*] Socket socket = this.socket;
[*] available = false;
[*] notifyAll();
[*]
[*] return (socket);
[*]
[*] }
[*] /**
[*] * Worker loop: while running is true, waits in await() for an assigned
[*] * socket, processes it, and then passes this worker to
[*] * recycleWorkerThread().
[*] */
[*] public void run() {
[*]
[*] // Process requests until we receive a shutdown signal
[*] while (running) {
[*]
[*] // Wait for the next socket to be assigned
[*] Socket socket = await();
[*] if (socket == null)
[*] continue;
[*]
[*] // Process the request from this socket
[*] if (!setSocketOptions(socket) || !handler.process(socket)) {
[*] // Close socket when setting options fails or the handler returns false
[*] try {
[*] socket.close();
[*] } catch (IOException e) { // a failure to close is ignored
[*] }
[*] }
[*]
[*] // Finish up this request
[*] socket = null;
[*] recycleWorkerThread(this);
[*]
[*] }
[*]
[*] }
[*] /**
[*] * Start the background processing thread.
[*] */
[*] public void start() {
[*] thread = new Thread(this);
[*] thread.setName(getName() + "-" + (++curThreads)); // name suffix is the incremented curThreads counter
[*] thread.setDaemon(true);
[*] thread.start();
[*] }
[*] }
首先看一下刚刚被调用的assign方法。Worker类通过available标志配合wait/notifyAll进行同步。available可理解为是否还有一个尚未被取走的Socket绑定在这个工作线程上,true表示有。assign首先判断available:如果已有尚未取走的socket,即available为true,则wait直到被唤醒;否则保存新的socket,把available设为true并notifyAll。注释中的“This method is called from our Connector's thread.”告诉我们该方法由连接器线程调用。那么工作线程自己呢?看run方法,它调用了await:按照上面的理解,如果没有可用的socket,即available为false,则wait直到被唤醒;如果为true,则马上拿走这个socket,把available设为false并notifyAll,这样就可以有新的Socket放进来了。
但这里有点问题:从WorkerStack栈中取出的Worker或者新建的Worker,available肯定都为false,那么assign方法的while (available)循环就没有必要了。不清楚为什么作者这么写。
获得Socket之后交由handler去处理,这里的handler就是Http11Protocol$Http11ConnectionHandler。具体的处理流程,以后再讨论。
页:
[1]