设为首页 收藏本站
查看: 1899|回复: 0

[经验分享] Hadoop MapReduce 任务执行流程源代码详细解析(转载)

[复制链接]

尚未签到

发表于 2015-7-14 09:16:14 | 显示全部楼层 |阅读模式
目录

1 引言
   1.1 目的

1.2 读者范围

2 综述

3 代码详细分析

3.1 启动Hadoop集群

3.2 JobTracker启动以及Job的初始化

3.3 TaskTracker启动以及发送Heartbeat

3.4 JobTracker接收Heartbeat并向TaskTracker分配任务

3.5 TaskTracker接收HeartbeatResponse

3.6 MapReduce任务的运行

3.6.1 MapTask的运行

3.6.2 ReduceTask的运行

4 致谢
  





1 引言

1.1 目的
  该文档从源代码的级别剖析了Hadoop0.20.2版本的MapReduce模块的运行原理和流程,对JobTracker、 TaskTracker的内部结构和交互流程做了详细介绍。系统地分析了Map程序和Reduce程序运行的原理。读者在阅读之后会对Hadoop MapReduce0.20.2版本源代码有一个大致的认识。

1.2 读者范围
  如果读者想只是想从原理上更加深入了解Hadoop MapReduce运行机制的话,只需要阅读第2章综述即可,该章节要求读者对HadoopMapReduce模型有系统的了解。
  如果读者想深入了解HadoopMapReduce的源代码,则需阅读该文档第2、3节。阅读第3节需要读者熟练掌握Java语言的基本语法,并且对反射机制、动态代理有一定的了解。同时,还要求读者对于Hadoop HDFS和Hadoop RPC的基本用法有一定的了解。
  另外,熟悉Hadoop源代码的最好方法是远程调试,有关远程调试的方法请读者去网上自行查阅资料。



2 综述
  Hadoop源代码分为三大模块:MapReduce、HDFS和Hadoop Common。其中MapReduce模块主要实现了MapReduce模型的相关功能;HDFS模块主要实现了HDFS的相关功能;而Hadoop Common主要实现了一些基础功能,比如说RPC、网络通信等。
  在用户使用HadoopMapReduce模型进行并行计算时,用户只需要写好Map函数、Reduce函数,之后调用JobClient将Job 提交即可。在JobTracker收到提交的Job之后,便会对Job进行一系列的配置,然后交给TaskTracker进行执行。执行完毕之后,JobTracker会通知JobClient任务完成,并将结果存入HDFS中。
DSC0000.gif
  如图所示,用户提交Job是通过JobClient类的submitJob()函数实现的。在Hadoop源代码中,一个被提交了的Job由 JobInProgress类的一个实例表示。该类封装了表示Job的各种信息,以及Job所需要执行的各种动作。在调用submitJob()函数之后,JobTracker会将作业加入到一个队列中去,这个队列的名字叫做jobInitQueue。然后,在JobTracker中,有一个名为 JobQueueTaskScheduler的对象,会不断轮询jobInitQueue队列,一旦发现有新的Job加入,便将其取出,然后将其初始化。
  在Hadoop代码中,一个Task由一个TaskInProgress类的实例表示。该类封装了描述Task所需的各种信息以及Task执行的各种动作。
  TaskTracker自从启动以后,会每隔一段时间向JobTracker发送消息,消息的名称为“Heartbeat”。Heartbeat中包含了该TaskTracker当前的状态以及对Task的请求。JobTracker在收到Heartbeat之后,会检查该heartbeat的里所包含的各种信息,如果发现错误会启动相应的错误处理程序。如果TaskTracker在Heartbeat中添加了对Task的请求,则 JobTracker会添加相应的指令在对Heartbeat的回复中。在Hadoop源代码中,JobTracker对TaskTracker的指令称为action,JobTracker对TaskTracker所发送来的Heartbeat的回复消息称为HeartbeatResponse。
  在TaskTracker内部,有一个队列叫做TaskQueue。该中包含了所有新加入的Task(Task包括Map Task,和Reduce Task)。每当TaskTracker收到 HeartbeatResponse后,会对其进行检查,如果其中包含了新的Task,便将其加入到TaskQueue中。在TaskTracker内部,有两个线程不断轮询TaskQueue,一个是MapLauncher,另一个是ReduceLauncher。如果发现有新加入的Map任务,MapLauncher便将其取出并且执行。如果是Reduce任务,ReduceLauncher便将其取出执行。
  不论是Map Task还是Reduce Task,当他们被取出之后,都要进行本地化。本地化的意思就是将所有需要的信息,比如需要运行的jar文件、配置文件、输入数据等等,一起拷贝到本地的文件系统。这样做的目的是为了方便任务在某台机器上独立执行。本地化之后,TaskTracker会为每一个task单独创建一个jvm,然后单独运行。等Task运行完之后,TaskTracker会通知JobTracker任务完成,以进行下一步的动作。
  等到所有的Task都完成之后,Job也就完成了,此时JobTracker会通知JobClient工作完成。

