This article continues the previous one, 《大数据框架hadoop的作业提交过程》 (the job submission process of the big-data framework Hadoop). The scheduler calls JobTracker.initJob() to initialize a newly submitted job. The relevant code is as follows:
// The scheduler's start() method calls eagerTaskInitializationListener.start().
class JobQueueTaskScheduler extends TaskScheduler {
  @Override
  public synchronized void start() throws IOException {
    super.start();
    // ...
    eagerTaskInitializationListener.start();
    // ...
  }
}
// EagerTaskInitializationListener.start() launches the job-init manager thread.
class EagerTaskInitializationListener extends JobInProgressListener {
  // ...
  public void start() throws IOException {
    this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
    // ...
    this.jobInitManagerThread.start();
  }
  // ...
}
// The job-init manager hands each job to a thread pool for initialization.
class JobInitManager implements Runnable {
  public void run() {
    // ...
    threadPool.execute(new InitJob(job));
    // ...
  }
}
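Each InitJob submitted to the pool wraps a single job. A minimal sketch of its body, paraphrased from the Hadoop 1.x source (ttm is the TaskTrackerManager, i.e. the JobTracker, held by the enclosing listener):

// Inner class of EagerTaskInitializationListener (Hadoop 1.x, paraphrased).
class InitJob implements Runnable {
  private final JobInProgress job;

  public InitJob(JobInProgress job) {
    this.job = job;
  }

  public void run() {
    // ttm is the listener's TaskTrackerManager field (the JobTracker);
    // its initJob() ends up calling JobInProgress.initTasks().
    ttm.initJob(job);
  }
}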
The main work of job initialization is constructing the Map Tasks and Reduce Tasks and initializing them.
Hadoop decomposes each job into four types of tasks: Setup Task, Map Task, Reduce Task and Cleanup Task. Their runtime information is maintained by the TaskInProgress class, so creating these tasks really means creating TaskInProgress objects.
The role and creation process of each of the four task types are described below.
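All four snippets below funnel into one of two TaskInProgress constructors, which is worth keeping in mind when reading the call sites. Paraphrasing their Hadoop 1.x signatures (argument names inferred from the call sites shown below):

// Map-type TIP: bound to one input split (the setup and cleanup map tips
// pass an empty split).
public TaskInProgress(JobID jobid, String jobFile, TaskSplitMetaInfo split,
                      JobTracker jobtracker, JobConf conf,
                      JobInProgress job, int partition, int numSlotsRequired)

// Reduce-type TIP: bound to a partition number plus the number of map
// tasks whose output it must fetch.
public TaskInProgress(JobID jobid, String jobFile, int numMaps, int partition,
                      JobTracker jobtracker, JobConf conf,
                      JobInProgress job, int numSlotsRequired)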
- Setup Task: the job's initialization marker task. It performs some very simple initialization work, such as setting the run state to "setup" and calling OutputCommitter.setupJob() (see the sketch after the snippet below). Once this task finishes, the job moves from the PREP state to the RUNNING state and its Map Tasks can start running. This task type comes in two flavors, Map Setup Task and Reduce Setup Task, and every job has one of each; at runtime they occupy a map slot and a reduce slot, respectively. Since the two are functionally identical, exactly one of them gets the chance to run (as soon as one starts, the other is killed immediately; which one runs depends on the kinds of free slots available at the time and on the scheduling policy). The relevant code is as follows:
public class JobInProgress {
  TaskInProgress setup[] = new TaskInProgress[0];
  // ...
  public synchronized void initTasks() {
    // ...
    // Create two setup tips, one map and one reduce.
    setup = new TaskInProgress[2];

    // Setup map tip. This map doesn't use any split; just assign it an
    // empty split.
    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                                  jobtracker, conf, this, numMapTasks + 1, 1);
    setup[0].setJobSetupTask();

    // Setup reduce tip.
    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                  numReduceTasks + 1, jobtracker, conf, this, 1);
    setup[1].setJobSetupTask();
    // ...
  }
}
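For reference, here is what OutputCommitter.setupJob() typically does at this point: the default FileOutputCommitter implementation creates the job's temporary output directory. A paraphrased sketch (old "mapred" API, Hadoop 1.x; details vary across versions, and the real code logs a failure rather than throwing):

// Paraphrased from org.apache.hadoop.mapred.FileOutputCommitter (1.x).
public void setupJob(JobContext context) throws IOException {
  JobConf conf = context.getJobConf();
  Path outputPath = FileOutputFormat.getOutputPath(conf);
  if (outputPath != null) {
    // TEMP_DIR_NAME is "_temporary"; the cleanup task later removes it.
    Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
    FileSystem fileSys = tmpDir.getFileSystem(conf);
    if (!fileSys.mkdirs(tmpDir)) {
      throw new IOException("Mkdirs failed to create " + tmpDir);
    }
  }
}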
- Map Task: the tasks that process data in the Map phase. Their number, and the input split each one processes, are determined by the application's InputFormat component (its contract is sketched after the snippet below). The relevant code is as follows:
public class JobInProgress {
  TaskInProgress maps[] = new TaskInProgress[0];
  long inputLength = 0;  // total size of the job's input data, in bytes
  // ...
  public synchronized void initTasks() {
    // Read the input splits and create one map task per split.
    TaskSplitMetaInfo[] splits = createSplits(jobId);
    numMapTasks = splits.length;
    // ...
    maps = new TaskInProgress[numMapTasks];
    for (int i = 0; i < numMapTasks; ++i) {
      inputLength += splits[i].getInputDataLength();
      maps[i] = new TaskInProgress(jobId, jobFile, splits[i],
                                   jobtracker, conf, this, i, numSlotsPerMap);
    }
    // ...
  }
}
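The split metadata consumed by createSplits() is produced earlier, on the client side, where JobClient asks the job's InputFormat for splits during submission and serializes the result. The contract is the old "mapred" InputFormat interface:

// org.apache.hadoop.mapred.InputFormat: the component that decides how
// many map tasks a job gets and what data each one reads.
public interface InputFormat<K, V> {
  // numSplits is only a hint; implementations may return more or fewer.
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  // Creates the reader a map task uses to iterate over its split.
  RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
                                     Reporter reporter) throws IOException;
}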
- Reduce Task: the tasks that process data in the Reduce phase. Their number is specified by the user through the parameter mapred.reduce.tasks (default: 1). Because a Reduce Task can only run once there is Map Task output for it to consume, Hadoop initially schedules Map Tasks only, and starts scheduling Reduce Tasks after the fraction of finished Map Tasks reaches a threshold (set by mapred.reduce.slowstart.completed.maps, default 0.05, i.e. 5%; a sketch of this check follows the snippet below). The relevant code is as follows:
public class JobInProgress {
  TaskInProgress reduces[] = new TaskInProgress[0];
  // ...
  public synchronized void initTasks() {
    // ...
    // Create the reduce tasks.
    this.reduces = new TaskInProgress[numReduceTasks];
    for (int i = 0; i < numReduceTasks; i++) {
      reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i,
                                      jobtracker, conf, this, numSlotsPerReduce);
      nonRunningReduces.add(reduces[i]);
    }
    // ...
  }
}
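A sketch of how the slowstart threshold is enforced, paraphrased from the Hadoop 1.x JobInProgress source (field and method names as in that source):

// Inside initTasks(): compute how many map tasks must finish before
// any reduce task may be scheduled.
completedMapsForReduceSlowstart = (int) Math.ceil(
    conf.getFloat("mapred.reduce.slowstart.completed.maps", 0.05f)
    * numMapTasks);

// Elsewhere in JobInProgress: consulted by the task scheduler before
// it hands out a reduce task.
public synchronized boolean scheduleReduces() {
  return finishedMapTasks >= completedMapsForReduceSlowstart;
}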
- Cleanup Task: the job's completion marker task. It mainly performs cleanup work, such as deleting temporary directories used while the job was running (e.g. the _temporary directory). Once this task completes successfully, the job moves from the RUNNING state to the SUCCEEDED state (a condensed skeleton of the whole initTasks() method follows the snippet below). The relevant code is as follows:
public class JobInProgress {
  TaskInProgress cleanup[] = new TaskInProgress[0];
  // ...
  public synchronized void initTasks() {
    // ...
    // Create two cleanup tips, one map and one reduce.
    cleanup = new TaskInProgress[2];

    // Cleanup map tip. This map doesn't use any split; just assign it an
    // empty split.
    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                                    jobtracker, conf, this, numMapTasks, 1);
    cleanup[0].setJobCleanupTask();

    // Cleanup reduce tip.
    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                    numReduceTasks, jobtracker, conf, this, 1);
    cleanup[1].setJobCleanupTask();
    // ...
  }
}
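Putting the four snippets together, the overall shape of JobInProgress.initTasks() is roughly the following (a condensed skeleton, not the complete method; the real code also builds locality caches such as nonRunningMapCache and performs various checks):

// Condensed skeleton of JobInProgress.initTasks() (Hadoop 1.x),
// showing only the task-creation steps covered above.
public synchronized void initTasks() {
  // 1. One map-type TIP per input split.
  TaskSplitMetaInfo[] splits = createSplits(jobId);
  numMapTasks = splits.length;
  maps = new TaskInProgress[numMapTasks];
  // ... populate maps[] ...

  // 2. One reduce-type TIP per partition (mapred.reduce.tasks).
  reduces = new TaskInProgress[numReduceTasks];
  // ... populate reduces[] and nonRunningReduces ...

  // 3. Two cleanup TIPs (one map flavor, one reduce flavor).
  cleanup = new TaskInProgress[2];
  // ... as shown above ...

  // 4. Two setup TIPs (one map flavor, one reduce flavor).
  setup = new TaskInProgress[2];
  // ... as shown above ...

  // Finally the job is marked as initialized so the scheduler can
  // begin assigning its tasks.
}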