public boolean dispatch(Runnable job)
{
if (!isRunning() || job==null)
return false;
PoolThread thread=null;
boolean spawn=false;
synchronized(_lock)
{
// Look for an idle thread
int idle=_idle.size();
if (idle>0) //有空闲线程,直接分配
thread=(PoolThread)_idle.remove(idle-1);
else
{
// queue the job
_queued++;
if (_queued>_maxQueued)
_maxQueued=_queued;
_jobs[_nextJobSlot++]=job;
if (_nextJobSlot==_jobs.length)
_nextJobSlot=0;
if (_nextJobSlot==_nextJob) //环形队列_jobs已满,需要调整大小
{
//在原有基础上增加_maxThreads
Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
int split=_jobs.length-_nextJob;
if (split>0)
System.arraycopy(_jobs,_nextJob,jobs,0,split);
if (_nextJob!=0)
System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
_jobs=jobs;
_nextJob=0;
_nextJobSlot=_queued;
}
// 队列任务数超过阀值,在可能的情况下,增加工作线程数目
spawn=_queued>_spawnOrShrinkAt;
}
}
if (thread!=null)
{
thread.dispatch(job);
}
else if (spawn)
{
newThread();
}
return true;
} PoolThread
上面说了,一旦没有空闲线程,任务就仅仅是加入到_jobs队列中,那么负责从队列中取任务就是PoolThread的事了。
PoolThread在自己的任务完成之后会自觉的到_jobs环形队列中领取任务。有任务,执行之;没有,把自己加入_idle中,等待_maxIdleTimeMs的时间,或QueuedThreadPool的唤醒(调用PoolThread的dispatch(Runnable job)方法)。
当线程处于空闲状态,并发现当前空闲线程数多于_spawnOrShrinkAt阀值时,它就得考虑要不要自我了断了。条件就是距离上次缩减(shrink)空闲线程时间超过_maxIdleTimeMs。
public void run()
{
boolean idle=false;
Runnable job=null;
try
{
while (isRunning())
{
// Run any job that we have.
if (job!=null)
{
final Runnable todo=job;
job=null;
idle=false;
todo.run();
}
synchronized(_lock)
{
// is there a queued job?
if (_queued>0)
{
_queued--;
job=_jobs[_nextJob];
_jobs[_nextJob++]=null;
if (_nextJob==_jobs.length)
_nextJob=0;
continue;
}
// Should we shrink?
final int threads=_threads.size();
// 我不知道为什么_threads.size()会有可能大于_maxThreads
// 知道的兄弟请告诉我!!
if (threads>_minThreads &&
(threads>_maxThreads ||
_idle.size()>_spawnOrShrinkAt))
{
long now = System.currentTimeMillis();
if ((now-_lastShrink)>getMaxIdleTimeMs())
{
_lastShrink=now;
_idle.remove(this);
return;
}
}
if (!idle)
{
// Add ourselves to the idle set.
_idle.add(this);
idle=true;
}
}
// We are idle
// wait for a dispatched job
synchronized (this)
{
if (_job==null)
this.wait(getMaxIdleTimeMs());
job=_job;
_job=null;
}
}
}
catch (InterruptedException e)
{
Log.ignore(e);
}
finally
{
synchronized (_lock)
{
_idle.remove(this);
}
synchronized (_threadsLock)
{
_threads.remove(this);
}
synchronized (this)
{
job=_job;
}
// we died with a job! reschedule it
if (job!=null)
{
QueuedThreadPool.this.dispatch(job);
}
}
}
参考资料:
http://suo.iteye.com/blog/1390134