设为首页 收藏本站
查看: 631|回复: 0

[经验分享] jetty中的QueuedThreadPool

[复制链接]

尚未签到

发表于 2017-2-26 11:08:54 | 显示全部楼层 |阅读模式
  QueuedThreadPool


  doStart()方法负责初始化并启动_minThreads个空线程,等待任务的到来。其中,
  _theads用来存放当前池中所有线程;
  _idle中存放当前空闲线程;
  _jobs为存放任务的队列,先来先服务。

protected void doStart() throws Exception
{
if (_maxThreads<_minThreads || _minThreads<=0)
throw new IllegalArgumentException("!0<minThreads<maxThreads");
_threads=new HashSet();
_idle=new ArrayList();
_jobs=new Runnable[_maxThreads];
for (int i=0;i<_minThreads;i++)
{
newThread();
}   
}
protected void newThread()
{
synchronized (_threadsLock)
{
if (_threads.size()<_maxThreads)
{
PoolThread thread =new PoolThread();
_threads.add(thread);
thread.setName(thread.hashCode()+"@"+_name+"-"+_id++);
thread.start();
}
else if (!_warned)   
{
_warned=true;
Log.debug("Max threads for {}",this);
}
}
}
  初始化完毕,可调用dispatch(Runnable job)方法给线程池分配任务。
  如果_idle.size() > 0,即有空闲线程,直接把job分给最后一个空闲线程,并从_idle中删除之。至于为什么是给最后一个?没有为什么,只要你愿意,随便哪一个都行。
  如果_idle.size() = 0,即没有空闲线程,那就只好到_jobs中排队啦。
  _jobs的示意图如下图,这是一初始大小为5的队列。
  
DSC0000.png
  
_nextJob指向_jobs队列中当前可取任务地址,每取一次任务,_nextJob自增1;而_nextJobSlot指向当前可以放置任务的地址,也就是下一个任务进队列,要放在哪一个位置,每进一个任务,_nextJobSlot自增1。它们初始位置当然都是0。
  一般来说,当_nextJobSlot总是处在_nextJob前面的,即_nextJobSlot >_nextJob。
当_nextJob == _nextJobSlot时,就说明_jobs队列排满了,没位置了,怎么办?增加队列长度呗。QueuedThreadPool中,每调整一次,增加_maxThreads个空位置。 调整的过程中还涉及到数据的复制和两个指针位置的调整。
  一旦发现,队列中的任务数_queued大于_spawnOrShrinkAt这个阀值,说明任务太多,当前线程数偏少,我们需要增加更多的工作线程来执行任务,最多_maxThreads个。

public boolean dispatch(Runnable job)
{  
if (!isRunning() || job==null)
return false;
PoolThread thread=null;
boolean spawn=false;
synchronized(_lock)
{
// Look for an idle thread
int idle=_idle.size();
if (idle>0) //有空闲线程,直接分配
thread=(PoolThread)_idle.remove(idle-1);
else
{
// queue the job
_queued++;
if (_queued>_maxQueued)
_maxQueued=_queued;
_jobs[_nextJobSlot++]=job;
if (_nextJobSlot==_jobs.length)
_nextJobSlot=0;
if (_nextJobSlot==_nextJob) //环形队列_jobs已满,需要调整大小
{
//在原有基础上增加_maxThreads
Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
int split=_jobs.length-_nextJob;
if (split>0)
System.arraycopy(_jobs,_nextJob,jobs,0,split);
if (_nextJob!=0)
System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
_jobs=jobs;
_nextJob=0;
_nextJobSlot=_queued;
}
// 队列任务数超过阀值,在可能的情况下,增加工作线程数目  
spawn=_queued>_spawnOrShrinkAt;
}
}
if (thread!=null)
{
thread.dispatch(job);
}
else if (spawn)
{
newThread();
}
return true;
}
  PoolThread
  上面说了,一旦没有空闲线程,任务就仅仅是加入到_jobs队列中,那么负责从队列中取任务就是PoolThread的事了。
  PoolThread在自己的任务完成之后会自觉的到_jobs环形队列中领取任务。有任务,执行之;没有,把自己加入_idle中,等待_maxIdleTimeMs的时间,或QueuedThreadPool的唤醒(调用PoolThread的dispatch(Runnable job)方法)。
  当线程处于空闲状态,并发现当前空闲线程数多于_spawnOrShrinkAt阀值时,它就得考虑要不要自我了断了。条件就是距离上次缩减(shrink)空闲线程时间超过_maxIdleTimeMs。

public void run()
{
boolean idle=false;
Runnable job=null;
try
{
while (isRunning())
{   
// Run any job that we have.
if (job!=null)
{
final Runnable todo=job;
job=null;
idle=false;
todo.run();
}
synchronized(_lock)
{
// is there a queued job?
if (_queued>0)
{
_queued--;
job=_jobs[_nextJob];
_jobs[_nextJob++]=null;
if (_nextJob==_jobs.length)
_nextJob=0;
continue;
}
// Should we shrink?
final int threads=_threads.size();
// 我不知道为什么_threads.size()会有可能大于_maxThreads
// 知道的兄弟请告诉我!!
if (threads>_minThreads &&
(threads>_maxThreads ||
_idle.size()>_spawnOrShrinkAt))   
{
long now = System.currentTimeMillis();
if ((now-_lastShrink)>getMaxIdleTimeMs())
{
_lastShrink=now;
_idle.remove(this);
return;
}
}
if (!idle)
{   
// Add ourselves to the idle set.
_idle.add(this);
idle=true;
}
}
// We are idle
// wait for a dispatched job
synchronized (this)
{
if (_job==null)
this.wait(getMaxIdleTimeMs());
job=_job;
_job=null;
}
}
}
catch (InterruptedException e)
{
Log.ignore(e);
}
finally
{
synchronized (_lock)
{
_idle.remove(this);
}
synchronized (_threadsLock)
{
_threads.remove(this);
}
synchronized (this)
{
job=_job;
}
// we died with a job! reschedule it
if (job!=null)
{
QueuedThreadPool.this.dispatch(job);
}
}
}
  参考资料:
  http://suo.iteye.com/blog/1390134

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-347380-1-1.html 上篇帖子: eclipse jee配置jetty的两种方法 下篇帖子: Jetty架构分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表