
[Experience sharing] Thread pools: analysis of QueuedThreadPool in Jetty (Part 1)


Posted on 2017-2-27 08:26:45
  Jetty version: jetty-6.1.26
  1. Many Jetty components implement the LifeCycle interface, so let's first look at how that interface is defined:

package org.mortbay.component;

import java.util.EventListener;

public interface LifeCycle
{
    public void start() throws Exception;
    public void stop() throws Exception;
    public boolean isRunning();
    public boolean isStarted();
    public boolean isStarting();
    public boolean isStopping();
    public boolean isStopped();
    public boolean isFailed();
    public void addLifeCycleListener(LifeCycle.Listener listener);
    public void removeLifeCycleListener(LifeCycle.Listener listener);

    /* ------------------------------------------------------------ */
    /** Listener.
     * A listener for Lifecycle events.
     */
    public interface Listener extends EventListener
    {
        public void lifeCycleStarting(LifeCycle event);
        public void lifeCycleStarted(LifeCycle event);
        public void lifeCycleFailure(LifeCycle event,Throwable cause);
        public void lifeCycleStopping(LifeCycle event);
        public void lifeCycleStopped(LifeCycle event);
    }
}
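
To see how the Listener callbacks pair up with the state changes, here is a minimal self-contained sketch. MiniListener and MiniComponent are hypothetical stand-ins for LifeCycle.Listener and a Jetty component (they are not Jetty classes), trimmed to the start path so the example runs without Jetty on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for LifeCycle.Listener, reduced to two callbacks.
interface MiniListener {
    void starting(Object source);
    void started(Object source);
}

// Hypothetical component that notifies its listeners around the real work.
class MiniComponent {
    private final List<MiniListener> listeners = new ArrayList<MiniListener>();

    void addListener(MiniListener l) { listeners.add(l); }

    void start() {
        for (MiniListener l : listeners) l.starting(this); // before the work
        // ... the component's own start-up work would go here ...
        for (MiniListener l : listeners) l.started(this);  // after the work
    }
}

class ListenerDemo {
    // Starts a component with a recording listener and returns the event order.
    static List<String> run() {
        final List<String> events = new ArrayList<String>();
        MiniComponent c = new MiniComponent();
        c.addListener(new MiniListener() {
            public void starting(Object source) { events.add("starting"); }
            public void started(Object source)  { events.add("started"); }
        });
        c.start();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [starting, started]
    }
}
```

In the real interface the same idea extends to the stopping, stopped and failure callbacks.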
  2. AbstractLifeCycle is an abstract class that implements the LifeCycle interface (its start() and stop() methods are implemented with the template method pattern):

//========================================================================
//$Id: AbstractLifeCycle.java,v 1.3 2005/11/11 22:55:41 gregwilkins Exp $
//Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
//========================================================================
package org.mortbay.component;
import org.mortbay.log.Log;
import org.mortbay.util.LazyList;
/**
* Basic implementation of the life cycle interface for components.
*
* @author gregw
*/
public abstract class AbstractLifeCycle implements LifeCycle
{
private Object _lock = new Object();
private final int FAILED = -1, STOPPED = 0, STARTING = 1, STARTED = 2, STOPPING = 3;
private volatile int _state = STOPPED;
protected LifeCycle.Listener[] _listeners;
protected void doStart() throws Exception
{
}
protected void doStop() throws Exception
{
}
public final void start() throws Exception
{
synchronized (_lock)
{
try
{
if (_state == STARTED || _state == STARTING)
return;
setStarting();
doStart();
Log.debug("started {}",this);
setStarted();
}
catch (Exception e)
{
setFailed(e);
throw e;
}
catch (Error e)
{
setFailed(e);
throw e;
}
}
}
public final void stop() throws Exception
{
synchronized (_lock)
{
try
{
if (_state == STOPPING || _state == STOPPED)
return;
setStopping();
doStop();
Log.debug("stopped {}",this);
setStopped();
}
catch (Exception e)
{
setFailed(e);
throw e;
}
catch (Error e)
{
setFailed(e);
throw e;
}
}
}
public boolean isRunning()
{
return _state == STARTED || _state == STARTING;
}
public boolean isStarted()
{
return _state == STARTED;
}
public boolean isStarting()
{
return _state == STARTING;
}
public boolean isStopping()
{
return _state == STOPPING;
}
public boolean isStopped()
{
return _state == STOPPED;
}
public boolean isFailed()
{
return _state == FAILED;
}
public void addLifeCycleListener(LifeCycle.Listener listener)
{
_listeners = (LifeCycle.Listener[])LazyList.addToArray(_listeners,listener,LifeCycle.Listener.class);
}
public void removeLifeCycleListener(LifeCycle.Listener listener)
{
_listeners = (LifeCycle.Listener[])LazyList.removeFromArray(_listeners,listener);
}
private void setStarted()
{
_state = STARTED;
if (_listeners != null)
{
for (int i = 0; i < _listeners.length; i++)
{
_listeners[i].lifeCycleStarted(this);
}
}
}
private void setStarting()
{
_state = STARTING;
if (_listeners != null)
{
for (int i = 0; i < _listeners.length; i++)
{
_listeners[i].lifeCycleStarting(this);
}
}
}
private void setStopping()
{
_state = STOPPING;
if (_listeners != null)
{
for (int i = 0; i < _listeners.length; i++)
{
_listeners[i].lifeCycleStopping(this);
}
}
}
private void setStopped()
{
_state = STOPPED;
if (_listeners != null)
{
for (int i = 0; i < _listeners.length; i++)
{
_listeners[i].lifeCycleStopped(this);
}
}
}
private void setFailed(Throwable th)
{
Log.warn("failed "+this+": "+th);
Log.debug(th);
_state = FAILED;
if (_listeners != null)
{
for (int i = 0; i < _listeners.length; i++)
{
_listeners[i].lifeCycleFailure(this,th);
}
}
}
}
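
The template method pattern used by start() can be shown in isolation. The sketch below is a simplified illustration, not Jetty code: MiniLifeCycle and FlakyService are hypothetical names, and only the start path and the FAILED transition are modelled.

```java
// Simplified illustration of the template method pattern (not Jetty code).
abstract class MiniLifeCycle {
    static final int FAILED = -1, STOPPED = 0, STARTING = 1, STARTED = 2;
    private final Object lock = new Object();
    private volatile int state = STOPPED;

    // Hook for subclasses; the default does nothing.
    protected void doStart() throws Exception {}

    // Fixed skeleton: guard, state change, hook, state change, failure handling.
    public final void start() throws Exception {
        synchronized (lock) {
            if (state == STARTED || state == STARTING) return; // already running
            try {
                state = STARTING;
                doStart();
                state = STARTED;
            } catch (Exception e) {
                state = FAILED;
                throw e;
            }
        }
    }

    public int getState() { return state; }
}

// Hypothetical subclass whose hook fails on demand.
class FlakyService extends MiniLifeCycle {
    private final boolean fail;
    int startCalls = 0;

    FlakyService(boolean fail) { this.fail = fail; }

    protected void doStart() throws Exception {
        startCalls++;
        if (fail) throw new IllegalStateException("cannot start");
    }
}

class TemplateMethodDemo {
    public static void main(String[] args) throws Exception {
        FlakyService ok = new FlakyService(false);
        ok.start();
        ok.start(); // second call returns early; doStart() is not invoked again
        System.out.println(ok.getState() + " " + ok.startCalls); // 2 1

        FlakyService bad = new FlakyService(true);
        try {
            bad.start();
        } catch (IllegalStateException e) {
            System.out.println(bad.getState()); // -1
        }
    }
}
```

Because start() is final, a subclass can only customise the doStart() hook; the guard against double starts and the failure handling stay in one place.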

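  3. The QueuedThreadPool implementation (in Jetty 7 this class uses many features of the java.util.concurrent package; a comparison would be worthwhile when time permits).
  The main methods are doStart(), doStop(), newThread() and dispatch(), plus the run() and dispatch() methods of the inner class PoolThread.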

// ========================================================================
// Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================================================================
package org.mortbay.thread;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.mortbay.component.AbstractLifeCycle;
import org.mortbay.log.Log;
/* ------------------------------------------------------------ */
/** A pool of threads.
* <p>
* Avoids the expense of thread creation by pooling threads after
* their run methods exit for reuse.
* <p>
* If an idle thread is available a job is directly dispatched,
* otherwise the job is queued.  After queuing a job, if the total
* number of threads is less than the maximum pool size, a new thread
* is spawned.
* <p>
*
* @author Greg Wilkins <gregw@mortbay.com>
*/
public class QueuedThreadPool extends AbstractLifeCycle implements Serializable, ThreadPool
{
private String _name;
private Set _threads;// every PoolThread in the pool
private List _idle;// idle PoolThreads
private Runnable[] _jobs;// jobs waiting to run (the work queue)
private int _nextJob;// index of the next job to dequeue
private int _nextJobSlot;// index of the next free slot for enqueueing
private int _queued;// number of jobs currently queued
private int _maxQueued;
private boolean _daemon;
private int _id;
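private final Object _lock = new Lock();// guards the work queue _jobs and the idle list _idle
private final Object _threadsLock = new Lock();// guards _threads, the set of all pool threads
private final Object _joinLock = new Lock();// monitor that join() waits on; doStop() notifies it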
private long _lastShrink;
private int _maxIdleTimeMs=60000;
private int _maxThreads=250;
private int _minThreads=2;
private boolean _warned=false;
private int _lowThreads=0;
private int _priority= Thread.NORM_PRIORITY;
private int _spawnOrShrinkAt=0;
private int _maxStopTimeMs;

/* ------------------------------------------------------------------- */
/* Construct
*/
public QueuedThreadPool()
{
_name="qtp-"+hashCode();
}
/* ------------------------------------------------------------------- */
/* Construct
*/
public QueuedThreadPool(int maxThreads)
{
this();
setMaxThreads(maxThreads);
}
/* ------------------------------------------------------------ */
/** Run job.
* @return true
*/
public boolean dispatch(Runnable job)
{  
if (!isRunning() || job==null)
return false;
PoolThread thread=null;
boolean spawn=false;
synchronized(_lock)
{
// Look for an idle thread
int idle=_idle.size();
if (idle>0)
thread=(PoolThread)_idle.remove(idle-1);
else
{
// queue the job
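_queued++;// starts at 0
if (_queued>_maxQueued)// track the high-water mark: the largest queue length seen so far
_maxQueued=_queued;
_jobs[_nextJobSlot++]=job;// e.g. on the first call: _jobs[0]=job, then _nextJobSlot=1; _nextJobSlot is the next free slot in _jobs
if (_nextJobSlot==_jobs.length)// reached the end of the array: wrap around to index 0
_nextJobSlot=0;
if (_nextJobSlot==_nextJob)// _nextJob is the index of the oldest queued job; the tail catching up with it means _jobs is full, so grow it
{
// Grow the job queue
Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];// grows by _maxThreads slots, which doubles the array only while its length is still _maxThreads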
int split=_jobs.length-_nextJob;
if (split>0)
System.arraycopy(_jobs,_nextJob,jobs,0,split);
if (_nextJob!=0)
System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
_jobs=jobs;
_nextJob=0;
_nextJobSlot=_queued;
}
spawn=_queued>_spawnOrShrinkAt;
}
}
if (thread!=null)
{
thread.dispatch(job);
}
else if (spawn)
{
newThread();
}
return true;
}
/* ------------------------------------------------------------ */
/** Get the number of idle threads in the pool.
* @see #getThreads
* @return Number of threads
*/
public int getIdleThreads()
{
return _idle==null?0:_idle.size();
}
/* ------------------------------------------------------------ */
/**
* @return low resource threads threshold
*/
public int getLowThreads()
{
return _lowThreads;
}
/* ------------------------------------------------------------ */
/**
* @return maximum queue size
*/
public int getMaxQueued()
{
return _maxQueued;
}
/* ------------------------------------------------------------ */
/** Get the maximum thread idle time.
* Delegated to the named or anonymous Pool.
* @see #setMaxIdleTimeMs
* @return Max idle time in ms.
*/
public int getMaxIdleTimeMs()
{
return _maxIdleTimeMs;
}
/* ------------------------------------------------------------ */
/** Get the maximum number of threads.
* Delegated to the named or anonymous Pool.
* @see #setMaxThreads
* @return maximum number of threads.
*/
public int getMaxThreads()
{
return _maxThreads;
}
/* ------------------------------------------------------------ */
/** Get the minimum number of threads.
* Delegated to the named or anonymous Pool.
* @see #setMinThreads
* @return minimum number of threads.
*/
public int getMinThreads()
{
return _minThreads;
}
/* ------------------------------------------------------------ */
/**
* @return The name of the BoundedThreadPool.
*/
public String getName()
{
return _name;
}
/* ------------------------------------------------------------ */
/** Get the number of threads in the pool.
* @see #getIdleThreads
* @return Number of threads
*/
public int getThreads()
{
return _threads.size();
}
/* ------------------------------------------------------------ */
/** Get the priority of the pool threads.
*  @return the priority of the pool threads.
*/
public int getThreadsPriority()
{
return _priority;
}
/* ------------------------------------------------------------ */
public int getQueueSize()
{
return _queued;
}
/* ------------------------------------------------------------ */
/**
* @return the spawnOrShrinkAt  The number of queued jobs (or idle threads) needed
* before the thread pool is grown (or shrunk)
*/
public int getSpawnOrShrinkAt()
{
return _spawnOrShrinkAt;
}
/* ------------------------------------------------------------ */
/**
* @param spawnOrShrinkAt The number of queued jobs (or idle threads) needed
* before the thread pool is grown (or shrunk)
*/
public void setSpawnOrShrinkAt(int spawnOrShrinkAt)
{
_spawnOrShrinkAt=spawnOrShrinkAt;
}
/* ------------------------------------------------------------ */
/**
* @return maximum total time that stop() will wait for threads to die.
*/
public int getMaxStopTimeMs()
{
return _maxStopTimeMs;
}
/* ------------------------------------------------------------ */
/**
* @param stopTimeMs maximum total time that stop() will wait for threads to die.
*/
public void setMaxStopTimeMs(int stopTimeMs)
{
_maxStopTimeMs = stopTimeMs;
}
/* ------------------------------------------------------------ */
/**
* Delegated to the named or anonymous Pool.
*/
public boolean isDaemon()
{
return _daemon;
}
/* ------------------------------------------------------------ */
public boolean isLowOnThreads()
{
return _queued>_lowThreads;
}
/* ------------------------------------------------------------ */
public void join() throws InterruptedException
{
synchronized (_joinLock)
{
while (isRunning()){
_joinLock.wait();
}
}
// TODO remove this semi busy loop!
while (isStopping()){
Thread.sleep(100);
}
}
/* ------------------------------------------------------------ */
/**
* Delegated to the named or anonymous Pool.
*/
public void setDaemon(boolean daemon)
{
_daemon=daemon;
}
/* ------------------------------------------------------------ */
/**
* @param lowThreads low resource threads threshold
*/
public void setLowThreads(int lowThreads)
{
_lowThreads = lowThreads;
}
/* ------------------------------------------------------------ */
/** Set the maximum thread idle time.
* Threads that are idle for longer than this period may be
* stopped.
* Delegated to the named or anonymous Pool.
* @see #getMaxIdleTimeMs
* @param maxIdleTimeMs Max idle time in ms.
*/
public void setMaxIdleTimeMs(int maxIdleTimeMs)
{
_maxIdleTimeMs=maxIdleTimeMs;
}
/* ------------------------------------------------------------ */
/** Set the maximum number of threads.
* Delegated to the named or anonymous Pool.
* @see #getMaxThreads
* @param maxThreads maximum number of threads.
*/
public void setMaxThreads(int maxThreads)
{
if (isStarted() && maxThreads<_minThreads)
throw new IllegalArgumentException("!minThreads<maxThreads");
_maxThreads=maxThreads;
}
/* ------------------------------------------------------------ */
/** Set the minimum number of threads.
* Delegated to the named or anonymous Pool.
* @see #getMinThreads
* @param minThreads minimum number of threads
*/
public void setMinThreads(int minThreads)
{
if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
throw new IllegalArgumentException("!0<=minThreads<maxThreads");
_minThreads=minThreads;
synchronized (_threadsLock)
{
while (isStarted() && _threads.size()<_minThreads)
{
newThread();   
}
}
}
/* ------------------------------------------------------------ */
/**
* @param name Name of the BoundedThreadPool to use when naming Threads.
*/
public void setName(String name)
{
_name= name;
}
/* ------------------------------------------------------------ */
/** Set the priority of the pool threads.
*  @param priority the new thread priority.
*/
public void setThreadsPriority(int priority)
{
_priority=priority;
}
/* ------------------------------------------------------------ */
/* Start the BoundedThreadPool.
* Construct the minimum number of threads.
*/
protected void doStart() throws Exception
{
if (_maxThreads<_minThreads || _minThreads<=0)
throw new IllegalArgumentException("!0<minThreads<maxThreads");
_threads=new HashSet();
_idle=new ArrayList();
_jobs=new Runnable[_maxThreads];// work queue sized to the maximum thread count
for (int i=0;i<_minThreads;i++)// create the minimum number of PoolThreads up front
{
newThread();
}   
}
/* ------------------------------------------------------------ */
/** Stop the BoundedThreadPool.
* New jobs are no longer accepted,idle threads are interrupted
* and stopJob is called on active threads.
* The method then waits
* min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
* stop, at which time killJob is called.
*/
protected void doStop() throws Exception
{   
super.doStop();
long start=System.currentTimeMillis();
for (int i=0;i<100;i++)
{
synchronized (_threadsLock)
{
Iterator iter = _threads.iterator();
while (iter.hasNext())
((Thread)iter.next()).interrupt();
}
Thread.yield();
if (_threads.size()==0 || (_maxStopTimeMs>0 && _maxStopTimeMs < (System.currentTimeMillis()-start)))
break;
try
{
Thread.sleep(i*100);
}
catch(InterruptedException e){}

}
// TODO perhaps force stops
if (_threads.size()>0)
Log.warn(_threads.size()+" threads could not be stopped");
synchronized (_joinLock)
{
_joinLock.notifyAll();
}
}
/* ------------------------------------------------------------ */
protected void newThread()
{
synchronized (_threadsLock)
{
if (_threads.size()<_maxThreads)
{
PoolThread thread =new PoolThread();
_threads.add(thread);// register the thread in the pool
thread.setName(thread.hashCode()+"@"+_name+"-"+_id++);// thread name = thread hashCode + "@" + pool name + "-" + sequence number within the pool (incrementing)
thread.start();
}
else if (!_warned)   
{
_warned=true;
Log.debug("Max threads for {}",this);
}
}
}
/* ------------------------------------------------------------ */
/** Stop a Job.
* This method is called by the Pool if a job needs to be stopped.
* The default implementation does nothing and should be extended by a
* derived thread pool class if special action is required.
* @param thread The thread allocated to the job, or null if no thread allocated.
* @param job The job object passed to run.
*/
protected void stopJob(Thread thread, Object job)
{
thread.interrupt();
}

/* ------------------------------------------------------------ */
public String dump()
{
StringBuffer buf = new StringBuffer();
synchronized (_threadsLock)
{
for (Iterator i=_threads.iterator();i.hasNext();)
{
Thread thread = (Thread)i.next();
buf.append(thread.getName()).append(" ").append(thread.toString()).append('\n');
}
}
return buf.toString();
}
/* ------------------------------------------------------------ */
/**
* @param name The thread name to stop.
* @return true if the thread was found and stopped.
* @deprecated Use {@link #interruptThread(String)} in preference
*/
public boolean stopThread(String name)
{
synchronized (_threadsLock)
{
for (Iterator i=_threads.iterator();i.hasNext();)
{
Thread thread = (Thread)i.next();
if (name.equals(thread.getName()))
{
thread.stop();
return true;
}
}
}
return false;
}
/* ------------------------------------------------------------ */
/**
* @param name The thread name to interrupt.
* @return true if the thread was found and interrupted.
*/
public boolean interruptThread(String name)
{
synchronized (_threadsLock)
{
for (Iterator i=_threads.iterator();i.hasNext();)
{
Thread thread = (Thread)i.next();
if (name.equals(thread.getName()))
{
thread.interrupt();
return true;
}
}
}
return false;
}
/* ------------------------------------------------------------ */
/** Pool Thread class.
* The PoolThread allows the threads job to be
* retrieved and active status to be indicated.
*/
public class PoolThread extends Thread
{
Runnable _job=null;// hand-off slot: the pool passes a job to this thread through this field of the inner class
/* ------------------------------------------------------------ */
PoolThread()
{
setDaemon(_daemon);
setPriority(_priority);
}
/* ------------------------------------------------------------ */
/** BoundedThreadPool run.
* Loop getting jobs and handling them until idle or stopped.
*/
public void run()
{
boolean idle=false;// true once this thread has added itself to _idle
Runnable job=null;// local to this run() call, separate from the _job field
try
{
while (isRunning())
{   
// Run any job that we have.
if (job!=null)
{
final Runnable todo=job;
job=null;
idle=false;
todo.run();
}
synchronized(_lock)
{
// is there a queued job?
if (_queued>0)
{
_queued--;
job=_jobs[_nextJob];
_jobs[_nextJob++]=null;
if (_nextJob==_jobs.length)
_nextJob=0;
continue;
}
// Should we shrink?
final int threads=_threads.size();
if (threads>_minThreads &&
(threads>_maxThreads ||
_idle.size()>_spawnOrShrinkAt))   
{
long now = System.currentTimeMillis();
if ((now-_lastShrink)>getMaxIdleTimeMs())
{
_lastShrink=now;
_idle.remove(this);
return;
}
}
if (!idle)
{   
// Add ourselves to the idle set.
_idle.add(this);
idle=true;
}
}
// We are idle
// wait for a dispatched job
synchronized (this)
{
if (_job==null)
this.wait(getMaxIdleTimeMs());
job=_job;
_job=null;
}
}
}
catch (InterruptedException e)
{
Log.ignore(e);
}
finally
{
synchronized (_lock)
{
_idle.remove(this);
}
synchronized (_threadsLock)
{
_threads.remove(this);
}
synchronized (this)
{
job=_job;
}
// we died with a job! reschedule it
if (job!=null)
{
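// The inner and outer classes both declare a dispatch() method, so the call must be qualified; otherwise a plain call would do.
// This invokes dispatch() on the enclosing pool object, not dispatch() on this PoolThread.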
QueuedThreadPool.this.dispatch(job);
}
}
}
/* ------------------------------------------------------------ */
void dispatch(Runnable job)
{
synchronized (this)
{
_job=job;
this.notify();
}
}
}
private class Lock{}
}
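
The job queue inside dispatch() is a circular array: _nextJob is the head, _nextJobSlot is the tail, and when the tail catches up with the head after an insert the array is full and its two wrapped segments are copied into a larger one. A minimal stand-alone sketch of that idea follows. JobRing and its member names are hypothetical, growBy plays the role of _maxThreads, and the locking that the real class does on _lock is left out.

```java
// Hypothetical stand-alone version of the circular job array in QueuedThreadPool.
class JobRing {
    private Runnable[] jobs;
    private final int growBy;   // plays the role of _maxThreads
    private int head;           // like _nextJob: next slot to dequeue
    private int tail;           // like _nextJobSlot: next slot to enqueue
    private int size;           // like _queued

    JobRing(int initialCapacity, int growBy) {
        this.jobs = new Runnable[initialCapacity];
        this.growBy = growBy;
    }

    void add(Runnable job) {
        size++;
        jobs[tail++] = job;
        if (tail == jobs.length) tail = 0;           // wrap around
        if (tail == head) {                          // array is now full: grow
            Runnable[] bigger = new Runnable[jobs.length + growBy];
            int split = jobs.length - head;          // elements from head to the end
            System.arraycopy(jobs, head, bigger, 0, split);
            System.arraycopy(jobs, 0, bigger, split, tail); // wrapped part
            jobs = bigger;
            head = 0;
            tail = size;
        }
    }

    Runnable poll() {
        if (size == 0) return null;
        size--;
        Runnable job = jobs[head];
        jobs[head++] = null;                         // clear the slot
        if (head == jobs.length) head = 0;
        return job;
    }

    int size() { return size; }
    int capacity() { return jobs.length; }
}

class JobRingDemo {
    // Enqueues names through a wrap-around and two growths, returns dequeue order.
    static String run() {
        final StringBuilder out = new StringBuilder();
        JobRing ring = new JobRing(2, 2);
        String[] names = {"A", "B", "C", "D", "E"};
        for (int i = 0; i < names.length; i++) {
            final String name = names[i];
            ring.add(new Runnable() { public void run() { out.append(name); } });
            if (i == 1) ring.poll().run(); // dequeue "A" so the indices wrap later
        }
        for (Runnable r = ring.poll(); r != null; r = ring.poll()) r.run();
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // ABCDE
    }
}
```

FIFO order survives both the wrap-around and the copy because the segment starting at the head is always copied first.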

  4. The QueuedThreadPoolTest test class: it calls start() on a QueuedThreadPool object to start the pool, then calls dispatch() to hand out jobs.

package com.iteye.suo.jetty.thread;

import org.mortbay.thread.QueuedThreadPool;

public class QueuedThreadPoolTest {
    public static void main(String[] args) throws Exception {
        QueuedThreadPool pool = new QueuedThreadPool();
        pool.start();
        for (int i = 0; i < 20; i++) {
            final int num = i;
            pool.dispatch(new Runnable() {
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " loop of " + num);
                }
            });
        }
        System.out.println("done!");
        // How should the pool be stopped? Stopping it like this interrupts any jobs still in the pool.
        pool.stop();
    }
}
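
One possible answer to the question raised in the test's final comment (how to stop the pool without interrupting pending jobs) is to wait until every submitted job has finished before calling stop(). The sketch below shows that waiting pattern with java.util.concurrent.CountDownLatch. To stay runnable without Jetty it starts plain threads where the test calls pool.dispatch(), so treat LatchStopDemo (a hypothetical name) as an illustration of the pattern, not a tested recipe for QueuedThreadPool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchStopDemo {
    // Runs `jobs` tasks and returns how many completed, or -1 on timeout.
    static int run(int jobs) throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(jobs);
        final AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < jobs; i++) {
            // Stand-in for pool.dispatch(...): each job counts down when it ends.
            new Thread(new Runnable() {
                public void run() {
                    try {
                        completed.incrementAndGet(); // the job's real work
                    } finally {
                        done.countDown();            // always signal completion
                    }
                }
            }).start();
        }
        // Block until all jobs have finished (bounded, so a stuck job cannot hang us).
        boolean finished = done.await(10, TimeUnit.SECONDS);
        // With Jetty, pool.stop() would be called here, after the wait.
        return finished ? completed.get() : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed " + run(20));
    }
}
```

The countDown() call sits in a finally block so that a job that throws still releases the waiting thread.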

 