3 代码详细分析
  下面从用户使用Hadoop进行MapReduce计算的过程为线索,详细介绍Task执行的细节,并对Hadoop MapReduce的主要代码进行分析。

3.1 启动Hadoop集群
  Hadoop集群的启动是通过在Master上运行start-all.sh脚本进行的。运行该脚本之后,Hadoop会配置一系列的环境变量以及其他Hadoop运行所需要的参数,然后在本机(Master)运行JobTracker和NameNode。然后通过SSH登录到所有slave机器上,启动 TaskTracker和DataNode。
  因为本文只介绍Hadoop MapReduce模块,所以NameNode和DataNode的相关知识不再介绍。

3.2 JobTracker启动以及Job的初始化
  org.apache.hadoop.mapred.JobTracker类实现了Hadoop MapReduce模型的JobTracker的功能,主要负责任务的接受,初始化,调度以及对TaskTracker的监控。
  JobTracker单独作为一个JVM运行,main函数就是启动JobTracker的入口函数。在main函数中,有以下两行非常重要的代码:
  startTracker(new JobConf());
  JobTracker.offerService();
  startTracker函数是一个静态函数,它调用JobTracker的构造函数生成一个JobTracker类的实例,名为result。然后,进行了一系列初始化活动,包括启动RPC server,启动内置的jetty服务器,检查是否需要重启JobTracker等。
  在JobTracker.offerService()中,调用了taskScheduler对象的start()方法。该对象是 JobTracker的一个数据成员,类型为TaskScheduler。该类型提供了一系列接口,使得JobTracker可以对所有提交的job进行初始化以及调度。但是该类型实际上是一个抽象类型,其真正的实现类型为JobQueueTaskScheduler类,所以,taskScheduler.start()方法执行的是JobQueueTaskScheduler类的start方法。
  该方法的详细代码如下:



public synchronized void start() throwsIOException {
//调用TaskScheduler.start()方法,实际上没有做任何事情
super.start();
//注册一个JobInProgressListerner监听器

taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
eagerTaskInitializationListener.start();
taskTrackerManager.addJobInProgressListener(eagerTaskInitializationListener)

}

  JobQueueTaskScheduler类的start方法主要注册了两个非常重要的监听 器:jobQueueJobInProgressListener和eagerTaskInitializationListener。前者是 JobQueueJobInProgressListener类的一个实例,该类以先进先出(内部实现的就是串行)的方式维持一个JobInProgress的队列,并且监听各 个JobInProgress实例在生命周期中的变化;后者是EagerTaskInitializationListener类的一个实例,该类不断监听jobInitQueue,一旦发现有新的job被提交(即有新的JobInProgress实例被加入),则立即调用该实例(JobInProgress实例)的initTasks方法,对job进行初始化。
  JobInProgress类的initTasks方法的主要代码如下:



