public void open() throws IOException
{
synchronized(this)
{
if (_acceptChannel == null)
{
// Create a new server socket
_acceptChannel = ServerSocketChannel.open();
// Bind the server socket to the local host and port
_acceptChannel.socket().setReuseAddress(getReuseAddress());
InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
_acceptChannel.socket().bind(addr,getAcceptQueueSize());
// Set to non blocking mode
_acceptChannel.configureBlocking(false);
}
}
}
super.doStart()方法如下:
protected void doStart() throws Exception
{
if (_server==null)
throw new IllegalStateException("No server");
// open listener port
open();//再一次调用open()方法,确保ServerSocketChannel启动,调用两次就能确保启动?
super.doStart();
if (_threadPool==null)
_threadPool=_server.getThreadPool();
if (_threadPool!=_server.getThreadPool() && (_threadPool instanceof LifeCycle))
((LifeCycle)_threadPool).start();
// Start selector thread
synchronized(this)
{
_acceptorThread=new Thread[getAcceptors()];
for (int i=0;i<_acceptorThread.length;i++)
{
if (!_threadPool.dispatch(new Acceptor(i)))//启动接受请求的线程
{
Log.warn("insufficient maxThreads configured for {}",this);
break;
}
}
}
Log.info("Started {}",this);
}
Acceptor线程的run()方法如下:
public void handle() throws IOException
{
// Loop while more in buffer
boolean more_in_buffer = true; // assume true until proven otherwise
int no_progress = 0;
while (more_in_buffer)
{
try
{
synchronized (this)
{
if (_handling)
throw new IllegalStateException(); // TODO delete this
// check
_handling = true;
}
setCurrentConnection(this);
long io = 0;
Continuation continuation = _request.getContinuation();//得到RetryContinuation
if (continuation != null && continuation.isPending())
{
Log.debug("resume continuation {}",continuation);
if (_request.getMethod() == null)
throw new IllegalStateException();
handleRequest();//处理http请求,执行filter,servlet等
}
else//解析http请求
{
// If we are not ended then parse available
if (!_parser.isComplete())
io = _parser.parseAvailable();
// Do we have more generating to do?
// Loop here because some writes may take multiple steps and
// we need to flush them all before potentially blocking in
// the
// next loop.
while (_generator.isCommitted() && !_generator.isComplete())
{
long written = _generator.flush();
io += written;
if (written <= 0)
break;
if (_endp.isBufferingOutput())
_endp.flush();
}
// Flush buffers
if (_endp.isBufferingOutput())
{
_endp.flush();
if (!_endp.isBufferingOutput())
no_progress = 0;
}
if (io > 0)
no_progress = 0;
else if (no_progress++ >= 2)
return;
}
}
catch (HttpException e)
{
if (Log.isDebugEnabled())
{
Log.debug("uri=" + _uri);
Log.debug("fields=" + _requestFields);
Log.debug(e);
}
_generator.sendError(e.getStatus(),e.getReason(),null,true);
_parser.reset(true);
_endp.close();
throw e;
}
finally
{
setCurrentConnection(null);
more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();
synchronized (this)
{
_handling = false;
if (_destroy)
{
destroy();
return;
}
}
if (_parser.isComplete() && _generator.isComplete() && !_endp.isBufferingOutput())
{
if (!_generator.isPersistent())
{
_parser.reset(true);
more_in_buffer = false;
}
if (more_in_buffer)
{
reset(false);
more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();
}
else
reset(true);
no_progress = 0;
}
Continuation continuation = _request.getContinuation();
if (continuation != null && continuation.isPending())
{
break;
}else if (_generator.isCommitted() && !_generator.isComplete() && _endp instanceof SelectChannelEndPoint) // TODO
((SelectChannelEndPoint)_endp).setWritable(false);
}
}
}