Tomcat 中的有状态线程池
Tomcat中的connector负责将从客户端发出的请求封装成Request对象,再交由processor进行处理。为了提高性能,connector内部实现了一个简单的多例模式来获取processor,在启动阶段,会有一定数量的processor提前被产生并保留在内存中,当需要时直接从内存中取。如果当前所有processor都被占用,则会继续产生新的processor并丢进内存缓存。
/**
* The current number of processors that have been created.
*/
private int curProcessors = 0;
/**
* The minimum number of processors to start at initialization time.
*/
protected int minProcessors = 5;
/**
* The maximum number of processors allowed, or <0 for unlimited.
*/
private int maxProcessors = 20;
/**
* The set of processors that have been created but are not currently
* being used to process a request.
*/
private Stack<HttpProcessor> processors = new Stack<HttpProcessor>();
/**
* Get Processor from stack if there still exist, or create new one
* if current processor number didn't more than max number.
* Otherwise, null will be returned.
*
* @return instance of HttpProcessor
*/
protected HttpProcessor getProcessor() {
synchronized (processors) {
if (processors.size() > 0) {
returnprocessors.pop();
}
if ((maxProcessors > 0) && (curProcessors < maxProcessors)) {
return (newProcessor());
} else {
if (maxProcessors < 0) {
return (newProcessor());
} else {
return (null);
}
}
}
}
private HttpProcessor newProcessor() {
HttpProcessor processor = new HttpProcessor(this);
processor.start();
curProcessors++;
return processor;
}
public void recycle(HttpProcessor processor) {
processors.push(processor);
}
由于processor本身并不消耗资源,因此如此创建方式对性能并没有多大的提升。重点是对于每个processor实例,Tomcat将启动一个新的线程来处理socket,以此来或多更大的吞吐量。
以下是Tomcat如何启动多线程的processor并不断监听状态变化的方案。
HttpProcessor继承Runable接口,每当我们调用其start()方法时,都会启动一个新的线程:
public void start() {
Thread thread = new Thread(this);
thread.setDaemon(true);
thread.start();
}
public void run() {
while (!stopped) {
Socket socket = await();
if (socket == null) {
continue;
}
process(socket);
connector.recycle(this);
}
}
private synchronized Socket await() {
while (!available) {
try {
wait();
} catch (InterruptedException e) {
// Do nothing.
}
}
Socket socket = this.socket;
available = false;
notifyAll();
return socket;
}
当processor新线程启动时,它做了这样一系列的事情:
1. 等待其他线程通知,当某个通知到来时,验证available是否为false,如果为true,继续等待,否则进入第二步。
2. 获取实例变量socket的值,
3. 将available改成false。
4. 通知其他线程状态改变。
5. 对socket进行相应处理(process方法)。
6. connector资源回收该对象。
而在connector主线程中,是通过这种方式调用processor的:
HttpProcessor processor = getProcessor();
processor.assign(socket);
processor的assign方法如下:
/**
* The entry point of HttpProcessor. Assign an socket to the processor, it
* will start a new thread.
*
*
* @param socket
*/
public synchronized void assign(Socket socket) {
while(available) {
try {
wait();
} catch (InterruptedException e) {
// Do nothing.
}
}
this.socket = socket;
available = true;
notifyAll();
}
这个方法首先监听其他线程通知,一旦获得available为false了,便将socket值设置为新的值,并将available值改为true,最后通知其他线程。
assign方法和run方法运行在两个不同的线程中: connector主线程以及processor子线程。当主线程获得一个新的socket,就会assign给processor对象,并通知processor子线程。一旦通知完毕,主线程就返回继续监听其它socket。 而processor子线程收到通知后会对被assign的socket进行相应的处理。
通过这种方式,线程之间各司其职,获得了最大的性能提升。
页:
[1]