/**这是一个异步调用的线程,使得分片计算不阻塞任何线程*/
public synchronized void initTasks() throwsIOException {
……
//读取输入分片,从HDFS中读取job.split文件,并为每个Map创建一个分片

String jobFile = profile.getJobFile();
Path sysDir = newPath(this.jobtracker.getSystemDir());
FileSystem fs = sysDir.getFileSystem(conf);
DataInputStream splitFile =
fs.open(newPath(conf.get("mapred.job.split.file")));//默认为job.split

JobClient.RawSplit[] splits;
try {
splits = JobClient.readSplitFile(splitFile);//读取输入分片文件job.split

} finally {
splitFile.close();
}



………………



//map task的个数就是input splits的个数

numMapTasks = splits.length;
//为每个map tasks生成一个TaskInProgress来处理一个input split

maps = newTaskInProgress[numMapTasks];
for(inti=0; i < numMapTasks; ++i) {
inputLength += splits.getDataLength();
maps =new TaskInProgress(jobId, jobFile, splits,jobtracker, conf, this, i);
}
/*
对于map task,将其放入nonRunningMapCache,是一个Map,也即对于map task来讲,其将会被分配到其input
split所在的Node上。在此,Node代表一个datanode或者机架或者数据中
心。nonRunningMapCache将在JobTracker向TaskTracker分配map task的
时候使用。
*/
if(numMapTasks > 0) {
nonRunningMapCache = createCache(splits,maxLevel);
}
//创建reduce task
this.reduces = new TaskInProgress[numReduceTasks];
for (int i= 0; i < numReduceTasks; i++) {
reduces= new TaskInProgress(jobId, jobFile, numMapTasks, i,jobtracker, conf, this);
/*reducetask放入nonRunningReduces,其将在JobTracker向TaskTracker
分配reduce task的时候使用。*/
nonRunningReduces.add(reduces);
}

//创建两个cleanup task,一个用来清理map,一个用来清理reduce.

cleanup =new TaskInProgress[2];
// 清理map提示. 此map不使用任何分片. 仅仅分配空的分片.

    JobClient.RawSplit emptySplit = new JobClient.RawSplit();




    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks);
     cleanup[0].setJobCleanupTask();
// 清理reduce提示
cleanup[1]= new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf, this);
cleanup[1].setJobCleanupTask();
//创建两个初始化 task,一个初始化map,一个初始化reduce.

setup =new TaskInProgress[2];
setup[0] =new TaskInProgress(jobId, jobFile, splits[0],jobtracker,conf, this, numMapTasks + 1 );
setup[0].setJobSetupTask();
setup[1] =new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks + 1, jobtracker, conf, this);
setup[1].setJobSetupTask();
tasksInited.set(true);//初始化完毕

……
}
3.3 TaskTracker启动以及发送Heartbeat
  org.apache.hadoop.mapred.TaskTracker类实现了MapReduce模型中TaskTracker的功能。
  TaskTracker也是作为一个单独的JVM来运行的,其main函数就是TaskTracker的入口函数,当运行start-all.sh时,脚本就是通过SSH运行该函数来启动TaskTracker的。
  Main函数中最重要的语句是:
  new TaskTracker(conf).run();
  其中run函数主要调用了offerService函数:



State offerService() throws Exception {
longlastHeartbeat = 0;
//TaskTracker进程是一直存在的
while(running && !shuttingDown) {
……
long now = System.currentTimeMillis();
//每隔一段时间就向JobTracker发送heartbeat

long waitTime = heartbeatInterval - (now - lastHeartbeat);//还需等待的时间
if(waitTime > 0) {
synchronized(finishedCount) {
if (finishedCount[0] == 0) {
finishedCount.wait(waitTime);
}
finishedCount[0] = 0;
}
}
……
//发送Heartbeat到JobTracker,得到response

HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
……
//从Response中得到此TaskTracker需要做的事情

TaskTrackerAction[] actions = heartbeatResponse.getActions();
……
if(actions != null){
for(TaskTrackerAction action: actions) {
if (action instanceof LaunchTaskAction) {
//如果action是一个新的Task(是LaunchTaskAction的实例),则将Action添加到任务队列中

addToTaskQueue((LaunchTaskAction)action);
}else if (action instanceof CommitTaskAction) {
//如果action是提交过的一个Task(是CommitTaskAction的实例),在响应的队列中若无当前task,则将其添加进去

           CommitTaskAction commitAction = (CommitTaskAction)action;
if (!commitResponses.contains(commitAction.getTaskID())) {
commitResponses.add(commitAction.getTaskID());
}
}else {
tasksToCleanup.put(action);
}
}
}
}
returnState.NORMAL;
}
  其中transmitHeartBeat函数的作用就是第2章中提到的向JobTracker发送Heartbeat。其主要逻辑如下:



