设为首页 收藏本站
查看: 650|回复: 0

[经验分享] tomcat线程池的实现

[复制链接]
累计签到:2 天
连续签到:1 天
发表于 2017-1-26 13:31:31 | 显示全部楼层 |阅读模式
  Tomcat的线程池主要使用了ThreadPool.java、ThreadPoolRunnable.java、ThreadWithAttributes.java,其中ThreadPoolRunnable.java是一个接口,所有的需要使用线程池的服务都可以实现这个接口。而实现的核心则再ThreadPool.java中的两个内部类:MonitorRunnable.java和ControlRunnable.java。
  MonitorRunnable.java在线程池启动之后定期(60s)的扫描线程数,如果空闲的线程大于最大空闲线程数,则结束多余的线程。
  ControlRunnable.java是所有启动的线程,若由任务需要执行,ThreadPool会先找一个空闲的ControlRunnable来执行,若没有空闲的,则创建,若现有的busy线程已经达到最大值,则等待。任务执行结束后通知ControlRunnable继续wait,直到有任务执行或被MonitorRunnable回收。
  若要使用线程池可以实现Runnable接口,或者可以实现ThreadPoolRunnable 接口,当然自己还可以扩展这个类,以便实现更多的使用线程池的方式。
  ThreadPool.java
  


package com.xiao.tomcatthreadpool;
import java.util.Hashtable;
public class ThreadPool {
public static final int MAX_THREADS = 10;
public static final int MAX_THREADS_MIN = 4;
public static final int MAX_SPARE_THREADS = 5;
public static final int MIN_SPARE_THREADS = 2;
public static final int WORK_WAIT_TIMEOUT = 10*1000;
private String name = "TP";
private boolean isDaemon;
private boolean stopThePool;
private int maxSpareThreads;
private int minSpareThreads;
private int currentThreadCount;
private int currentThreadsBusy;
private int maxThreads;
private int threadPriority = Thread.NORM_PRIORITY;
private int sequence = 0;
private ControlRunnable[] pool;
private MonitorRunnable monitor;
protected Hashtable threads=new Hashtable();
public ThreadPool() {
maxThreads = MAX_THREADS;
maxSpareThreads = MAX_SPARE_THREADS;
minSpareThreads = MIN_SPARE_THREADS;
currentThreadCount = 0;
currentThreadsBusy = 0;
stopThePool = false;
}
public static ThreadPool createThreadPool() {
return new ThreadPool();
}
public synchronized void start() {
stopThePool = false;
currentThreadCount = 0;
currentThreadsBusy = 0;
pool = new ControlRunnable[maxThreads];
openThreads(minSpareThreads);
if (maxSpareThreads < maxThreads) {
monitor = new MonitorRunnable(this);
}
}
public void run(Runnable r) {
ControlRunnable c = findControlRunnable();
c.runIt(r);
}
public void runIt(ThreadPoolRunnable r) {
if(null == r) {
throw new NullPointerException();
}
ControlRunnable c = findControlRunnable();
c.runIt(r);
}
public ControlRunnable findControlRunnable() {
ControlRunnable c;
if ( stopThePool ) {
throw new IllegalStateException();
}
synchronized(this) {
while (currentThreadsBusy == currentThreadCount) {
if (currentThreadCount < maxThreads) {
int toOpen = currentThreadCount + minSpareThreads;
openThreads(toOpen);
} else {
try {
this.wait();
}
catch(InterruptedException e) {
}
}
}
if(0 == currentThreadCount || stopThePool) {
throw new IllegalStateException();
}
int pos = currentThreadCount - currentThreadsBusy - 1;
c = pool[pos];
pool[pos] = null;
currentThreadsBusy++;
}
return c;
}
public void openThreads(int toOpen) {
if(toOpen > maxThreads) {
toOpen = maxThreads;
}
for(int i=currentThreadCount; i<toOpen; i++) {
pool[i - currentThreadsBusy] = new ControlRunnable(this);
}
currentThreadCount = toOpen;
}
public void addThread(ThreadWithAttributes t, ControlRunnable r) {
threads.put(t, r);
}
public void removeThread(Thread t) {
threads.remove(t);
}
public synchronized void notifyThreadEnd(ControlRunnable r) {
currentThreadCount --;
currentThreadsBusy --;
notify();
}
public synchronized void returnController(ControlRunnable r) {
if(0 == currentThreadCount || stopThePool) {
r.terminate();
return;
}
currentThreadsBusy--;
pool[currentThreadCount - currentThreadsBusy - 1] = r;
notify();
}
public synchronized void checkSpareControllers() {
if(stopThePool) {
return;
}
if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
int toFree = currentThreadCount - currentThreadsBusy - maxSpareThreads;
for(int i=0; i<toFree; i++) {
ControlRunnable cr = pool[currentThreadCount-currentThreadsBusy -1];
cr.terminate();
pool[currentThreadCount-currentThreadsBusy -1] = null;
currentThreadCount --;
}
}
}
/**
* MonitorRunnable主要任务是监控线程数
* 如果线程数大于最大线程则回收线程
*/
public static class MonitorRunnable implements Runnable {
ThreadPool tp;
Thread t;
boolean shouldTerminate;
int interval = WORK_WAIT_TIMEOUT;
public MonitorRunnable(ThreadPool tp) {
this.tp = tp;
this.start();
}
public void setInterval(int i) {
interval = i;
}
public void start() {
shouldTerminate = false;
t = new Thread(this);
t.setDaemon(tp.getDaemon());
t.setName(tp.getName() + "-Monitor");
t.start();
}
public void stop() {
terminal();
}
public synchronized void terminal() {
this.shouldTerminate = true;
this.notify();
}
public void run() {
while(true) {
try {
synchronized(this) {
this.wait(interval);
}
if(shouldTerminate) {
break;
}
//System.out.println("currentThreadCount=" + currentThreadCount + " currentThreadsBusy=" + currentThreadsBusy + " ");
tp.checkSpareControllers();
} catch(InterruptedException e) {
}
}
}
}
public static class ControlRunnable implements Runnable {
private ThreadPool tp;
private boolean shouldTerminate;
private ThreadWithAttributes     t;
private ThreadPoolRunnable   toRun;
private Runnable toRunRunnable;
private boolean    shouldRun;
public ControlRunnable(ThreadPool tp) {
toRun = null;
shouldTerminate = false;
shouldRun = false;
this.tp = tp;
t = new ThreadWithAttributes(tp, this);
t.setDaemon(true);
t.setName(tp.getName() + "-Processor" + tp.incSequence());
t.setPriority(tp.getThreadPriority());
tp.addThread(t, this);
t.start();
}
public void run() {
boolean _shouldRun = false;
boolean _shouldTerminate = false;
ThreadPoolRunnable _toRun = null;
try {
while(true) {
try {
synchronized(this) {
System.out.println("shouldRun=" + shouldRun);
while(!shouldRun && !shouldTerminate) {
this.wait();
}
_shouldRun = shouldRun;
_shouldTerminate = shouldTerminate;
_toRun = toRun;
}
if (_shouldTerminate)
break;
try {
if(_shouldRun) {
if (_toRun != null) {
_toRun.runIt(t.getThreadData(tp));
} else if (toRunRunnable != null) {
toRunRunnable.run();
} else {
}
}
} catch(Throwable r) {
_shouldTerminate = true;
_shouldRun = false;
tp.notifyThreadEnd(this);
} finally {
if(_shouldRun) {
shouldRun = false;
tp.returnController(this);
}
}
if (_shouldTerminate) {
break;
}
} catch(InterruptedException e) {
}
}
} finally {
tp.removeThread(Thread.currentThread());
}
}
public synchronized void runIt(Runnable toRun) {
this.toRunRunnable = toRun;
shouldRun = true;
this.notify();
}
public synchronized void runIt(ThreadPoolRunnable toRun) {
this.toRun = toRun;
shouldRun = true;
this.notify();
}
public void stop() {
this.terminate();
}
public void kill() {
t.stop();
}
public synchronized void terminate() {
shouldTerminate = true;
this.notify();
}
}
public String getName() {
return name;
}
public boolean getDaemon() {
return isDaemon;
}
public int getThreadPriority() {
return threadPriority;
}
public int incSequence() {
return sequence ++;
}
public void setThreadPriority(int threadPriority) {
this.threadPriority = threadPriority;
}
}
  ThreadWithAttributes.java
  


