设为首页 收藏本站
查看: 574|回复: 0

[经验分享] tomcat的Http11Protocol,Http11Processor

[复制链接]

尚未签到

发表于 2017-1-26 08:29:03 | 显示全部楼层 |阅读模式
  上文说到在JIoEndpoint类中处理请求最终是调用到内部接口Handler的process()方法,而Handler的实现类是Http11Protocol的内部类Http11ConnectionHandler,Http11ConnectionHandler又委托Http11Processor进行处理,tomcat在这个地方我觉得类的设计不是很优雅啊,内部类用的太多,导致阅读的时候有困难(好处是内部类持有外部类的的引用,不用使用显示的new)。
  先把Http11ConnectionHandler的类附上:

    protected static class Http11ConnectionHandler implements Handler {
protected Http11Protocol proto;
protected AtomicLong registerCount = new AtomicLong(0);
protected RequestGroupInfo global = new RequestGroupInfo();
//无界线程安全队列,不会阻塞,自己加工的原因是processorCache的数量是有限的,为了线程和性能考虑
protected ConcurrentLinkedQueue<Http11Processor> recycledProcessors =
new ConcurrentLinkedQueue<Http11Processor>() {
protected AtomicInteger size = new AtomicInteger(0);
public boolean offer(Http11Processor processor) {
boolean offer = (proto.processorCache == -1) ? true : (size.get() < proto.processorCache);
//avoid over growing our cache or add after we have stopped
boolean result = false;
if ( offer ) {
result = super.offer(processor);
if ( result ) {
size.incrementAndGet();
}
}
if (!result) unregister(processor);
return result;
}
public Http11Processor poll() {
Http11Processor result = super.poll();
if ( result != null ) {
size.decrementAndGet();
}
return result;
}
public void clear() {
Http11Processor next = poll();
while ( next != null ) {
unregister(next);
next = poll();
}
super.clear();
size.set(0);
}
};
Http11ConnectionHandler(Http11Protocol proto) {
this.proto = proto;
}
public boolean process(Socket socket) {
//processorCache里面没有processor
Http11Processor processor = recycledProcessors.poll();
try {
if (processor == null) {
processor = createProcessor();
}
if (processor instanceof ActionHook) {
((ActionHook) processor).action(ActionCode.ACTION_START, null);
}
if (proto.isSSLEnabled() && (proto.sslImplementation != null)) {
processor.setSSLSupport
(proto.sslImplementation.getSSLSupport(socket));
} else {
processor.setSSLSupport(null);
}
processor.process(socket);
return false;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
Http11Protocol.log.debug
(sm.getString
("http11protocol.proto.socketexception.debug"), e);
} catch (java.io.IOException e) {
// IOExceptions are normal
Http11Protocol.log.debug
(sm.getString
("http11protocol.proto.ioexception.debug"), e);
}
// Future developers: if you discover any other
// rare-but-nonfatal exceptions, catch them here, and log as
// above.
catch (Throwable e) {
// any other exception or error is odd. Here we log it
// with "ERROR" level, so it will show up even on
// less-than-verbose logs.
Http11Protocol.log.error
(sm.getString("http11protocol.proto.error"), e);
} finally {
//       if(proto.adapter != null) proto.adapter.recycle();
//                processor.recycle();
if (processor instanceof ActionHook) {
((ActionHook) processor).action(ActionCode.ACTION_STOP, null);
}
recycledProcessors.offer(processor);
}
return false;
}
}
  看process()方法前,我们看另外一个有意思的东西,ConcurrentLinkedQueue的子类,自己实现的原因是ConcurrentLinkedQueue是无界的,非阻塞的,而这个需要的是有界的,查了下jdk的concurrent包,没有现有的有界的,非阻塞的实现类,所有这里自己实现个,还有实现这个主要还和jmx的注册联系在一起,这里由于processorCache==-1,长度限制这个原因不存在。主要看process()这个方法
  首先,从ConcurrentLinkedQueue中取得一个Http11Processor,没有的话就创建一个。
  由于Http11Processor是ActionHook的实现,ActionHook的action方法被执行,这个以后再说,这里是   ActionCode.ACTION_START的事件的执行
  接着是SSL的一些的设置,在https的时候有用
  接着重点,调用Http11Processor的process的方法
  后面就是收拾残局了,什么错误啊,Http11Processor的回收了,ACTION_STOP的事件执行了
  转到Http11Processor的process()的方法:

   public void process(Socket theSocket)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Set the remote address