private HeartbeatResponse transmitHeartBeat(long  now) throws IOException {
//每隔一段时间,在heartbeat中要返回给JobTracker一些统计信息

boolean sendCounters;
if (now> (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
sendCounters = true;
previousUpdate = now;
}
else {
sendCounters = false;
}
……
//报告给JobTracker,此TaskTracker的当前状态
if(status == null) {
synchronized (this) {
status = new TaskTrackerStatus(taskTrackerName, localHostname,httpPort,cloneAndResetRunningTaskStatuses(sendCounters),
failures,maxCurrentMapTasks, maxCurrentReduceTasks);
}
}
……
/**当满足下面的条件的时候,此TaskTracker请求JobTracker为其分配一个新的Task来运行:
*当前TaskTracker正在运行的map task的个数小于可以运行的map task的最大个数
*或者当前TaskTracker正在运行的reduce task的个数小于可以运行的reduce task的最大个数,就是TaskTracker有空闲的意思,Map或Reduce有一个满足条件就OK*/

boolean askForNewTask;
long localMinSpaceStart;
synchronized (this) {
askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
status.countReduceTasks()  0) {
mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
}
double reduceLoadFactor = 0.0;
if (clusterReduceCapacity > 0) {
reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
}
//
// 在下面的步骤中,我们首先分配map tasks(如果满足条件),接着分配reduce tasks (如果满足条件)。
// 我们仔细检查每一个到来的job,仅当jobs的准备工作就绪后,它才能获得服务。//
//
// 当给给定主机的工作量小于此种类型Task的工作量的时候,我们给当前的TaskTracker分配task。//然而,如果集群接近满负荷,我们就不会有足够的空间投机执行task,我们仅仅调度最高优先级的task获得执行。//
   
final int trackerCurrentMapCapacity =
Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
trackerMapCapacity);
int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;//还可以执行的Map个数
boolean exceededMapPadding = false;
if (availableMapSlots > 0) {
exceededMapPadding =
exceededPadding(true, clusterStatus, trackerMapCapacity);
}
int numLocalMaps = 0;
int numNonLocalMaps = 0;
scheduleMaps:
for (int i=0; i < availableMapSlots; ++i) {
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
Task t = null;
// 尝试调度本地节点或本地机架的Map task
t =
job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
assignedTasks.add(t);
++numLocalMaps;
// 不把map task的任务分配到极致,
// 集群要为将来失败的task、投机的task保留一些自由区域,
//  超越最高优先级的job
if (exceededMapPadding) {
break scheduleMaps;
}
// 为下个Map task再次尝试所有的jobs
break;
}
// 尝试调度本地节点或本地机架的Map Task
t =
job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
assignedTasks.add(t);
++numNonLocalMaps;
// 我们至多分配一个关交换或者投机task
// 这主要是用来阻止taskTrackers从其他taskTrackers窃取本地tasks
//
break scheduleMaps;
}
}
}
}
int assignedMaps = assignedTasks.size();
//
// 为reduce tasks保存一些东西
    // 然而,对于每个心跳,我们不会分配超过一个reduce task(通常是一个或者0个)
