|
上文说到在JIoEndpoint类中处理请求最终是调用到内部接口Handler的process()方法,而Handler的实现类是Http11Protocol的内部类Http11ConnectionHandler,Http11ConnectionHandler又委托Http11Processor进行处理,tomcat在这个地方我觉得类的设计不是很优雅啊,内部类用的太多,导致阅读的时候有困难(好处是内部类持有外部类的的引用,不用使用显示的new)。
先把Http11ConnectionHandler的类附上:
protected static class Http11ConnectionHandler implements Handler {
protected Http11Protocol proto;
protected AtomicLong registerCount = new AtomicLong(0);
protected RequestGroupInfo global = new RequestGroupInfo();
//无界线程安全队列,不会阻塞,自己加工的原因是processorCache的数量是有限的,为了线程和性能考虑
protected ConcurrentLinkedQueue<Http11Processor> recycledProcessors =
new ConcurrentLinkedQueue<Http11Processor>() {
protected AtomicInteger size = new AtomicInteger(0);
public boolean offer(Http11Processor processor) {
boolean offer = (proto.processorCache == -1) ? true : (size.get() < proto.processorCache);
//avoid over growing our cache or add after we have stopped
boolean result = false;
if ( offer ) {
result = super.offer(processor);
if ( result ) {
size.incrementAndGet();
}
}
if (!result) unregister(processor);
return result;
}
public Http11Processor poll() {
Http11Processor result = super.poll();
if ( result != null ) {
size.decrementAndGet();
}
return result;
}
public void clear() {
Http11Processor next = poll();
while ( next != null ) {
unregister(next);
next = poll();
}
super.clear();
size.set(0);
}
};
Http11ConnectionHandler(Http11Protocol proto) {
this.proto = proto;
}
public boolean process(Socket socket) {
//processorCache里面没有processor
Http11Processor processor = recycledProcessors.poll();
try {
if (processor == null) {
processor = createProcessor();
}
if (processor instanceof ActionHook) {
((ActionHook) processor).action(ActionCode.ACTION_START, null);
}
if (proto.isSSLEnabled() && (proto.sslImplementation != null)) {
processor.setSSLSupport
(proto.sslImplementation.getSSLSupport(socket));
} else {
processor.setSSLSupport(null);
}
processor.process(socket);
return false;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
Http11Protocol.log.debug
(sm.getString
("http11protocol.proto.socketexception.debug"), e);
} catch (java.io.IOException e) {
// IOExceptions are normal
Http11Protocol.log.debug
(sm.getString
("http11protocol.proto.ioexception.debug"), e);
}
// Future developers: if you discover any other
// rare-but-nonfatal exceptions, catch them here, and log as
// above.
catch (Throwable e) {
// any other exception or error is odd. Here we log it
// with "ERROR" level, so it will show up even on
// less-than-verbose logs.
Http11Protocol.log.error
(sm.getString("http11protocol.proto.error"), e);
} finally {
// if(proto.adapter != null) proto.adapter.recycle();
// processor.recycle();
if (processor instanceof ActionHook) {
((ActionHook) processor).action(ActionCode.ACTION_STOP, null);
}
recycledProcessors.offer(processor);
}
return false;
}
}
看process()方法前,我们看另外一个有意思的东西,ConcurrentLinkedQueue的子类,自己实现的原因是ConcurrentLinkedQueue是无界的,非阻塞的,而这个需要的是有界的,查了下jdk的concurrent包,没有现有的有界的,非阻塞的实现类,所有这里自己实现个,还有实现这个主要还和jmx的注册联系在一起,这里由于processorCache==-1,长度限制这个原因不存在。主要看process()这个方法
首先,从ConcurrentLinkedQueue中取得一个Http11Processor,没有的话就创建一个。
由于Http11Processor是ActionHook的实现,ActionHook的action方法被执行,这个以后再说,这里是 ActionCode.ACTION_START的事件的执行
接着是SSL的一些的设置,在https的时候有用
接着重点,调用Http11Processor的process的方法
后面就是收拾残局了,什么错误啊,Http11Processor的回收了,ACTION_STOP的事件执行了
转到Http11Processor的process()的方法:
public void process(Socket theSocket)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Set the remote address
remoteAddr = null;
remoteHost = null;
localAddr = null;
localName = null;
remotePort = -1;
localPort = -1;
// Setting up the I/O
this.socket = theSocket;
inputBuffer.setInputStream(socket.getInputStream());
outputBuffer.setOutputStream(socket.getOutputStream());
// Error flag
error = false;
keepAlive = true;
int keepAliveLeft = maxKeepAliveRequests;
int soTimeout = endpoint.getSoTimeout();
// When using an executor, these values may return non-positive values
int curThreads = endpoint.getCurrentThreadsBusy();
int maxThreads = endpoint.getMaxThreads();
if (curThreads > 0 && maxThreads > 0) {
// Only auto-disable keep-alive if the current thread usage % can be
// calculated correctly
if ((curThreads*100)/maxThreads > 75) {
keepAliveLeft = 1;
}
}
try {
socket.setSoTimeout(soTimeout);
} catch (Throwable t) {
log.debug(sm.getString("http11processor.socket.timeout"), t);
error = true;
}
boolean keptAlive = false;
//keepAlive:当处理完用户发起的 HTTP 请求后是否立即关闭 TCP 连接
while (started && !error && keepAlive) {
// Parsing the request header
try {
if (keptAlive) {
if (keepAliveTimeout > 0) {
socket.setSoTimeout(keepAliveTimeout);
}
else if (soTimeout > 0) {
socket.setSoTimeout(soTimeout);
}
}
//解析http header 的第一行数据
inputBuffer.parseRequestLine();
request.setStartTime(System.currentTimeMillis());
keptAlive = true;
if (disableUploadTimeout) {
socket.setSoTimeout(soTimeout);
} else {
socket.setSoTimeout(timeout);
}
//解析http header 的除第一行数据以外的数据
inputBuffer.parseHeaders();
} catch (IOException e) {
error = true;
break;
} catch (Throwable t) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.header.parse"), t);
}
// 400 - Bad Request
response.setStatus(400);
error = true;
}
if (!error) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
prepareRequest();
} catch (Throwable t) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
// 400 - Internal Server Error
response.setStatus(400);
error = true;
}
}
if (maxKeepAliveRequests > 0 && --keepAliveLeft == 0)
keepAlive = false;
// Process the request in the adapter
if (!error) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
adapter.service(request, response);
// Handle when the response was committed before a serious
// error occurred. Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if(keepAlive && !error) { // Avoid checking twice.
error = response.getErrorException() != null ||
statusDropsConnection(response.getStatus());
}
} catch (InterruptedIOException e) {
error = true;
} catch (Throwable t) {
log.error(sm.getString("http11processor.request.process"), t);
// 500 - Internal Server Error
response.setStatus(500);
error = true;
}
}
// Finish the handling of the request
try {
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
// If we know we are closing the connection, don't drain input.
// This way uploading a 100GB file doesn't tie up the thread
// if the servlet has rejected it.
if(error)
inputBuffer.setSwallowInput(false);
inputBuffer.endRequest();
} catch (IOException e) {
error = true;
} catch (Throwable t) {
log.error(sm.getString("http11processor.request.finish"), t);
// 500 - Internal Server Error
response.setStatus(500);
error = true;
}
try {
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
outputBuffer.endRequest();
} catch (IOException e) {
error = true;
} catch (Throwable t) {
log.error(sm.getString("http11processor.response.finish"), t);
error = true;
}
// If there was an error, make sure the request is counted as
// and error, and update the statistics counter
if (error) {
response.setStatus(500);
}
request.updateCounters();
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
// Don't reset the param - we'll see it as ended. Next request
// will reset it
// thrA.setParam(null);
// Next request
inputBuffer.nextRequest();
outputBuffer.nextRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
// Recycle
inputBuffer.recycle();
outputBuffer.recycle();
this.socket = null;
// Recycle ssl info
sslSupport = null;
}
Http11Processor很夸张的又引用到JIoEndpoint(感觉很乱,是否有循环有用的感觉?)
这里有一个很有意思的变量keepAliveLeft,一开始的时候等于maxKeepAliveRequests,也就是创建Processor的时候传入的Http11Protocol的maxKeepAliveRequests,默认为100,当JIoEndpoint的curThreads(当前已经用的线程数)/maxThreads>0.75,maxKeepAliveRequests马上可怜的变成1,这里可以认为是过载保护。
当开始标志,没有错误标志等设置后,就开始真正解析这个传入的socket了.
在这之前实例化InternalInputBuffer,InternalOutputBuffer两个类,对应socket的输入和输出,是可以重复使用的高性能的类。
首先调用InternalInputBuffer的parseRequestLine()方法解析http的请求行,即'请求方法 请求uri http版本'那一行
接着解析请求头InternalInputBuffer的parseHeaders()方法干这个活
public void parseHeaders()
throws IOException {
while (parseHeader()) {
}
parsingHeader = false;
end = pos;
}
while循环解析每一行,请求头有个专门的类MimeHeaders存放,里面一般存放的是MessageBytes,简单的说一行就是MessageBytes,这个类有char,byte,String等表示,好处是很方便,任何三类都可以处理,对于涉及编码的来说
,char和byte可以非常方便的进行原生态的表示
接着是调用prepareRequest()方法,其实在parseHeaders()中进行了http headers的解析,这里只是拿出来用而已,只是把org.apache.coyote.Request的属性设置好而已
Http11Processor的process方法就到这里
回到Http11ConnectionHandler的process()方法,最后面有代码:
finally {
// if(proto.adapter != null) proto.adapter.recycle();
// processor.recycle();
if (processor instanceof ActionHook) {
((ActionHook) processor).action(ActionCode.ACTION_STOP, null);
}
recycledProcessors.offer(processor);
}
这里调用了Http11Processor的action的方法,我们看下这个方法:
if (actionCode == ActionCode.ACTION_COMMIT) {
// Commit current response 只有在response中触发这个分支
//一般来说现在response.isCommitted()是false的,但tomcat中很多地方都有这样的判断,主要是为了稳定性考虑(不存在多线程问题?)
if (response.isCommitted())
return;
// Validate and write response headers
prepareResponse();
try {
outputBuffer.commit();
} catch (IOException e) {
// Set error flag
error = true;
}
}
第一个分支就是,注意 prepareResponse(); 这里是准备org.apache.coyote.Response方法了,这个和request的处理差不多,是设置http协议响应头的一些内容
准备好的org.apache.coyote.Request和org.apache.coyote.Response在CoyoteAdapter里面会被用到,返回给tomcat的Connector组件,这个下回再说。
哇,真长! |
|
|