remoteAddr = null;
remoteHost = null;
localAddr = null;
localName = null;
remotePort = -1;
localPort = -1;
// Setting up the I/O
this.socket = theSocket;
inputBuffer.setInputStream(socket.getInputStream());
outputBuffer.setOutputStream(socket.getOutputStream());
// Error flag
error = false;
keepAlive = true;
int keepAliveLeft = maxKeepAliveRequests;
int soTimeout = endpoint.getSoTimeout();
// When using an executor, these values may return non-positive values
int curThreads = endpoint.getCurrentThreadsBusy();
int maxThreads = endpoint.getMaxThreads();
if (curThreads > 0 && maxThreads > 0) {
// Only auto-disable keep-alive if the current thread usage % can be
// calculated correctly
if ((curThreads*100)/maxThreads > 75) {
keepAliveLeft = 1;
}
}
try {
socket.setSoTimeout(soTimeout);
} catch (Throwable t) {
log.debug(sm.getString("http11processor.socket.timeout"), t);
error = true;
}
boolean keptAlive = false;
//keepAlive:当处理完用户发起的 HTTP 请求后是否立即关闭 TCP 连接
while (started && !error && keepAlive) {
// Parsing the request header
try {
if (keptAlive) {
if (keepAliveTimeout > 0) {
socket.setSoTimeout(keepAliveTimeout);
}
else if (soTimeout > 0) {
socket.setSoTimeout(soTimeout);
}
}
//解析http header 的第一行数据
inputBuffer.parseRequestLine();
request.setStartTime(System.currentTimeMillis());
keptAlive = true;
if (disableUploadTimeout) {
socket.setSoTimeout(soTimeout);
} else {
socket.setSoTimeout(timeout);
}
//解析http header 的除第一行数据以外的数据
inputBuffer.parseHeaders();
} catch (IOException e) {
error = true;
break;
} catch (Throwable t) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.header.parse"), t);
}
// 400 - Bad Request
response.setStatus(400);
error = true;
}
if (!error) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
prepareRequest();
} catch (Throwable t) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
// 400 - Internal Server Error
response.setStatus(400);
error = true;
}
}
if (maxKeepAliveRequests > 0 && --keepAliveLeft == 0)
keepAlive = false;
// Process the request in the adapter
if (!error) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
adapter.service(request, response);
// Handle when the response was committed before a serious
// error occurred.  Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if(keepAlive && !error) { // Avoid checking twice.
error = response.getErrorException() != null ||
statusDropsConnection(response.getStatus());
}
} catch (InterruptedIOException e) {
error = true;
} catch (Throwable t) {
log.error(sm.getString("http11processor.request.process"), t);
// 500 - Internal Server Error
response.setStatus(500);
error = true;
}
}
// Finish the handling of the request
try {
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
// If we know we are closing the connection, don't drain input.
// This way uploading a 100GB file doesn't tie up the thread
// if the servlet has rejected it.
if(error)
inputBuffer.setSwallowInput(false);
inputBuffer.endRequest();
} catch (IOException e) {
error = true;
} catch (Throwable t) {
log.error(sm.getString("http11processor.request.finish"), t);
// 500 - Internal Server Error
response.setStatus(500);
error = true;
}
try {
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
outputBuffer.endRequest();
} catch (IOException e) {
error = true;
} catch (Throwable t) {
log.error(sm.getString("http11processor.response.finish"), t);
error = true;
}
// If there was an error, make sure the request is counted as
// and error, and update the statistics counter
if (error) {
response.setStatus(500);
}
request.updateCounters();
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
// Don't reset the param - we'll see it as ended. Next request
// will reset it
// thrA.setParam(null);
// Next request
inputBuffer.nextRequest();
outputBuffer.nextRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
// Recycle
inputBuffer.recycle();
outputBuffer.recycle();
this.socket = null;
// Recycle ssl info
sslSupport = null;
}
  Http11Processor很夸张的又引用到JIoEndpoint(感觉很乱,是否有循环有用的感觉?)
  这里有一个很有意思的变量keepAliveLeft,一开始的时候等于maxKeepAliveRequests,也就是创建Processor的时候传入的Http11Protocol的maxKeepAliveRequests,默认为100,当JIoEndpoint的curThreads(当前已经用的线程数)/maxThreads>0.75,maxKeepAliveRequests马上可怜的变成1,这里可以认为是过载保护。
  当开始标志,没有错误标志等设置后,就开始真正解析这个传入的socket了.
  在这之前实例化InternalInputBuffer,InternalOutputBuffer两个类,对应socket的输入和输出,是可以重复使用的高性能的类。
  首先调用InternalInputBuffer的parseRequestLine()方法解析http的请求行,即'请求方法 请求uri http版本'那一行
  接着解析请求头InternalInputBuffer的parseHeaders()方法干这个活

    public void parseHeaders()
throws IOException {
while (parseHeader()) {
}
parsingHeader = false;
end = pos;
}
  while循环解析每一行,请求头有个专门的类MimeHeaders存放,里面一般存放的是MessageBytes,简单的说一行就是MessageBytes,这个类有char,byte,String等表示,好处是很方便,任何三类都可以处理,对于涉及编码的来说
  ,char和byte可以非常方便的进行原生态的表示
  接着是调用prepareRequest()方法,其实在parseHeaders()中进行了http headers的解析,这里只是拿出来用而已,只是把org.apache.coyote.Request的属性设置好而已
  Http11Processor的process方法就到这里
  回到Http11ConnectionHandler的process()方法,最后面有代码:

        finally {
//       if(proto.adapter != null) proto.adapter.recycle();
//                processor.recycle();
if (processor instanceof ActionHook) {
((ActionHook) processor).action(ActionCode.ACTION_STOP, null);
}
recycledProcessors.offer(processor);
}
  这里调用了Http11Processor的action的方法,我们看下这个方法:

   
if (actionCode == ActionCode.ACTION_COMMIT) {
// Commit current response  只有在response中触发这个分支

//一般来说现在response.isCommitted()是false的,但tomcat中很多地方都有这样的判断,主要是为了稳定性考虑(不存在多线程问题?)
if (response.isCommitted())
return;
// Validate and write response headers
prepareResponse();
try {
outputBuffer.commit();
} catch (IOException e) {
// Set error flag
error = true;
}
}
   第一个分支就是,注意  prepareResponse(); 这里是准备org.apache.coyote.Response方法了,这个和request的处理差不多,是设置http协议响应头的一些内容
  准备好的org.apache.coyote.Request和org.apache.coyote.Response在CoyoteAdapter里面会被用到,返回给tomcat的Connector组件,这个下回再说。
  哇,真长!

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-333489-1-1.html 上篇帖子: Tomcat web.xml 文件详解 下篇帖子: 异常的Tomcat输出信息
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表