//
    final int trackerCurrentReduceCapacity =
Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
trackerReduceCapacity);
final int availableReduceSlots =
Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
boolean exceededReducePadding = false;
if (availableReduceSlots > 0) {
exceededReducePadding = exceededPadding(false, clusterStatus,
trackerReduceCapacity);
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING ||
job.numReduceTasks == 0) {
continue;
}
Task t =
job.obtainNewReduceTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts()
);
if (t != null) {
assignedTasks.add(t);
break;
}
             // 不把reduce task的任务分配到极致,
            // 集群要为将来失败的task、投机的task保留一些自由区域,
            //  超越最高优先级的job

if (exceededReducePadding) {
break;
}
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " +
"[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +
(trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +
")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " +
trackerCurrentReduceCapacity + "," + trackerRunningReduces +
"] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) +
", " + (assignedTasks.size()-assignedMaps) + "]");
}
return assignedTasks;
}
  
  从上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配maptask的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找 TaskInProgress。JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。

3.5 TaskTracker接收HeartbeatResponse
  在向JobTracker发送heartbeat后,如果返回的reponse中含有分配好的任务 LaunchTaskAction,TaskTracker则调用addToTaskQueue方法,将其加入TaskTracker类中 MapLauncher或者ReduceLauncher对象的taskToLaunch队列。在此,MapLauncher和 ReduceLauncher对象均为TaskLauncher类的实例。该类是TaskTracker类的一个内部类,具有一个数据成员,是 TaskTracker.TaskInProgress类型的队列。在此特别注意,在TaskTracker类内部所提到的TaskInProgress 类均为TaskTracker的内部类,我们用TaskTracker.TaskInProgress表示,一定要和MapRed包中的 TaskInProgress类区分,后者我们直接用TaskInProgress表示。如果应答包中包含的任务是map task则放入mapLancher的taskToLaunch队列,如果是reduce task则放入reduceLancher的taskToLaunch队列:
  TaskLauncher类的addToTaskQueue方法代码如下:



private void addToTaskQueue(LaunchTaskAction action) {
if(action.getTask().isMapTask()) {
mapLauncher.addToTaskQueue(action);
} else {
reduceLauncher.addToTaskQueue(action);
}
}
  



private TaskInProgress registerTask(LaunchTaskAction action,
TaskLauncher launcher) {
//从action中获取Task对象

Task t = action.getTask();  
LOG.info("LaunchTaskAction(registerTask): " + t.getTaskID() +
" task's state:" + t.getState());
//生成TaskTracker.TaskInProgress对象

TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
synchronized(this){
/*在相应的数据结构中增加所生成的TaskTracker.TaskInProgress对
象,以通知程序其他部分该任务的建立*/
tasks.put(t.getTaskID(),tip);
runningTasks.put(t.getTaskID(),tip);
boolean isMap =t.isMapTask();
if (isMap) {
mapTotal++;
} else {
reduceTotal++;
}
}
return tip;
}
  同时,TaskLauncher类继承了Thread类,所以在程序运行过程中,它们各自都以一个线程独立运行。它们的启动在 TaskTracker初始化过程中已经完成。该类的run函数就是不断监测taskToLaunch队列中是否有新的 TaskTracker.TaskInProgress对象加入。如果有则从中取出一个对象,然后调用TaskTracker类的 startNewTask(TaskInProgress tip)来启动一个task,其又主要调用了localizeJob(TaskInProgresstip),该函数的工作就是第二节中提到的本地化。该函数代码如下:



