设为首页 收藏本站
查看: 974|回复: 0

[经验分享] Jetty基于NIO的方式处理请求

[复制链接]

尚未签到

发表于 2017-2-27 08:45:20 | 显示全部楼层 |阅读模式
  Jetty基于NIO的方式处理请求的类是SelectChannelConnector,该类同样继承AbstractLifeCycle类,SelectChannelConnector初始化的时候会调用AbstractLifeCycle类的start()方法,如下:

public final void start() throws Exception
{
synchronized (_lock)
{
try
{
if (_state == STARTED || _state == STARTING)
return;
setStarting();
doStart();
Log.debug("started {}",this);
setStarted();
}
catch (Exception e)
{
setFailed(e);
throw e;
}
catch (Error e)
{
setFailed(e);
throw e;
}
}
}
  doStart()方法在SelectChannelConnector类中.如下:

   protected void doStart() throws Exception
{
_manager.setSelectSets(getAcceptors());//设置接收请求的线程个数,默认1个
_manager.setMaxIdleTime(getMaxIdleTime());
_manager.setLowResourcesConnections(getLowResourcesConnections());
_manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
_manager.start();//初始化Selector
open();//初始化ServerSocketChannel
_manager.register(_acceptChannel);
super.doStart();
}
  _manager类名为SelectorManager,open()方法如下:

    public void open() throws IOException
{
synchronized(this)
{
if (_acceptChannel == null)
{
// Create a new server socket
_acceptChannel = ServerSocketChannel.open();
// Bind the server socket to the local host and port
_acceptChannel.socket().setReuseAddress(getReuseAddress());
InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
_acceptChannel.socket().bind(addr,getAcceptQueueSize());
// Set to non blocking mode
_acceptChannel.configureBlocking(false);
}
}
}
  super.doStart()方法如下:

protected void doStart() throws Exception
{
if (_server==null)
throw new IllegalStateException("No server");
// open listener port
open();//再一次调用open()方法,确保ServerSocketChannel启动,调用两次就能确保启动?
super.doStart();
if (_threadPool==null)
_threadPool=_server.getThreadPool();
if (_threadPool!=_server.getThreadPool() && (_threadPool instanceof LifeCycle))
((LifeCycle)_threadPool).start();
// Start selector thread
synchronized(this)
{
_acceptorThread=new Thread[getAcceptors()];
for (int i=0;i<_acceptorThread.length;i++)
{
if (!_threadPool.dispatch(new Acceptor(i)))//启动接受请求的线程
{
Log.warn("insufficient maxThreads configured for {}",this);
break;
}
}
}
Log.info("Started {}",this);
}
  Acceptor线程的run()方法如下:

public void run()
{   
Thread current = Thread.currentThread();
String name;
synchronized(AbstractConnector.this)//设置当前线程的名字,是不是太复杂点
{
if (_acceptorThread==null)
return;
_acceptorThread[_acceptor]=current;
name =_acceptorThread[_acceptor].getName();
current.setName(name+" - Acceptor"+_acceptor+" "+AbstractConnector.this);
}
int old_priority=current.getPriority();
try
{
current.setPriority(old_priority-_acceptorPriorityOffset);
while (isRunning() && getConnection()!=null)//connector初始化并且ServerSocketChannel存在
{
try
{
accept(_acceptor);//处理收到的请求
}
catch(EofException e)
{
Log.ignore(e);
}
catch(IOException e)
{
Log.ignore(e);
}
catch(ThreadDeath e)
{
throw e;
}
catch(Throwable e)
{
Log.warn(e);
}
}
}
finally
{   
current.setPriority(old_priority);
current.setName(name);
synchronized(AbstractConnector.this)
{
if (_acceptorThread!=null)
_acceptorThread[_acceptor]=null;
}
}
}
  accept(_acceptor)最终会调用SelectorManager.SelectSet.doSelect()方法,该方法比较复杂,简单来说就是每接受一个请求就注册到Selector上,并且用SelectChannelEndPoint类(本身也是一个线程)处理请求,SelectChannelEndPoint类的run()方法如下:

   public void run()
{
try
{
_connection.handle();
}
catch (ClosedChannelException e)
{
Log.ignore(e);
}
catch (EofException e)
{
Log.debug("EOF", e);
try{close();}
catch(IOException e2){Log.ignore(e2);}
}
catch (HttpException e)
{
Log.debug("BAD", e);
try{close();}
catch(IOException e2){Log.ignore(e2);}
}
catch (Throwable e)
{
Log.warn("handle failed", e);
try{close();}
catch(IOException e2){Log.ignore(e2);}
}
finally
{
undispatch();
}
}
  _connection类的类名为HttpConnection,HttpConnection的handle()方法如下:

   public void handle() throws IOException
{
// Loop while more in buffer
boolean more_in_buffer = true; // assume true until proven otherwise
int no_progress = 0;
while (more_in_buffer)
{
try
{
synchronized (this)
{
if (_handling)
throw new IllegalStateException(); // TODO delete this
// check
_handling = true;
}
setCurrentConnection(this);
long io = 0;
Continuation continuation = _request.getContinuation();//得到RetryContinuation
if (continuation != null && continuation.isPending())
{
Log.debug("resume continuation {}",continuation);
if (_request.getMethod() == null)
throw new IllegalStateException();
handleRequest();//处理http请求,执行filter,servlet等
}
else//解析http请求
{
// If we are not ended then parse available
if (!_parser.isComplete())
io = _parser.parseAvailable();
// Do we have more generating to do?
// Loop here because some writes may take multiple steps and
// we need to flush them all before potentially blocking in
// the
// next loop.
while (_generator.isCommitted() && !_generator.isComplete())
{
long written = _generator.flush();
io += written;
if (written <= 0)
break;
if (_endp.isBufferingOutput())
_endp.flush();
}
// Flush buffers
if (_endp.isBufferingOutput())
{
_endp.flush();
if (!_endp.isBufferingOutput())
no_progress = 0;
}
if (io > 0)
no_progress = 0;
else if (no_progress++ >= 2)
return;
}
}
catch (HttpException e)
{
if (Log.isDebugEnabled())
{
Log.debug("uri=" + _uri);
Log.debug("fields=" + _requestFields);
Log.debug(e);
}
_generator.sendError(e.getStatus(),e.getReason(),null,true);
_parser.reset(true);
_endp.close();
throw e;
}
finally
{
setCurrentConnection(null);
more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();
synchronized (this)
{
_handling = false;
if (_destroy)
{
destroy();
return;
}
}
if (_parser.isComplete() && _generator.isComplete() && !_endp.isBufferingOutput())
{
if (!_generator.isPersistent())
{
_parser.reset(true);
more_in_buffer = false;
}
if (more_in_buffer)
{
reset(false);
more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();
}
else
reset(true);
no_progress = 0;
}
Continuation continuation = _request.getContinuation();
if (continuation != null && continuation.isPending())
{
break;
}else if (_generator.isCommitted() && !_generator.isComplete() && _endp instanceof SelectChannelEndPoint) // TODO
((SelectChannelEndPoint)_endp).setWritable(false);
}
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-347665-1-1.html 上篇帖子: 架构师之jetty使用----------------问题集锦 下篇帖子: Jetty Continuation实现原理和使用场景分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表