huijial 发表于 2017-1-25 10:40:31

Tomcat源码之AprEndpoint

APR,全称Apache Portable Runtime,使用native server为Tomcat提供更好的伸缩性、更高的性能以及更强的集成能力。APR还是Apache HTTP Server2.x中的核心轻量级库,它可以提供更好的IO功能(如sendfile,epoll和OpenSSL)、OS功能和本地线程管理(共享内存、NT管道和UNIX套接字)。这些都可以使Tomcat变得更通用、更易扩展。
APR运行需要以下3个组件:APR Library、JNI Wapper for APR和OpenSSL Libary。如果以上功能已经安装,Tomcat在启动时会自动使用APR Connector。
与JIoEndPoint相比,AprEndpoint使用JNI的接口来获得对Socket的访问,功能实现要更为复杂,含有的线程更多:Acceptor Thread、Asynctimeout Thread、Poller Thread、Comet Poller Thread和Sendfile Thread。

Acceptor Thread、Asynctimeout Thread
功能与JIoEndpoint中的类似,同样是完成侦听和监控request的任务,不同的是这两个class直接继承Thread而不是实现Runnable接口。最终负责处理Socket的地方是在AjpConnectionHandler中,对应的处理线程(池)是Exector。
   
    protected class Acceptor extends Thread {
.....
    @Override
      public void run() {
            int errorDelay = 0;
            // Loop until we receive a shutdown command
            while (running) {
                ....
                try {
                  //if we have reached max connections, wait
                  awaitConnection();
                  
                  long socket = 0;
                  try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = Socket.accept(serverSock);
                  } catch (Exception e) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw e;
                  }
                  // Successful accept, reset the error delay
                  errorDelay = 0;
                  //increment socket count
                  countUpConnection();
                  .....
                } catch (Throwable t) {
                  ....
                }
                }
                // The processor will recycle itself when it finishes
            }
      }
    }

   
    protected class AsyncTimeout implements Runnable {
      
      @Override
      public void run() {
            // Loop until we receive a shutdown command
            while (running) {
                ....
            long now = System.currentTimeMillis();
                Iterator<SocketWrapper<Long>> sockets =
                  waitingRequests.iterator();
                while (sockets.hasNext()) {
                  SocketWrapper<Long> socket = sockets.next();
                  if (socket.async) {
                        long access = socket.getLastAccess();
                        if ((now-access)>socket.getTimeout()) {
                            processSocketAsync(socket,SocketStatus.TIMEOUT);
                        }
                  }
                }
             ....                }
            }
      }
    }

Poller Thread
Poller类主要负责poll传入的socket连接(提供add(),由其他线程传入),如果发现有事件,交给AjpConnectionHandler处理。
    public class Poller extends Thread {
      ............
      
      public void add(long socket) {
            synchronized (this) {
                // Add socket to the list. Newly added sockets will wait
                // at most for pollTime before being polled
                if (addCount >= addS.length) {
                  // Can't do anything: close the socket right away
                  if (comet) {
                        processSocket(socket, SocketStatus.ERROR);
                  } else {
                        destroySocket(socket);
                  }
                  return;
                }
                addS = socket;
                addCount++;
                this.notify();
            }
      }
      
      @Override
      public void run() {
         
                try {
                  // Add sockets which are waiting to the poller
                  if (addCount > 0) {
                        synchronized (this) {
                            int successCount = 0;
                            try {
                              for (int i = (addCount - 1); i >= 0; i--) {
                                    int rv = Poll.add(serverPollset, addS, Poll.APR_POLLIN);
                                    if (rv == Status.APR_SUCCESS) {
                                        successCount++;
                                    } else {
                                        // Can't do anything: close the socket right away
                                        if (comet) {
                                          processSocket(addS, SocketStatus.ERROR);
                                        } else {
                                          destroySocket(addS);
                                        }
                                    }
                              }
                            } finally {
                              keepAliveCount += successCount;
                              addCount = 0;
                            }
                        }
                  }
                  maintainTime += pollTime;
                  // Pool for the specified interval
                  int rv = Poll.poll(serverPollset, pollTime, desc, true);
                  if (rv > 0) {
                        keepAliveCount -= rv;
                        for (int n = 0; n < rv; n++) {
                            // Check for failed sockets and hand this socket off to a worker
                            if (((desc & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
                                    || ((desc & Poll.APR_POLLERR) == Poll.APR_POLLERR)
                                    || (comet && (!processSocket(desc, SocketStatus.OPEN)))
                                    || (!comet && (!processSocket(desc)))) {
                              // Close socket and clear pool
                              if (comet) {
                                    processSocket(desc, SocketStatus.DISCONNECT);
                              } else {
                                    destroySocket(desc);
                              }
                              continue;
                            }
                        }
                  }
            ..............
            } catch (Throwable t) {
                  ExceptionUtils.handleThrowable(t);
                  log.error(sm.getString("endpoint.poll.error"), t);
                }
            }
            synchronized (this) {
                this.notifyAll();
            }
      }
    }

Sendfile Thread
用于实现sendfile功能,它的工作流程跟Poller类似,add()中,将文件写入socket中。在run()中用Poll来轮询socket的状态,处理文件写入.
代码略....

初始化和启动
在bind()中完成对socket/SSL(如果要支持的话)等资源的初始化,在startInternal(),启动Poller线程组, Comet Poller线程组(两个都是Poller类),Sendfile线程组,Acceptor线程组和1个运行AsyncTimeout接口的线程组.
页: [1]
查看完整版本: Tomcat源码之AprEndpoint