tomcat源码分析系列之请求处理---请君入瓮
费了九牛二虎之力,tomcat终于启动起来了,接下来我们要看看tomcat是如何处理一个请求的。上回说到JIoEndpoint是负责处理请求的,这个类里面有几个内部接口和内部类,其中有一个就是我们要关注的Acceptor
// --------------------------------------------------- Acceptor Inner Class
/**
* Server socket acceptor thread.
*/
protected class Acceptor implements Runnable {
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused && running) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
try {
//超过最大连接数,就等待
//if we have reached max connections, wait
countUpOrAwaitConnection();
Socket socket = null;
try {
// Accept the next incoming connection from the server
// socket
socket = serverSocketFactory.acceptSocket(serverSocket);
} catch (IOException ioe) {
/**
* 触发一种延迟处理,如果errorDelay>0,线程会sleep相应的毫秒,主要是为后面的程序处理异常
* 争取时间,从而避免在短时间大量占用CPU
* 每多一个异常,会延迟50ms,直到达到最大阀值1.6s
**/
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (setSocketOptions(socket)) {
// Hand this socket off to an appropriate processor
if (!processSocket(socket)) {
// Close socket right away
try {
socket.close();
} catch (IOException e) {
// Ignore
}
}
} else {
// Close socket right away
try {
socket.close();
} catch (IOException e) {
// Ignore
}
}
} catch (IOException x) {
if (running) {
log.error(sm.getString("endpoint.accept.fail"), x);
}
} catch (NullPointerException npe) {
if (running) {
log.error(sm.getString("endpoint.accept.fail"), npe);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
// The processor will recycle itself when it finishes
}
}
}
前面说过,JIoEndpoint启动了多个Acceptor来处理请求,每个Acceptor都是一个线程,Acceptor的run方法里主要就是调用processSocket(socket),这是JIoEndpoint里的方法,进入这个方法看一下
/**
* Process a new connection from a new client. Wraps the socket so
* keep-alive and other attributes can be tracked and then passes the socket
* to the executor for processing.
*
* @param socket The socket associated with the client.
*
* @return <code>true</code> if the socket is passed to the
* executor, <code>false</code> if something went wrong or
* if the endpoint is shutting down. Returning
* <code>false</code> is an indication to close the socket
* immediately.
*/
protected boolean processSocket(Socket socket) {
// Process the request from this socket
try {
SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
// During shutdown, executor may be null - avoid NPE
if (!running) {
return false;
}
getExecutor().execute(new SocketProcessor(wrapper));
} catch (RejectedExecutionException x) {
log.warn("Socket processing request was rejected for:"+socket,x);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
方法的注释已经说的很明白了,它就是将socket包装了一下,以便扩展传递一些扩展属性,接下来调用 getExecutor().execute(new SocketProcessor(wrapper));getExecutor()在前面已经提到过,它是一个线程池对象,里面存放了一个个将要被执行的请求线程。这个SocketProcessor是JIoEndpoint里的另一个内部类,也是一个多线程:
// ------------------------------------------- SocketProcessor Inner Class
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
protected class SocketProcessor implements Runnable {
protected SocketWrapper<Socket> socket = null;
protected SocketStatus status = null;
public SocketProcessor(SocketWrapper<Socket> socket) {
if (socket==null) throw new NullPointerException();
this.socket = socket;
}
public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
this(socket);
this.status = status;
}
@Override
public void run() {
boolean launch = false;
synchronized (socket) {
try {
SocketState state = SocketState.OPEN;
try {
//首先对ssl请求做特殊处理,如果不是ssl请求,这个方法什么也不做
// SSL handshake
serverSocketFactory.handshake(socket.getSocket());
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.handshake"), t);
}
// Tell to close the socket
state = SocketState.CLOSED;
}
//如果不是ssl请求
if ((state != SocketState.CLOSED)) {
//处理请求
if (status == null) {
state = handler.process(socket, SocketStatus.OPEN);
} else {
state = handler.process(socket,status);
}
}
if (state == SocketState.CLOSED) {
// Close socket
if (log.isTraceEnabled()) {
log.trace("Closing socket:"+socket);
}
countDownConnection();//处理连接数
try {
socket.getSocket().close();//关闭socket连接
} catch (IOException e) {
// Ignore
}
} else if (state == SocketState.OPEN){
socket.setKeptAlive(true);
socket.access();
launch = true;
} else if (state == SocketState.LONG) {
socket.access();
waitingRequests.add(socket);
}
} finally {
if (launch) {
try {
getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN));
} catch (NullPointerException npe) {
if (running) {
log.error(sm.getString("endpoint.launch.fail"),
npe);
}
}
}
}
}
socket = null;
// Finish up this request
}
}
我们看一下实际请求的处理state = handler.process(socket, SocketStatus.OPEN);这个handler是什么东西?具体的协议对象,在这里是Http11Protocol对象实例。我们先看一下Http11Protocol的类关系图:
process方法在AbstractProtocol类的静态内部类AbstractConnectionHandler里:
public SocketState process(SocketWrapper<S> socket,
SocketStatus status) {
/**将socket从connections Map里移除,这只针对SocketState.LONG的情况有效**/
P processor = connections.remove(socket.getSocket());
socket.setAsync(false);
try {
if (processor == null) {
processor = recycledProcessors.poll();//从池里取得一个processor
}
if (processor == null) {
processor = createProcessor();//没有就新建一个processor
}
initSsl(socket, processor);
SocketState state = SocketState.CLOSED;
do {
if (processor.isAsync() || state == SocketState.ASYNC_END) {
state = processor.asyncDispatch(status);
} else if (processor.isComet()) {//如果是comet
state = processor.event(status);//tomcat不支持comet,所以此方法只是简单的抛出异常
} else {//普通的处理
state = processor.process(socket);
}
if (state != SocketState.CLOSED && processor.isAsync()) {
state = processor.asyncPostProcess();
}
} while (state == SocketState.ASYNC_END);
if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
// socket associated with the processor. Exact requirements
// depend on type of long poll
longPoll(socket, processor);
} else if (state == SocketState.OPEN){
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
release(socket, processor, false, true);//释放processor
} else {
// Connection closed. OK to recycle the processor.
release(socket, processor, true, false);//释放processor
}
return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
getLog().debug(sm.getString(
"ajpprotocol.proto.socketexception.debug"), e);
} catch (java.io.IOException e) {
// IOExceptions are normal
getLog().debug(sm.getString(
"ajpprotocol.proto.ioexception.debug"), e);
}
// Future developers: if you discover any other
// rare-but-nonfatal exceptions, catch them here, and log as
// above.
catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
// any other exception or error is odd. Here we log it
// with "ERROR" level, so it will show up even on
// less-than-verbose logs.
getLog().error(sm.getString("ajpprotocol.proto.error"), e);
}
release(socket, processor, true, false);
return SocketState.CLOSED;
}
先说下relese(socket,processor,true,false)方法,这个方法虽然带了4个参数,但目前实际使用的只有前2个参数,后2个直接没管了,不知道是留着后续扩展的目的?此方法在Http11Protocol类的内部类Http11ConnectionHandler
/**
* Expected to be used by the handler once the processor is no longer
* required.
*
* @param socket Not used in BIO
* @param processor
* @param isSocketClosing Not used in HTTP
* @param addToPoller Not used in BIO
*/
@Override
public void release(SocketWrapper<Socket> socket,
Http11Processor processor, boolean isSocketClosing,
boolean addToPoller) {
processor.recycle();
recycledProcessors.offer(processor);
}
回去看processor吧,首先我们要知道我们的processor是哪个对象:
@Override
protected Http11Processor createProcessor() {
Http11Processor processor = new Http11Processor(
proto.getMaxHttpHeaderSize(), (JIoEndpoint)proto.endpoint,
proto.getMaxTrailerSize());
processor.setAdapter(proto.adapter);
processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
processor.setConnectionUploadTimeout(
proto.getConnectionUploadTimeout());
processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
processor.setCompressionMinSize(proto.getCompressionMinSize());
processor.setCompression(proto.getCompression());
processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
processor.setSocketBuffer(proto.getSocketBuffer());
processor.setMaxSavePostSize(proto.getMaxSavePostSize());
processor.setServer(proto.getServer());
processor.setDisableKeepAlivePercentage(
proto.getDisableKeepAlivePercentage());
register(processor);
return processor;
}
Http11Processor与Http11Protocol有类似类关系图的Http11Processor--->AbstractHttp11Processor-->AbstractProcessor-->Processor
AbstractProcessor虽然是个抽象类,但它有自己的构造函数
public AbstractProcessor(AbstractEndpoint endpoint) {
this.endpoint = endpoint;
asyncStateMachine = new AsyncStateMachine(this);
request = new Request();
response = new Response();
response.setHook(this);
request.setResponse(response);
}
之所以提到这个构造函数,因为它生成了request和response对象,后面我们需要围绕这两个对象团团转,还是在此先供奉一下吧,把它老人家的底细弄清楚了,后面才不至于被这两活宝糊弄。
我们回到processor.process(socket);的处理,这个方法所作的事情就是解析http请求数据,组装request和response对象,最后将response输出,详细的细节,我们下节讨论。到目前为止我们已经把请求请到家里来了,并且我们知道后面谁会去处理这些东西,后面接下来要解决的就是,如何去解析、处理这个请求了,欲知后事如何,且听下回分解。
页:
[1]