|
在Tomca中Endpoint主要用来接收网络请求,处理则由ConnectionHandler来执行.ConnectionHandler主要作用是调用对应协议的Processor来处理请求.而对应的processor经过简单的内容解析之后.则调用Adapter(连接适配器)的方法,将请求转发给容器.由Servlet容器来处理用户请求.由于Tomcat支持不同的协议,则需要不同的ConnectionHandler和Processor进行处理.AbstractConnectionHandler和AbstractProcessor为它们提供了基础的实现.
AbstractConnectionHandler及其子类都是AbstractProtocol及其子类的内部类.不同的Protocol实现了不同的ConnectionHandler.对请求的操作process方法是在AbstractConnectionHandler实现的,但是具体的processor的创建等是在子类中实现的.
public SocketState process(SocketWrapper<S> socket,SocketStatus status)
{
Processor<S> processor = connections.remove(socket.getSocket());
if (status == SocketStatus.DISCONNECT && processor == null) {
//nothing more to be done endpoint requested a close
//and there are no object associated with this connection
return SocketState.CLOSED;
}
socket.setAsync(false);
try
{
if (processor == null) {
processor = recycledProcessors.poll();
}
if (processor == null) {
processor = createProcessor();
}
initSsl(socket, processor);
SocketState state = SocketState.CLOSED;
do {
if (status == SocketStatus.DISCONNECT) {
//do nothing here, just wait for it to get recycled
} else if (processor.isAsync() || state == SocketState.ASYNC_END) {
state = processor.asyncDispatch(status);
} else if (processor.isComet()) {
state = processor.event(status);
} else if (processor.isUpgrade()) {
state = processor.upgradeDispatch();
} else {
state = processor.process(socket);
}
if (state != SocketState.CLOSED && processor.isAsync()) {
state = processor.asyncPostProcess();
}
if (state == SocketState.UPGRADING) {
// Get the UpgradeInbound handler
UpgradeInbound inbound = processor.getUpgradeInbound();
// Release the Http11 processor to be re-used
release(socket, processor, false, false);
// Create the light-weight upgrade processor
processor = createUpgradeProcessor(socket, inbound);
inbound.onUpgradeComplete();
}
} while (state == SocketState.ASYNC_END ||
state == SocketState.UPGRADING);
if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
// socket associated with the processor. Exact requirements
// depend on type of long poll
longPoll(socket, processor);
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
release(socket, processor, false, true);
} else if (state == SocketState.SENDFILE) {
// Sendfile in progress. If it fails, the socket will be
// closed. If it works, the socket will be re-added to the
// poller
release(socket, processor, false, false);
} else if (state == SocketState.UPGRADED) {
// Need to keep the connection associated with the processor
longPoll(socket, processor);
} else {
// Connection closed. OK to recycle the processor.
if (!(processor instanceof UpgradeProcessor)) {
release(socket, processor, true, false);
}
}
return state;
}
catch clause
{
......
}
// Don't try to add upgrade processors back into the pool
if (!(processor instanceof UpgradeProcessor)) {
release(socket, processor, true, false);
}
return SocketState.CLOSED;
}
protected abstract P createProcessor();
protected abstract void initSsl(SocketWrapper<S> socket,Processor<S> processor);
protected abstract void longPoll(SocketWrapper<S> socket,Processor<S> processor);
protected abstract void release(SocketWrapper<S> socket,Processor<S> processor, boolean socketClosing,boolean addToPoller);
protected abstract Processor<S> createUpgradeProcessor(SocketWrapper<S> socket,UpgradeInbound inbound) throws IOException;
在AbstractConnectionHandler的process方法中,先试图从connections中获取当前Socket对应的Processor如果没有找到的话从recycledProcessors中获取,也就是已经处理过连接但是没有被销毁的Processor这样做的好处是避免频繁地创建和销毁对象.processor还是为空的话,那就使用createProcessor创建.上面的的代码片段中也列举了一些在子类中实现的重要方法.它的子类包含了Http11Protocol的Http11ConnectionHandler和Http11AprProtocol的Http11ConnectionHandler以及Http11NioProtocol的Http11ConnectionHandler等这些Protocol用有不同的实现,也就是其内部的Endpoint和ConnectionHandler的实现方式不同.以Http11Protocol为例,如下代码.
protected static class Http11ConnectionHandler
extends AbstractConnectionHandler<Socket, Http11Processor> implements Handler {
protected Http11Protocol proto;
Http11ConnectionHandler(Http11Protocol proto) {
this.proto = proto;
}
@Override
protected AbstractProtocol getProtocol() {
return proto;
}
@Override
protected Log getLog() {
return log;
}
@Override
public SSLImplementation getSslImplementation() {
return proto.sslImplementation;
}
/**
* Expected to be used by the handler once the processor is no longer
* required.
*
* @param socket Not used in BIO
* @param processor
* @param isSocketClosing Not used in HTTP
* @param addToPoller Not used in BIO
*/
@Override
public void release(SocketWrapper<Socket> socket,
Processor<Socket> processor, boolean isSocketClosing,
boolean addToPoller) {
processor.recycle(isSocketClosing);
recycledProcessors.offer(processor);
}
@Override
protected void initSsl(SocketWrapper<Socket> socket,
Processor<Socket> processor) {
if (proto.isSSLEnabled() && (proto.sslImplementation != null)) {
processor.setSslSupport(
proto.sslImplementation.getSSLSupport(
socket.getSocket()));
} else {
processor.setSslSupport(null);
}
}
@Override
protected void longPoll(SocketWrapper<Socket> socket,
Processor<Socket> processor) {
connections.put(socket.getSocket(), processor);
}
@Override
protected Http11Processor createProcessor() {
Http11Processor processor = new Http11Processor(
proto.getMaxHttpHeaderSize(), (JIoEndpoint)proto.endpoint,
proto.getMaxTrailerSize());
processor.setAdapter(proto.adapter);
processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
processor.setConnectionUploadTimeout(
proto.getConnectionUploadTimeout());
processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
processor.setCompressionMinSize(proto.getCompressionMinSize());
processor.setCompression(proto.getCompression());
processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
processor.setSocketBuffer(proto.getSocketBuffer());
processor.setMaxSavePostSize(proto.getMaxSavePostSize());
processor.setServer(proto.getServer());
processor.setDisableKeepAlivePercentage(
proto.getDisableKeepAlivePercentage());
register(processor);
return processor;
}
@Override
protected Processor<Socket> createUpgradeProcessor(
SocketWrapper<Socket> socket, UpgradeInbound inbound)
throws IOException {
return new UpgradeBioProcessor(socket, inbound);
}
}
首先它实现了父类的方法createProcessor创建了一个与Http11Protocol实现方式对应的Processor即Http11Processor.实现的其他方法包括了将Socket于Processor对应放入map的longPoll.暂时释放Processor的release.及initSsl等.但是最主要处理方法process还是在父类中实现.而所有的处理还是在Processor中的process实现.
首发于泛泛之辈 - http://www.lihongkun.com/archives/127 |
|