protected class Acceptor implements Runnable {
/**
* The background thread that listens for incoming TCP/IP connections and hands them off to an appropriate
* processor.
*/
public void run() {
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
// Accept the next incoming connection from the server socket
try {
// 这里等待用户的请求进入,线程会阻塞,直到等到用户的请求为止
Socket socket = serverSocketFactory.acceptSocket(serverSocket);
serverSocketFactory.initSocket(socket);
// Hand this socket off to an appropriate processor
if (!processSocket(socket)) {
// Close socket right away
try {
socket.close();
} catch (IOException e) {
// Ignore
}
}
} catch (IOException x) {
if (running) log.error(sm.getString("endpoint.accept.fail"), x);
} catch (Throwable t) {
log.error(sm.getString("endpoint.accept.fail"), t);
}
// The processor will recycle itself when it finishes
}
}
}
processSocket方法:
/**
* Process given socket.
*/
protected boolean processSocket(Socket socket) {
try {
if (executor == null) {
// 没有线程池队列,从WorkStack中获取一个,并分配给他做操作(WorkStack相当于线程池的作用)
getWorkerThread().assign(socket);
} else {
// 有线程池队列,直接new 一个SocketProcessor对这个socket连接做处理
executor.execute(new SocketProcessor(socket));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
工人类:
protected class Worker implements Runnable {
protected Thread thread = null;
protected boolean available = false;
protected Socket socket = null;
/**
* Process an incoming TCP/IP connection on the specified socket. Any exception that occurs during processing
* must be logged and swallowed. <b>NOTE</b>: This method is called from our Connector's thread. We must assign
* it to our own thread so that multiple simultaneous requests can be handled.
*
* @param socket TCP socket to process
*/
synchronized void assign(Socket socket) {
// Wait for the Processor to get the previous Socket
while (available) {
// 当前Worker是否处于可用状态
try {
wait();
} catch (InterruptedException e) {
}
}
// Store the newly available Socket and notify our thread
this.socket = socket;
available = true;
notifyAll();
}
/**
* Await a newly assigned Socket from our Connector, or <code>null</code> if we are supposed to shut down.
*/
private synchronized Socket await() {
// 如果处于闲置状态,就等待新的任务分配
while (!available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Notify the Connector that we have received this Socket
Socket socket = this.socket;
available = false;
notifyAll();
return (socket);
}
/**
* The background thread that listens for incoming TCP/IP connections and hands them off to an appropriate
* processor.
*/
public void run() {
// Process requests until we receive a shutdown signal
while (running) {
// Wait for the next socket to be assigned
Socket socket = await();
if (socket == null) continue;
// 处理该次请求
if (!setSocketOptions(socket) || !handler.process(socket)) {
// Close socket
try {
socket.close();
} catch (IOException e) {
}
}
// Finish up this request
socket = null;
// 回收该线程池的使用
recycleWorkerThread(this);
}
}
以上代码的大致示意图: