package com.xiao.tomcatthreadpool;
import java.util.Hashtable;
/**
 * A small fixed-capacity thread pool built on {@code wait}/{@code notify}.
 *
 * <p>Each worker thread is wrapped in a {@link ControlRunnable}. Idle
 * controllers are kept in the {@code pool} array in slots
 * {@code 0 .. (currentThreadCount - currentThreadsBusy - 1)}; handing out a
 * controller takes the highest occupied slot and returning one refills it.
 * All counters and the array are guarded by this object's monitor.
 *
 * <p>Call {@link #start()} before submitting work and {@link #shutdown()}
 * when the pool is no longer needed.
 */
public class ThreadPool {

    /** Default value for the maximum number of worker threads. */
    public static final int MAX_THREADS = 10;
    /** Not referenced inside this class. */
    public static final int MAX_THREADS_MIN = 4;
    /** Default value for the number of idle workers tolerated before the monitor trims them. */
    public static final int MAX_SPARE_THREADS = 5;
    /** Default number of workers opened by {@link #start()} and added on each growth step. */
    public static final int MIN_SPARE_THREADS = 2;
    /** Default monitor wake-up interval, in milliseconds. */
    public static final int WORK_WAIT_TIMEOUT = 10 * 1000;

    private String name = "TP";
    private boolean isDaemon;
    // volatile: findControlRunnable() reads it once before taking the pool lock.
    private volatile boolean stopThePool;
    private int maxSpareThreads;
    private int minSpareThreads;
    private int currentThreadCount;
    private int currentThreadsBusy;
    private int maxThreads;
    private int threadPriority = Thread.NORM_PRIORITY;
    private int sequence = 0;
    private ControlRunnable[] pool;
    private MonitorRunnable monitor;

    /** Live worker threads mapped to their controllers. */
    protected Hashtable<Thread, ControlRunnable> threads = new Hashtable<Thread, ControlRunnable>();

    /** Creates a pool configured with the default limits; no threads are started yet. */
    public ThreadPool() {
        maxThreads = MAX_THREADS;
        maxSpareThreads = MAX_SPARE_THREADS;
        minSpareThreads = MIN_SPARE_THREADS;
        currentThreadCount = 0;
        currentThreadsBusy = 0;
        stopThePool = false;
    }

    /**
     * Factory equivalent of {@code new ThreadPool()}.
     *
     * @return a new, not yet started pool
     */
    public static ThreadPool createThreadPool() {
        return new ThreadPool();
    }

    /**
     * Resets the counters, allocates the controller array, opens the initial
     * workers and, when spare workers may need trimming, starts the monitor.
     */
    public synchronized void start() {
        stopThePool = false;
        currentThreadCount = 0;
        currentThreadsBusy = 0;
        pool = new ControlRunnable[maxThreads];
        openThreads(minSpareThreads);
        if (maxSpareThreads < maxThreads) {
            monitor = new MonitorRunnable(this);
        }
    }

    /**
     * Stops the pool: terminates the monitor and every idle worker, zeroes
     * the counters and wakes callers blocked in {@link #findControlRunnable()}
     * so they fail with {@link IllegalStateException}. Workers that are busy
     * are terminated when they hand themselves back through
     * {@link #returnController(ControlRunnable)}. Calling this on an already
     * stopped pool does nothing.
     */
    public synchronized void shutdown() {
        if (stopThePool) {
            return;
        }
        stopThePool = true;
        if (monitor != null) {
            monitor.terminal();
            monitor = null;
        }
        if (pool != null) {
            int idle = currentThreadCount - currentThreadsBusy;
            for (int i = 0; i < idle; i++) {
                if (pool[i] != null) {
                    pool[i].terminate();
                    pool[i] = null;
                }
            }
        }
        currentThreadCount = 0;
        currentThreadsBusy = 0;
        notifyAll();
    }

    /**
     * Runs a plain {@link Runnable} on a pooled worker, blocking until a
     * worker is available.
     *
     * @param r the task to execute
     * @throws NullPointerException if {@code r} is {@code null}
     * @throws IllegalStateException if the pool is stopped
     */
    public void run(Runnable r) {
        if (null == r) {
            throw new NullPointerException();
        }
        ControlRunnable c = findControlRunnable();
        c.runIt(r);
    }

    /**
     * Runs a {@link ThreadPoolRunnable} on a pooled worker, blocking until a
     * worker is available.
     *
     * @param r the task to execute
     * @throws NullPointerException if {@code r} is {@code null}
     * @throws IllegalStateException if the pool is stopped
     */
    public void runIt(ThreadPoolRunnable r) {
        if (null == r) {
            throw new NullPointerException();
        }
        ControlRunnable c = findControlRunnable();
        c.runIt(r);
    }

    /**
     * Takes an idle controller out of the pool, growing the pool by
     * {@code minSpareThreads} when every worker is busy and the maximum has
     * not been reached, or waiting for a worker to be returned otherwise.
     *
     * <p>An interrupt received while waiting does not abort the wait; the
     * caller's interrupt status is restored before this method returns.
     *
     * @return a controller that is now counted as busy
     * @throws IllegalStateException if the pool is stopped or has no threads
     */
    public ControlRunnable findControlRunnable() {
        if (stopThePool) {
            throw new IllegalStateException();
        }
        ControlRunnable c;
        boolean interrupted = false;
        try {
            synchronized (this) {
                // Re-check the stop flag on every pass so shutdown() can release waiters.
                while (!stopThePool && currentThreadsBusy == currentThreadCount) {
                    if (currentThreadCount < maxThreads) {
                        int toOpen = currentThreadCount + minSpareThreads;
                        openThreads(toOpen);
                    } else {
                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                            // Keep waiting, but remember the interrupt for the caller.
                            interrupted = true;
                        }
                    }
                }
                if (0 == currentThreadCount || stopThePool) {
                    throw new IllegalStateException();
                }
                int pos = currentThreadCount - currentThreadsBusy - 1;
                c = pool[pos];
                pool[pos] = null;
                currentThreadsBusy++;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return c;
    }

    /**
     * Creates workers until the pool holds {@code toOpen} threads, capped at
     * {@code maxThreads}. New controllers are appended after the currently
     * idle ones.
     *
     * @param toOpen the desired total thread count
     */
    public synchronized void openThreads(int toOpen) {
        if (toOpen > maxThreads) {
            toOpen = maxThreads;
        }
        for (int i = currentThreadCount; i < toOpen; i++) {
            pool[i - currentThreadsBusy] = new ControlRunnable(this);
        }
        currentThreadCount = toOpen;
    }

    /**
     * Registers a worker thread together with its controller.
     *
     * @param t the worker thread
     * @param r the controller driving {@code t}
     */
    public void addThread(ThreadWithAttributes t, ControlRunnable r) {
        threads.put(t, r);
    }

    /**
     * Removes a worker thread from the registry.
     *
     * @param t the thread to forget
     */
    public void removeThread(Thread t) {
        threads.remove(t);
    }

    /**
     * Called by a busy worker that is about to exit; drops it from both
     * counters and wakes one waiter. Ignored once the pool is stopped because
     * {@link #shutdown()} has already zeroed the counters.
     *
     * @param r the controller whose thread is ending
     */
    public synchronized void notifyThreadEnd(ControlRunnable r) {
        if (stopThePool) {
            return;
        }
        currentThreadCount--;
        currentThreadsBusy--;
        notify();
    }

    /**
     * Called by a worker that finished its task. The controller is put back
     * into the idle slots and one waiter is woken; if the pool is stopped or
     * empty the controller is terminated instead.
     *
     * @param r the controller becoming idle
     */
    public synchronized void returnController(ControlRunnable r) {
        if (0 == currentThreadCount || stopThePool) {
            r.terminate();
            return;
        }
        currentThreadsBusy--;
        pool[currentThreadCount - currentThreadsBusy - 1] = r;
        notify();
    }

    /**
     * Terminates idle workers in excess of {@code maxSpareThreads}. Invoked
     * periodically by the {@link MonitorRunnable}.
     */
    public synchronized void checkSpareControllers() {
        if (stopThePool) {
            return;
        }
        if ((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
            int toFree = currentThreadCount - currentThreadsBusy - maxSpareThreads;
            for (int i = 0; i < toFree; i++) {
                int top = currentThreadCount - currentThreadsBusy - 1;
                ControlRunnable cr = pool[top];
                cr.terminate();
                pool[top] = null;
                currentThreadCount--;
            }
        }
    }

    /**
     * Background task that wakes up every {@code interval} milliseconds and
     * asks the pool to reclaim idle workers beyond the spare limit.
     */
    public static class MonitorRunnable implements Runnable {
        ThreadPool tp;
        Thread t;
        boolean shouldTerminate;
        int interval = WORK_WAIT_TIMEOUT;

        /**
         * Creates the monitor and immediately starts its thread.
         *
         * @param tp the pool to supervise
         */
        public MonitorRunnable(ThreadPool tp) {
            this.tp = tp;
            this.start();
        }

        /**
         * Changes the wake-up period.
         *
         * @param i the new interval in milliseconds
         */
        public void setInterval(int i) {
            interval = i;
        }

        /** Starts the monitor thread, named after the pool with a "-Monitor" suffix. */
        public void start() {
            shouldTerminate = false;
            t = new Thread(this);
            t.setDaemon(tp.getDaemon());
            t.setName(tp.getName() + "-Monitor");
            t.start();
        }

        /** Same as {@link #terminal()}. */
        public void stop() {
            terminal();
        }

        /** Requests termination and wakes the monitor thread if it is waiting. */
        public synchronized void terminal() {
            this.shouldTerminate = true;
            this.notify();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    synchronized (this) {
                        // Test the flag under the lock so a terminate request issued while
                        // the monitor was outside wait() is not missed for a whole interval.
                        if (!shouldTerminate) {
                            this.wait(interval);
                        }
                        if (shouldTerminate) {
                            break;
                        }
                    }
                    tp.checkSpareControllers();
                } catch (InterruptedException ignored) {
                    // An interrupt only cuts the current sleep short; the flag decides exit.
                }
            }
        }
    }

