设为首页 收藏本站
查看: 870|回复: 0

[经验分享] jetty continuation基本原理及实现

[复制链接]

尚未签到

发表于 2017-2-26 13:20:35 | 显示全部楼层 |阅读模式
背景
在io密集型的web 应用,如何能更好地提升后台性能,jetty continuation是一个选择
现在特别流行的说法就是事件驱动,看看node.js以及redis,
jetty continuation也不例外


一个例子

package org.kaka.web;import java.io.IOException;import java.io.PrintWriter;import javax.servlet.ServletException;import javax.servlet.http.HttpServlet;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import org.eclipse.jetty.continuation.Continuation;import org.eclipse.jetty.continuation.ContinuationSupport;public class SimpleSuspendResumeServlet extends HttpServlet {/*** */private static final long serialVersionUID = 6112996063962978130L;private MyAsyncHandler myAsyncHandler;public void init() throws ServletException {myAsyncHandler = new MyAsyncHandler() {public void register(final MyHandler myHandler) {new Thread(new Runnable() {public void run() {try {Thread.sleep(10000);myHandler.onMyEvent("complete!");} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}).start();}};}public void doGet(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {// if we need to get asynchronous results//Object results = request.getAttribute("results");final PrintWriter writer = response.getWriter();final Continuation continuation = ContinuationSupport.getContinuation(request);//if (results == null) {if (continuation.isInitial()) {//request.setAttribute("results","null");sendMyFirstResponse(response);// suspend the requestcontinuation.suspend(); // always suspend before registration// register with async service. The code here will depend on the// the service used (see Jetty HttpClient for example)myAsyncHandler.register(new MyHandler() {public void onMyEvent(Object result) {continuation.setAttribute("results", result);continuation.resume();}});return; // or continuation.undispatch();}if (continuation.isExpired()) {sendMyTimeoutResponse(response);return;}//Send the resultsObject results = request.getAttribute("results");if(results==null){response.getWriter().write("why reach here??");continuation.resume();return;}sendMyResultResponse(response, results);}private interface MyAsyncHandler {public void register(MyHandler myHandler);}private interface MyHandler {public void onMyEvent(Object result);}private void sendMyFirstResponse(HttpServletResponse response) throws IOException {//必须加上这一行,否者flush也没用,为什么?response.setContentType("text/html");response.getWriter().write("start");response.getWriter().flush();}private void sendMyResultResponse(HttpServletResponse response,Object results) throws IOException {//response.setContentType("text/html");response.getWriter().write("results:" + results);response.getWriter().flush();}private void sendMyTimeoutResponse(HttpServletResponse response)throws IOException {response.getWriter().write("timeout");}}

这个例子到底干了什么?

出于演示的目的,这个例子格外简单。模拟了在web请求中如何处理一个耗时阻塞10s的io操作



  • 常规的程序遇到这种情况(假设没有设置超时),通常会等待10s,线程会白白浪费在哪,一旦请求一多,线程池及等待队列会满掉,从而导致网站无法服务
  • 稍微好点的程序恐怕会使用futuretask来解决问题,但无论怎样,futuretask带有轮询的性质,或多或少会带有阻塞
  • jetty continuation更进了一步,采用事件驱动的方式来通知请求完成,丝毫不浪费一点io时间,一旦遇到阻塞,当前worker线程会结束,这样就可以服务其他请求,等耗时操作处理完毕通知jetty,此时jetty会再动用一个新的worker线程再次处理请求
常见流程


  • 具体流程1,耗时操作在AsyncContinuation默认超时(30s,可以设置)之内完成

  • 当请求到来,被selector线程感知


  • selector线程,会实例化org.eclipse.jetty.server.Request以及org.eclipse.jetty.server.AsyncContinuation
  • AsyncContinuation实例此时状态为__IDLE
worker线程A会获取到selector线程派发的这个请求


  • 流程进入org.eclipse.jetty.server.AsyncHttpConnection(AbstractHttpConnection).handleRequest()
  • 调用_request._async.handling(),AsyncContinuation实例的状态__IDLE-->__DISPATCHED
  • 调用真正的业务逻辑server.handle(this);业务逻辑中调用AsyncContinuation.suspend(ServletContext, ServletRequest, ServletResponse) ,会将AsyncContinuation实例的状态__DISPATCHED-->__ASYNCSTARTED
  • 最后调用_request._async.unhandle(),AsyncContinuation实例的状态__ASYNCSTARTED-->__ASYNCWAIT

    • 会将suspend的那个请求任务放入selector线程的超时队列(和redis 事件框架的做法非常类似)

  • 为了简单期间,假设在上诉过程中,耗时操作还没完成,此时worker线程A被回收将会服务其他请求,虽然当前请求并未完成
匿名应用线程(在jettywoker线程另外启动的线程)完成耗时操作


  • 调用AsyncContinuation.resume() ,此时AsyncContinuation实例的状态__ASYNCWAIT-->__REDISPATCH
  • 然后会往worker线程队列或者selector线程队列中添加一个请求task,以此来触发jetty再次完成未完成的请求
worker线程B再次处理那个请求


  • 流程进入org.eclipse.jetty.server.AsyncHttpConnection(AbstractHttpConnection).handleRequest()
  • 调用_request._async.handling(),AsyncContinuation实例的状态__REDISPATCH-->__REDISPATCHED
  • 调用server.handleAsync(this);再次进入业务逻辑,此时耗时操作已完成,可以输出最后的结果
  • 调用_request._async.unhandle() AsyncContinuation实例的状态__REDISPATCHED-->__UNCOMPLETED
  • 调用_request._async.doComplete(),AsyncContinuation实例的状态__UNCOMPLETED-->__COMPLETED
  • 请求结束


  • 具体流程2,耗时操作超过了AsyncContinuation默认超时时间

  • 同上
  • 同上
  • selector线程轮询中感知到耗时任务超时


  • 此时在轮询中能感知并获取这个task,见org.eclipse.jetty.io.nio.SelectorManager$SelectSet.doSelect()
  • 将此task丢入worker线程队列
worker线程C处理超时任务


  • 调用AsyncContinuation.expired(),此时AsyncContinuation实例的状态__ASYNCWAIT-->__REDISPATCH
  • 然后会往worker线程队列或者selector线程队列中添加一个请求task,以此来触发jetty再次完成未完成的请求
同上面的4


  • 具体流程3,耗时操作非常快

  • 同上
  • worker线程A会获取到selector线程派发的这个请求


  • 流程进入org.eclipse.jetty.server.AsyncHttpConnection(AbstractHttpConnection).handleRequest()
  • 调用_request._async.handling(),AsyncContinuation实例的状态__IDLE-->__DISPATCHED
  • 调用真正的业务逻辑server.handle(this);业务逻辑中调用AsyncContinuation.suspend(ServletContext, ServletRequest, ServletResponse) ,会将AsyncContinuation实例的状态__DISPATCHED-->__ASYNCSTARTED​
匿名应用线程(在jettywoker线程另外启动的线程)完成耗时操作,在2)中调用_request._async.unhandle()之前完成


  • 调用AsyncContinuation.resume() ,此时AsyncContinuation实例的状态__ASYNCSTARTED-->__REDISPATCHING
还是那个work线程A,注意此时3已经完成


  • 调用_request._async.unhandle(),,AsyncContinuation实例的状态__REDISPATCHING-->__REDISPATCHED
  • 由于此时_request._async.unhandle()返回false,再次进入循环调用server.handleAsync(this);再次进入业务逻辑,此时耗时操作已完成,可以输出最后的结果
  • 调用_request._async.unhandle() AsyncContinuation实例的状态__REDISPATCHED-->__UNCOMPLETED
  • 调用_request._async.doComplete(),AsyncContinuation实例的状态__UNCOMPLETED-->__COMPLETED
  • 请求结束


小结


总体来讲

  • jetty continuation需要在应用中使用应用级的线程池来完成一些io任务,这个在普通的web编程并不常见
  • 在应用的第一次请求中需要

    • 调用AsyncContinuation.suspend(完成一个状态的转换,以及产生一个超时任务)
    • 将耗时任务派发给应用线程池
    • 完毕后,jetty回收该请求线程

  • io任务在应用线程中完成后,然后通过AsyncContinuation.resume或者complete等方法通知jetty任务完成
  • jetty然后会再次分配一个worker线程处理该请求(如果逻辑复杂,如并行的多次Io会分配多个worker线程)
  • 引入jetty continuation带来的负面作用是编程模型会变得复杂,需要仔细的切割各类io任务


此例子在jetty7.2和jetty7.6中测试通过,但在jetty8中运行失败(因为调用flush导致request中_header为空从而引发NPE[jetty8的bug?],但如果不掉用有达不大展示效果)

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-347500-1-1.html 上篇帖子: Jetty6 指南和解析 下篇帖子: 用Jetty快速开发J2EE应用
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表