package com.xiao.tomcatthreadpool;
import java.util.Hashtable;
public class ThreadWithAttributes extends Thread {
private Object control;
public static int MAX_NOTES=16;
private Object notes[]=new Object[MAX_NOTES];
private Hashtable attributes=new Hashtable();
private String currentStage;
private Object param;
private Object thData[];
public ThreadWithAttributes(Object control, Runnable r) {
super(r);
this.control=control;
}
public final Object[] getThreadData(Object control ) {
return thData;
}
public final void setThreadData(Object control, Object thData[] ) {
this.thData=thData;
}
public final void setNote( Object control, int id, Object value ) {
if( this.control != control ) return;
notes[id]=value;
}
public final String getCurrentStage(Object control) {
if( this.control != control ) return null;
return currentStage;
}
/** Information about the current request ( or the main object
* we are processing )
*/
public final Object getParam(Object control) {
if( this.control != control ) return null;
return param;
}
public final void setCurrentStage(Object control, String currentStage) {
if( this.control != control ) return;
this.currentStage = currentStage;
}
public final void setParam( Object control, Object param ) {
if( this.control != control ) return;
this.param=param;
}
public final Object getNote(Object control, int id ) {
if( this.control != control ) return null;
return notes[id];
}
public final Hashtable getAttributes(Object control) {
if( this.control != control ) return null;
return attributes;
}
}

  ThreadPoolRunnable.java
  


package com.xiao.tomcatthreadpool;
public interface ThreadPoolRunnable {
public Object[] getInitData();
public void runIt(Object thData[]);
}
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-333749-1-1.html 上篇帖子: Tomcat源码---请求处理二 下篇帖子: Spirng Tomcat ActiveMQ JMS
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表