    /**
     * Owns one worker thread. The thread sleeps until a task is handed over
     * with one of the {@code runIt} methods, executes it, and then gives
     * itself back to the pool.
     */
    public static class ControlRunnable implements Runnable {
        private ThreadPool tp;
        private boolean shouldTerminate;
        private ThreadWithAttributes t;
        private ThreadPoolRunnable toRun;
        private Runnable toRunRunnable;
        private boolean shouldRun;
        // True until the worker's thread data has been filled from a task's getInitData().
        private boolean noThData = true;

        /**
         * Creates the controller, registers its daemon thread with the pool
         * and starts it.
         *
         * @param tp the owning pool
         */
        public ControlRunnable(ThreadPool tp) {
            toRun = null;
            shouldTerminate = false;
            shouldRun = false;
            this.tp = tp;
            t = new ThreadWithAttributes(tp, this);
            t.setDaemon(true);
            t.setName(tp.getName() + "-Processor" + tp.incSequence());
            t.setPriority(tp.getThreadPriority());
            tp.addThread(t, this);
            t.start();
        }

        /**
         * Worker loop: wait for a task or a terminate request, run the task,
         * then return to the pool. A task that throws ends this worker; the
         * pool is told so it can adjust its counters.
         */
        @Override
        public void run() {
            boolean runRequested = false;
            boolean terminateRequested = false;
            ThreadPoolRunnable poolTask = null;
            Runnable plainTask = null;
            try {
                while (true) {
                    try {
                        synchronized (this) {
                            while (!shouldRun && !shouldTerminate) {
                                this.wait();
                            }
                            // Snapshot everything under the lock that runIt() writes under.
                            runRequested = shouldRun;
                            terminateRequested = shouldTerminate;
                            poolTask = toRun;
                            plainTask = toRunRunnable;
                        }
                        if (terminateRequested) {
                            break;
                        }
                        try {
                            if (runRequested) {
                                if (poolTask != null) {
                                    if (noThData) {
                                        // Thread data is initialised once per worker, from the
                                        // first ThreadPoolRunnable this worker executes.
                                        t.setThreadData(tp, poolTask.getInitData());
                                        noThData = false;
                                    }
                                    poolTask.runIt(t.getThreadData(tp));
                                } else if (plainTask != null) {
                                    plainTask.run();
                                }
                            }
                        } catch (Throwable failure) {
                            // The failure is not reported anywhere; the worker simply retires.
                            terminateRequested = true;
                            runRequested = false;
                            tp.notifyThreadEnd(this);
                        } finally {
                            if (runRequested) {
                                boolean retire;
                                synchronized (this) {
                                    shouldRun = false;
                                    // Drop the finished task so it cannot be re-run by a later
                                    // hand-over of the other task kind.
                                    toRun = null;
                                    toRunRunnable = null;
                                    retire = shouldTerminate;
                                }
                                if (retire) {
                                    // Terminated while busy: leave the pool instead of going
                                    // back into the idle slots as a dead controller.
                                    terminateRequested = true;
                                    tp.notifyThreadEnd(this);
                                } else {
                                    tp.returnController(this);
                                }
                            }
                        }
                        if (terminateRequested) {
                            break;
                        }
                    } catch (InterruptedException ignored) {
                        // Loop again; the flags checked under the lock decide what happens.
                    }
                }
            } finally {
                tp.removeThread(Thread.currentThread());
            }
        }