private void localizeJob(TaskInProgress tip)throws IOException {
//首先要做的一件事情是有关Task的文件从HDFS拷贝的TaskTracker的本地文件系统中:job.split,job.xml以及job.jar

Path localJarFile = null;
Task t =tip.getTask();
JobID jobId = t.getJobID();
Path jobFile = new Path(t.getJobFile());
……
Path localJobFile = lDirAlloc.getLocalPathForWrite(
getLocalJobDir(jobId.toString())
+ Path.SEPARATOR + "job.xml",
jobFileSize, fConf);
RunningJob rjob = addTaskToJob(jobId, tip);
synchronized (rjob) {
if(!rjob.localized) {
FileSystem localFs = FileSystem.getLocal(fConf);
Path jobDir = localJobFile.getParent();
……
//将job.split拷贝到本地

systemFS.copyToLocalFile(jobFile, localJobFile);
JobConf localJobConf = new JobConf(localJobFile);
Path workDir = lDirAlloc.getLocalPathForWrite(
(getLocalJobDir(jobId.toString())
+ Path.SEPARATOR +"work"), fConf);
if(!localFs.mkdirs(workDir)) {
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
}
System.setProperty("job.local.dir", workDir.toString());
localJobConf.set("job.local.dir", workDir.toString());
//copy Jar file to the local FS and unjar it.
     //这里的解压和我们之前解压MapReduce打包成的jar文件有相似之处
     String jarFile = localJobConf.getJar();
long jarFileSize = -1;
if(jarFile != null) {
Path jarFilePath = new Path(jarFile);
localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
getLocalJobDir(jobId.toString())
+Path.SEPARATOR + "jars",
*jarFileSize, fConf), "job.jar");
if(!localFs.mkdirs(localJarFile.getParent())) {
throw new IOException("Mkdirs failed to create jars directory");
}
//将job.jar拷贝到本地

systemFS.copyToLocalFile(jarFilePath, localJarFile);
localJobConf.setJar(localJarFile.toString());
//将job的configuration写成job.xml,可以为后面得具体实现作为一个参考
       OutputStream out = localFs.create(localJobFile);
try{
localJobConf.writeXml(out);
}finally {
out.close();
}
// 解压缩job.jar

RunJar.unJar(new File(localJarFile.toString()),
new File(localJarFile.getParent().toString()));
}
rjob.localized = true;
rjob.jobConf = localJobConf;
}
}
//真正的启动此Task

launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
  当所有的task运行所需要的资源都拷贝到本地后,则调用TaskTracker的launchTaskForJob方法,其又调用TaskTracker.TaskInProgress的launchTask函数:



public synchronized void launchTask() throwsIOException {
……
//创建task运行目录

localizeTask(task);
if(this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
this.taskStatus.setRunState(TaskStatus.State.RUNNING);
}
//创建并启动TaskRunner,对于MapTask,创建的是MapTaskRunner,对于ReduceTask,创建的是ReduceTaskRunner
this.runner = task.createRunner(TaskTracker.this, this);
this.runner.start();
this.taskStatus.setStartTime(System.currentTimeMillis());
}
  TaskRunner是抽象类,是Thread类的子类,其run函数如下:



public final void run() {
……
TaskAttemptID taskid = t.getTaskID();
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
File jobCacheDir = null;
if(conf.getJar() != null) {
jobCacheDir = new File(
new Path(conf.getJar()).getParent().toString());
}
File workDir = new File(lDirAlloc.getLocalPathToRead(
TaskTracker.getLocalTaskDir(
t.getJobID().toString(),
t.getTaskID().toString(),
t.isTaskCleanupTask())
+ Path.SEPARATOR + MRConstants.WORKDIR,
conf).toString());
FileSystem fileSystem;
Path localPath;
……
//拼接所有的classpath

String baseDir;
String sep = System.getProperty("path.separator");
//不同的系统下面,路径分隔符不一样,windows下面为“;”
   StringBuffer classPath = new StringBuffer();
//start with same classpath as parent process

classPath.append(System.getProperty("java.class.path"));
classPath.append(sep);
if(!workDir.mkdirs()) {
if(!workDir.isDirectory()) {
LOG.fatal("Mkdirs failed to create " + workDir.toString());
}
}
String jar = conf.getJar();
// 其实这部分上面也判断过
    if (jar!= null) {     
// if jar exists, it into workDir

File[] libs = new File(jobCacheDir, "lib").listFiles();
if(libs != null) {
for(int i = 0; i < libs.length; i++) {
classPath.append(sep);         //add  jar from libs to classpath

classPath.append(libs);
}
}
classPath.append(sep);
classPath.append(new File(jobCacheDir, "classes"));
classPath.append(sep);
classPath.append(jobCacheDir);
}
……
classPath.append(sep);
classPath.append(workDir);
//拼写命令行java及其参数

Vector vargs = new Vector(8);
File jvm =
new File(new File(System.getProperty("java.home"), "bin"),"java");
vargs.add(jvm.toString());
String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
javaOpts = javaOpts.replace("@taskid@", taskid.toString());
String[] javaOptsSplit = javaOpts.split(" ");
String libraryPath = System.getProperty("java.library.path");
if(libraryPath == null) {
libraryPath = workDir.getAbsolutePath();
} else{
libraryPath += sep + workDir;
}
boolean hasUserLDPath = false;
for(inti=0; i 0 && ++numTasksExecuted ==
  numTasksToExecute){
  break;
  }
  }