        /**
         * Hands a plain {@link Runnable} to the worker and wakes it.
         *
         * @param toRun the task to execute
         */
        public synchronized void runIt(Runnable toRun) {
            this.toRunRunnable = toRun;
            // Clear the other slot: the worker prefers it when both are set.
            this.toRun = null;
            shouldRun = true;
            this.notify();
        }

        /**
         * Hands a {@link ThreadPoolRunnable} to the worker and wakes it.
         *
         * @param toRun the task to execute
         */
        public synchronized void runIt(ThreadPoolRunnable toRun) {
            this.toRun = toRun;
            this.toRunRunnable = null;
            shouldRun = true;
            this.notify();
        }

        /** Same as {@link #terminate()}. */
        public void stop() {
            this.terminate();
        }

        /**
         * Requests termination and interrupts the worker thread so that a
         * task blocked in an interruptible call can notice. This replaces
         * {@code Thread.stop()}, which throws
         * {@link UnsupportedOperationException} on current JDKs.
         */
        public void kill() {
            terminate();
            t.interrupt();
        }

        /** Marks the worker for termination and wakes it if it is waiting for work. */
        public synchronized void terminate() {
            shouldTerminate = true;
            this.notify();
        }
    }

    /** @return the pool name used as the prefix of thread names */
    public String getName() {
        return name;
    }

    /** @return the daemon flag applied to the monitor thread */
    public boolean getDaemon() {
        return isDaemon;
    }

    /** @return the priority assigned to newly created worker threads */
    public int getThreadPriority() {
        return threadPriority;
    }

    /**
     * Returns the current sequence number and then increments it; used to
     * give worker threads distinct names.
     *
     * @return the sequence value before the increment
     */
    public synchronized int incSequence() {
        return sequence++;
    }