3.6.1 MapTask的运行

3.6.1.1 MapTask.run()方法
  如果task是MapTask,则其run函数如下:
  public void run(final JobConf job, finalTaskUmbilicalProtocol umbilical)
  throws IOException,ClassNotFoundException, InterruptedException {
  //负责与TaskTracker的通信,通过该对象可以获得必要的对象
  this.umbilical = umbilical;  
  // 启动Reporter线程,用来和TaskTracker交互目前运行的状态
  TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
  reporter.startCommunicationThread();
  boolean useNewApi =job.getUseNewMapper();
  /*用来初始化任务,主要是进行一些和任务输出相关的设置,比如创
  建commiter,设置工作目录等*/
  initialize(job, getJobID(),reporter, useNewApi);
  /*以下4个if语句均是根据任务类型的不同进行相应的操作,这些方
  法均是Task类的方法,所以与任务是MapTask还是ReduceTask无关*/
  if(jobCleanup) {
  runJobCleanupTask(umbilical,reporter);
  return;
  }
  if(jobSetup) {
  //主要是创建工作目录的FileSystem对象
  runJobSetupTask(umbilical,reporter);
  return;
  }
  if(taskCleanup) {
  //设置任务目前所处的阶段为结束阶段,并且删除工作目录
  runTaskCleanupTask(umbilical,reporter);
  return;
  }
  //如果不是上述四种类型,则真正运行任务
  if (useNewApi) {
  runNewMapper(job, split, umbilical,reporter);
  } else {
  runOldMapper(job, split, umbilical, reporter);
  }
  done(umbilical, reporter);
  }

3.6.1.2 MapTask.runNewMapper()方法
  其中,我们只研究运用新API编写程序的情况,所以runOldMapper函数我们将不做考虑。runNewMapper的代码如下:
  private   
  voidrunNewMapper(
  final JobConf job,
  final BytesWritable rawSplit,
  final TaskUmbilicalProtocol umbilical,
  TaskReporter reporter
  ) throws IOException, ClassNotFoundException, InterruptedException{
  /*TaskAttemptContext类继承于JobContext类,相对于JobContext类增加
  了一些有关task的信息。通过taskContext对象可以获得很多与任务执行相
  关的类,比如用户定义的Mapper类,InputFormat类等等 */
  org.apache.hadoop.mapreduce.TaskAttemptContexttaskContext =
  new org.apache.hadoop.mapreduce.TaskAttemptContext(job,getTaskID());
  //创建用户自定义的Mapper类的实例
  org.apache.hadoop.mapreduce.Mapper
    mapper=
  org.apache.hadoop.mapreduce.Mapper) ReflectionUtils.newInstance(taskContext.getMapperClass(),job);
  // 创建用户指定的InputFormat类的实例
  org.apache.hadoop.mapreduce.InputFormat inputFormat= (org.apache.hadoop.mapreduce.InputFormat)
  ReflectionUtils.newInstance(taskContext.getInputFormatClass(),job);
  // 重新生成InputSplit
  org.apache.hadoop.mapreduce.InputSplit split =null;
  DataInputBuffer splitBuffer =new DataInputBuffer();
  splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
  SerializationFactory factory =new SerializationFactory(job);
  Deserializer

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-86478-1-1.html 上篇帖子: Hadoop NameNode HA(验证成功) 下篇帖子: 在Hadoop分布式文件系统的索引和搜索
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表