    /**
     * Sets the priority for worker threads created from now on.
     *
     * @param threadPriority a {@link Thread} priority value
     */
    public void setThreadPriority(int threadPriority) {
        this.threadPriority = threadPriority;
    }
}
ThreadWithAttributes.java
package com.xiao.tomcatthreadpool;
import java.util.Hashtable;
/**
 * A {@link Thread} that carries extra per-thread state (notes, attributes,
 * a stage label, a parameter object and a data array).
 *
 * <p>Most accessors take a {@code control} argument and only act when it is
 * the very same object (identity comparison) that was passed to the
 * constructor; otherwise getters return {@code null} and setters do nothing.
 * {@link #getThreadData(Object)} and {@link #setThreadData(Object, Object[])}
 * accept the argument but do not check it.
 */
public class ThreadWithAttributes extends Thread {

    /** Size of the notes array allocated for each new instance. */
    public static int MAX_NOTES=16;

    private final Object control;
    private final Object[] notes = new Object[MAX_NOTES];
    private final Hashtable<Object, Object> attributes = new Hashtable<Object, Object>();
    private String currentStage;
    private Object param;
    private Object[] thData;

    /**
     * @param control the token later required by the guarded accessors
     * @param r       the runnable executed by this thread
     */
    public ThreadWithAttributes(Object control, Runnable r) {
        super(r);
        this.control = control;
    }

    /** Tells whether {@code candidate} is the token this thread was created with. */
    private boolean ownedBy(Object candidate) {
        return control == candidate;
    }

    /**
     * Returns the stored data array; the {@code control} argument is not checked.
     *
     * @return the array last given to {@link #setThreadData(Object, Object[])}, or {@code null}
     */
    public final Object[] getThreadData(Object control) {
        return this.thData;
    }

    /** Stores the data array; the {@code control} argument is not checked. */
    public final void setThreadData(Object control, Object[] thData) {
        this.thData = thData;
    }

    /** Stores {@code value} in note slot {@code id} when {@code control} matches. */
    public final void setNote(Object control, int id, Object value) {
        if (ownedBy(control)) {
            notes[id] = value;
        }
    }

    /** Returns the stage label, or {@code null} when {@code control} does not match. */
    public final String getCurrentStage(Object control) {
        return ownedBy(control) ? currentStage : null;
    }

    /**
     * Information about the current request (or the main object being
     * processed); {@code null} when {@code control} does not match.
     */
    public final Object getParam(Object control) {
        return ownedBy(control) ? param : null;
    }

    /** Updates the stage label when {@code control} matches. */
    public final void setCurrentStage(Object control, String currentStage) {
        if (ownedBy(control)) {
            this.currentStage = currentStage;
        }
    }

    /** Updates the parameter object when {@code control} matches. */
    public final void setParam(Object control, Object param) {
        if (ownedBy(control)) {
            this.param = param;
        }
    }

    /** Returns note slot {@code id}, or {@code null} when {@code control} does not match. */
    public final Object getNote(Object control, int id) {
        return ownedBy(control) ? notes[id] : null;
    }

    /** Returns the attribute table itself, or {@code null} when {@code control} does not match. */
    public final Hashtable getAttributes(Object control) {
        return ownedBy(control) ? attributes : null;
    }
}
ThreadPoolRunnable.java
package com.xiao.tomcatthreadpool;
/**
 * A task for the thread pool that works on a per-thread data array instead
 * of the argument-less {@link Runnable#run()}.
 */
public interface ThreadPoolRunnable {

    /**
     * Supplies a data array for a worker thread.
     * NOTE(review): when and how often this is invoked is decided by the
     * pool implementation - confirm against the pool before relying on it.
     *
     * @return the data array for the worker
     */
    Object[] getInitData();

    /**
     * Executes the task.
     *
     * @param thData the data array currently attached to the executing
     *               worker thread; may be {@code null} if none was attached
     */
    void runIt(Object[] thData);
}