
[Experience Sharing] Analyzing Hadoop's execution flow at the source-code level


Posted on 2015-07-14 07:58:29
  
  Preface:
  I have recently been analyzing how Hadoop executes jobs. Although the large amount of reference material I consulted gave me an intuitive feel for the process, I still lacked a complete picture of how MapReduce actually runs, so I decided to analyze the MapReduce execution flow at the source-code level.
  Prelude:
  Everything starts with job submission. If we use the Job class, the call that triggers submission is
  job.waitForCompletion(true); the true argument means progress information is printed while the job runs.
  Pressing F3 in Eclipse jumps to the source of this method: it actually calls Job's submit method, which in turn calls submitJobInternal(conf) to submit the job. That method uploads three files, job.jar, job.split, and job.xml, to HDFS.
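  For orientation, here is a minimal driver that triggers this whole path. It is a sketch: the class name and the input/output paths are made up, and it leans on the default identity Mapper/Reducer and TextInputFormat so that it stays self-contained.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinimalDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "minimal-job");   // 1.x-era constructor
    job.setJarByClass(MinimalDriver.class);
    // No Mapper/Reducer set: the identity defaults are used.
    // TextInputFormat produces LongWritable/Text pairs, so declare those types.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/in"));    // hypothetical input
    FileOutputFormat.setOutputPath(job, new Path("/tmp/out")); // hypothetical output
    // true = print progress; internally: submit() -> JobClient.submitJobInternal(),
    // which uploads job.jar, job.split and job.xml to HDFS
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

  Back in submitJobInternal, the submission itself looks like this: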



ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
  public RunningJob run() throws FileNotFoundException,
                                 ClassNotFoundException,
                                 InterruptedException,
                                 IOException {
    JobConf jobCopy = job;
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
        jobCopy);

    // get a new ID for the job
    JobID jobId = jobSubmitClient.getNewJobId();

    // the directory the job is uploaded to
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
    JobStatus status = null;
    try {
      populateTokenCache(jobCopy, jobCopy.getCredentials());

      copyAndConfigureFiles(jobCopy, submitJobDir);

      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                          new Path[] {submitJobDir},
                                          jobCopy);

      // path of the job's configuration file
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

      // number of reduce tasks for the job (defaults to 1)
      int reduces = jobCopy.getNumReduceTasks();

      // record the submitting host's IP and name
      InetAddress ip = InetAddress.getLocalHost();
      if (ip != null) {
        job.setJobSubmitHostAddress(ip.getHostAddress());
        job.setJobSubmitHostName(ip.getHostName());
      }

      JobContext context = new JobContext(jobCopy, jobId);

      jobCopy = (JobConf) context.getConfiguration();

      // Check the output specification
      if (reduces == 0 ? jobCopy.getUseNewMapper() :
          jobCopy.getUseNewReducer()) {
        org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
          ReflectionUtils.newInstance(context.getOutputFormatClass(),
              jobCopy);
        output.checkOutputSpecs(context);
      } else {
        jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
      }

      // create the input splits for the job
      FileSystem fs = submitJobDir.getFileSystem(jobCopy);
      LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
      int maps = writeSplits(context, submitJobDir);
      jobCopy.setNumMapTasks(maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = jobCopy.getQueueName();
      AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
      jobCopy.set(QueueManager.toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

      // Write job file to JobTracker's fs
      FSDataOutputStream out =
        FileSystem.create(fs, submitJobFile,
            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

      try {
        // write the job configuration out as job.xml
        jobCopy.writeXml(out);
      } finally {
        out.close();
      }

      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, jobCopy.getCredentials());

      // submit the job through the RPC proxy
      status = jobSubmitClient.submitJob(
          jobId, submitJobDir.toString(), jobCopy.getCredentials());
      if (status != null) {
        return new NetworkedJob(status);
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (fs != null && submitJobDir != null)
          fs.delete(submitJobDir, true);
      }
    }
  }
});
  
  Looking at this method, we can see where the job's staging path and job ID come from; the job is then handed to the JobTracker through the RPC proxy (jobSubmitClient).
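  Concretely, if the JobTracker was started at 2015-07-14 07:58 and this is the second job it has been handed, the staging directory would look something like <jobStagingArea>/job_201507140758_0002/, containing the uploaded job.jar, job.split, and job.xml (the job ID here is made up for illustration).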
  Once a job has been submitted, the JobTracker creates a JobInProgress instance for it; this class represents the job's state and every action the job needs to perform. After receiving the submitted data, the JobTracker hands out tasks to TaskTrackers according to the job's configuration, and only after every TaskTracker has finished does it notify the JobClient that the job is done. The JobTracker first adds the job to a queue named jobInitQueue; inside the JobTracker, a JobQueueTaskScheduler object (through the listener it registers) polls this queue, and whenever a new job appears it is taken out and initialized. Each task also gets a corresponding TaskInProgress object. Once started, a TaskTracker communicates with the JobTracker through the heartbeat mechanism.
  Every three seconds the TaskTracker sends the JobTracker a heartbeat carrying a wealth of information about the TaskTracker. On receiving a heartbeat, the JobTracker inspects the information it contains and, if it finds a problem, kicks off the corresponding error handling. If the TaskTracker asked for tasks in its heartbeat, the JobTracker puts the corresponding instructions into its reply. In the Hadoop source, an instruction from the JobTracker to a TaskTracker is called an action, and the JobTracker's reply to a heartbeat is called a HeartbeatResponse.
  Inside the TaskTracker there is a queue called TaskQueue that holds every newly assigned task. Whenever the TaskTracker receives a HeartbeatResponse it checks the response and, if it contains new tasks, adds them to the TaskQueue. Two threads poll this queue continuously: MapLauncher and ReduceLauncher. When a new map task shows up, MapLauncher takes it out and runs it; when a reduce task shows up, ReduceLauncher does the same. A simplified sketch of this launcher pattern follows.
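  The TaskQueue-plus-launcher arrangement can be pictured as a plain BlockingQueue with a consumer thread. The sketch below is a simplification for illustration only, not Hadoop's actual TaskLauncher (which additionally waits for free map/reduce slots before launching):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for TaskTracker's TaskLauncher: a queue of pending
// tasks and a thread that takes them off and runs them one by one.
class SimpleLauncher extends Thread {
  private final BlockingQueue<Runnable> taskQueue =
      new LinkedBlockingQueue<Runnable>();

  // called when a HeartbeatResponse delivers a new task
  void addToTaskQueue(Runnable task) {
    taskQueue.offer(task);
  }

  @Override
  public void run() {
    try {
      while (!isInterrupted()) {
        Runnable task = taskQueue.take(); // blocks until a task arrives
        task.run(); // the real launcher localizes the task, then forks a JVM
      }
    } catch (InterruptedException ie) {
      // shutting down
    }
  }
}

  The TaskTracker creates two such launchers, mapLauncher and reduceLauncher, so map and reduce slots are served independently of each other.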
  Whether it is a map task or a reduce task, once a task is taken off the queue it is first localized. Localization means copying everything the task needs to the local file system: the jar to run, the configuration files, the input data, and so on. The point is to let the task execute independently on that machine. After localization, the TaskTracker creates a separate JVM for each task and runs the task in it. When a task finishes, the TaskTracker notifies the JobTracker so that the next step can proceed.
  Once all the tasks have completed, the job is complete, and at that point the JobTracker notifies the JobClient that the work is done.
  Code walkthrough:
  When we run bin/start-all.sh, a look at the script shows that it invokes three further scripts: hadoop-config.sh, start-dfs.sh, and start-mapred.sh. Hadoop starts the JobTracker and the TaskTrackers from a series of configuration files; the master logs into the slave machines over SSH and starts a tasktracker and a datanode on each.
  Below we analyze the flow alongside the Hadoop source code.
  First, the JobTracker and the TaskTracker.
  Each JobTracker corresponds to the class org.apache.hadoop.mapred.JobTracker, which is mainly responsible for accepting jobs, scheduling them, and monitoring the TaskTrackers. Every JobTracker runs as its own JVM. A JobTracker is started through the startTracker() method. Source:
  
  



/**
 * Start the JobTracker with given configuration.
 *
 * The conf will be modified to reflect the actual ports on which
 * the JobTracker is up and running if the user passes the port as
 * zero.
 *
 * @param conf configuration for the JobTracker.
 * @throws IOException
 */
public static JobTracker startTracker(JobConf conf, String identifier)
    throws IOException, InterruptedException {
  DefaultMetricsSystem.initialize("JobTracker");
  JobTracker result = null;
  while (true) {
    try {
      // instantiate the JobTracker, named result
      result = new JobTracker(conf, identifier);
      result.taskScheduler.setTaskTrackerManager(result);
      break;
    } catch (VersionMismatch e) {
      throw e;
    } catch (BindException e) {
      throw e;
    } catch (UnknownHostException e) {
      throw e;
    } catch (AccessControlException ace) {
      // in case of jobtracker not having right access
      // bail out
      throw ace;
    } catch (IOException e) {
      LOG.warn("Error starting tracker: " +
               StringUtils.stringifyException(e));
    }
    Thread.sleep(1000);
  }
  if (result != null) {
    JobEndNotifier.startNotifier();
    MBeans.register("JobTracker", "JobTrackerInfo", result);
  }
  return result;
}
  
  startTracker creates the JobTracker object from the given conf and then runs through a series of initialization steps, including starting the RPC server, starting the embedded Jetty server, and checking whether the JobTracker needs to restart.
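  To see where this fits, JobTracker's main method, paraphrased from memory of the 1.x source (so treat this as a sketch rather than the verbatim code), boils down to:

// start the tracker, then serve until shutdown
JobConf conf = new JobConf();
JobTracker tracker = JobTracker.startTracker(conf); // one-arg overload generates the identifier
tracker.offerService();  // blocks in interTrackerServer.join()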
  That second call, offerService, is the other important method:
  



/**
 * Run forever
 */
public void offerService() throws InterruptedException, IOException {
  // Prepare for recovery. This is done irrespective of the status of restart
  // flag.
  while (true) {
    try {
      recoveryManager.updateRestartCount();
      break;
    } catch (IOException ioe) {
      LOG.warn("Failed to initialize recovery manager. ", ioe);
      // wait for some time
      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
      LOG.warn("Retrying...");
    }
  }

  taskScheduler.start();

  // Start the recovery after starting the scheduler
  try {
    // recover jobs that were running before the restart
    recoveryManager.recover();
  } catch (Throwable t) {
    LOG.warn("Recovery manager crashed! Ignoring.", t);
  }
  // refresh the node list as the recovery manager might have added
  // disallowed trackers
  refreshHosts();

  this.expireTrackersThread = new Thread(this.expireTrackers,
                                         "expireTrackers");
  this.expireTrackersThread.start();
  this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
  this.retireJobsThread.start();
  expireLaunchingTaskThread.start();

  if (completedJobStatusStore.isActive()) {
    completedJobsStoreThread = new Thread(completedJobStatusStore,
                                          "completedjobsStore-housekeeper");
    completedJobsStoreThread.start();
  }

  // start the inter-tracker server once the jt is ready
  this.interTrackerServer.start();

  synchronized (this) {
    state = State.RUNNING;
  }
  LOG.info("Starting RUNNING");

  this.interTrackerServer.join();
  LOG.info("Stopped interTrackerServer");
}
  
  This method runs for the lifetime of the JobTracker. The while (true) at the top is not an endless busy loop: it only retries until the recovery manager has been initialized, after which the scheduler and the housekeeping threads are started and the method parks in interTrackerServer.join(). This is how scheduling is kept running.
  offerService calls the start() method of the taskScheduler member. TaskScheduler gives the JobTracker a set of interfaces for initializing and scheduling every submitted job, but it is an abstract class; the concrete implementation here is JobQueueTaskScheduler. So taskScheduler.start() actually executes JobQueueTaskScheduler's start method, shown below.
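  How does the abstract taskScheduler field come to hold a JobQueueTaskScheduler in the first place? From memory of the 1.x JobTracker constructor (so treat the property name as an assumption to verify), the scheduler class is read from configuration and instantiated reflectively, which is also how alternatives like the FairScheduler can be plugged in:

// Roughly what the JobTracker constructor does to pick the scheduler:
Class<? extends TaskScheduler> schedulerClass =
    conf.getClass("mapred.jobtracker.taskScheduler",
                  JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);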
  



/**
 * Lifecycle method to allow the scheduler to start any work in separate
 * threads.
 * @throws IOException
 */
public void start() throws IOException {
  // do nothing
}

// And JobQueueTaskScheduler's start method:
public synchronized void start() throws IOException {
  super.start(); // does nothing, see above
  // register the JobInProgress listener
  taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
  // register and start the eager task-initialization listener
  eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
  eagerTaskInitializationListener.start();
  taskTrackerManager.addJobInProgressListener(
      eagerTaskInitializationListener);
}
  
  JobQueueTaskScheduler's start method registers two very important listeners: jobQueueJobInProgressListener and eagerTaskInitializationListener. The former is an instance of JobQueueJobInProgressListener, which maintains a FIFO queue of JobInProgress objects and watches each JobInProgress instance for changes over its lifecycle. The latter is an instance of EagerTaskInitializationListener, which keeps watching jobInitQueue; as soon as a new job is submitted (that is, a new JobInProgress instance is added), it immediately calls that instance's initTasks method to initialize the job. A stripped-down sketch of this eager-init pattern follows.
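  The pattern itself is just a blocking queue plus a manager thread that initializes each arriving job. The sketch below is illustrative only, not Hadoop's EagerTaskInitializationListener (which hands jobs to a small pool of init threads and re-sorts the queue by job priority):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class EagerInitSketch {
  // stand-in for JobInProgress
  interface Job { void initTasks() throws Exception; }

  private final BlockingQueue<Job> jobInitQueue = new LinkedBlockingQueue<Job>();

  // invoked when a new job is submitted (jobAdded in the real listener)
  void jobAdded(Job job) {
    jobInitQueue.offer(job);
  }

  void start() {
    Thread initManager = new Thread(new Runnable() {
      public void run() {
        while (true) {
          try {
            Job job = jobInitQueue.take(); // blocks until a job is submitted
            job.initTasks();               // compute splits, build TaskInProgress objects
          } catch (Exception e) {
            // the real code logs the failure and moves on to the next job
          }
        }
      }
    }, "jobInitManager");
    initManager.setDaemon(true);
    initManager.start();
  }
}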
  Now let's look at JobInProgress's initTasks method:
  



/**
 * Construct the splits, etc.  This is invoked from an async
 * thread so that split-computation doesn't block anyone.
 */
public synchronized void initTasks()
    throws IOException, KillInterruptedException {
  if (tasksInited || isComplete()) {
    return;
  }
  synchronized (jobInitKillStatus) {
    if (jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
      return; // init has already started, or the job was killed: bail out
    }
    jobInitKillStatus.initStarted = true;
  }

  LOG.info("Initializing " + jobId);
  final long startTimeFinal = this.startTime;
  // log job info as the user running the job
  try {
    // the job-history entry is written while impersonating the submitting user
    userUGI.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,
                                        startTimeFinal, hasRestarted());
        return null;
      }
    });
  } catch (InterruptedException ie) {
    throw new IOException(ie);
  }

  // log the job priority
  setPriority(this.priority);

  //
  // generate security keys needed by Tasks
  //
  generateAndStoreTokens();

  //
  // read input splits and create a map per split
  //
  TaskSplitMetaInfo[] splits = createSplits(jobId);
  if (numMapTasks != splits.length) {
    throw new IOException("Number of maps in JobConf doesn't match number of " +
        "recieved splits for job " + jobId + "! " +
        "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
  }
  numMapTasks = splits.length;

  // map and reduce tasks are counted as waiting until slots become available
  jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
  jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);

  
  Next the map tasks are initialized:
  



  maps = new TaskInProgress[numMapTasks];
  for (int i = 0; i < numMapTasks; ++i) {
    inputLength += splits[i].getInputDataLength();
    // create one TaskInProgress per map task
    maps[i] = new TaskInProgress(jobId, jobFile,
                                 splits[i],
                                 jobtracker, conf, this, i, numSlotsPerMap);
  }
  LOG.info("Input size for job " + jobId + " = " + inputLength
           + ". Number of splits = " + splits.length);

  // Set localityWaitFactor before creating cache
  localityWaitFactor =
    conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
  if (numMapTasks > 0) {
    // build the cache of non-running map tasks, keyed by locality level
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  // set the launch time
  this.launchTime = jobtracker.getClock().getTime();
  
  
  在这里创建reduce任务;
  



  //
  // Create reduce tasks
  //
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
                                    numMapTasks, i,
                                    jobtracker, conf, this, numSlotsPerReduce);
    nonRunningReduces.add(reduces[i]);
  }

  
  
  Here the minimum number of map tasks that must complete before reduce tasks can be launched is computed:
  



  completedMapsForReduceSlowstart =
    (int) Math.ceil(
        (conf.getFloat("mapred.reduce.slowstart.completed.maps",
                       DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
         numMapTasks));
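  As a worked example: with numMapTasks = 100 and the default value of mapred.reduce.slowstart.completed.maps (0.05 in the 1.x line, if memory serves), reduces become eligible for scheduling once ceil(0.05 × 100) = 5 map tasks have finished.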

  
  The same threshold is handed to the resource estimator, which uses completed maps to estimate the size of the map outputs:
  



  resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);

  // create two cleanup tips, one map and one reduce.
  cleanup = new TaskInProgress[2];

  // cleanup map tip. This map doesn't use any splits. Just assign an empty
  // split.
  TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
  cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                                  jobtracker, conf, this, numMapTasks, 1);
  cleanup[0].setJobCleanupTask();

  // cleanup reduce tip.
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                  numReduceTasks, jobtracker, conf, this, 1);
  cleanup[1].setJobCleanupTask();

  // create two setup tips, one map and one reduce.
  setup = new TaskInProgress[2];

  // setup map tip. This map doesn't use any split. Just assign an empty
  // split.
  setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                                jobtracker, conf, this, numMapTasks + 1, 1);
  setup[0].setJobSetupTask();

  // setup reduce tip.
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                numReduceTasks + 1, jobtracker, conf, this, 1);
  setup[1].setJobSetupTask();

  synchronized (jobInitKillStatus) {
    jobInitKillStatus.initDone = true;
    if (jobInitKillStatus.killed) {
      throw new KillInterruptedException("Job " + jobId + " killed in init");
    }
  }

  tasksInited = true;
  JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
                               numMapTasks, numReduceTasks);

  // Log the number of map and reduce tasks
  LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
           + " map tasks and " + numReduceTasks + " reduce tasks.");
}
  From this code we can see that both map tasks and reduce tasks are represented as TaskInProgress instances, and that one map TaskInProgress is created per input split.

  
  
  Next come the TaskTrackers on the slave (datanode) machines.
  In Hadoop, each tasktracker corresponds to the class org.apache.hadoop.mapred.TaskTracker, which implements all of the tasktracker's functionality. Each TaskTracker likewise runs as its own JVM; in the Hadoop scripts it corresponds to bin/hadoop-daemon.sh start tasktracker.
  First, TaskTracker's main function:
  



/**
 * Start the TaskTracker, point toward the indicated JobTracker
 */
public static void main(String argv[]) throws Exception {
  StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);
  if (argv.length != 0) {
    System.out.println("usage: TaskTracker");
    System.exit(-1);
  }
  try {
    JobConf conf = new JobConf();
    // enable the server to track time spent waiting on locks
    ReflectionUtils.setContentionTracing
      (conf.getBoolean("tasktracker.contention.tracking", false));
    DefaultMetricsSystem.initialize("TaskTracker");
    TaskTracker tt = new TaskTracker(conf);
    MBeans.register("TaskTracker", "TaskTrackerInfo", tt);
    // the key call: enter the TaskTracker's main loop
    tt.run();
  } catch (Throwable e) {
    LOG.error("Can not start task tracker because " +
              StringUtils.stringifyException(e));
    System.exit(-1);
  }
}
  
  The run method then drives the offerService() method:
  
  



public void run() {
  try {
    getUserLogManager().start();
    startCleanupThreads();
    boolean denied = false;
    while (running && !shuttingDown && !denied) {
      boolean staleState = false;
      try {
        // This while-loop attempts reconnects if we get network errors
        while (running && !staleState && !shuttingDown && !denied) {
          try {
            State osState = offerService();
            if (osState == State.STALE) {
              staleState = true;
            } else if (osState == State.DENIED) {
              denied = true;
            }
          } catch (Exception ex) {
            if (!shuttingDown) {
              LOG.info("Lost connection to JobTracker [" +
                       jobTrackAddr + "].  Retrying...", ex);
              try {
                Thread.sleep(5000);
              } catch (InterruptedException ie) {
              }
            }
          }
        }
      } finally {
        close();
      }
      if (shuttingDown) { return; }
      LOG.warn("Reinitializing local state");
      initialize();
    }
    if (denied) {
      shutdown();
    }
  } catch (IOException iex) {
    LOG.error("Got fatal exception while reinitializing TaskTracker: " +
              StringUtils.stringifyException(iex));
    return;
  } catch (InterruptedException i) {
    LOG.error("Got interrupted while reinitializing TaskTracker: " +
              i.getMessage());
    return;
  }
}
  
  
  Now look at offerService() itself:
  



State offerService() throws Exception {
  // time the last heartbeat was sent
  long lastHeartbeat = System.currentTimeMillis();

  while (running && !shuttingDown) {
    try {
      long now = System.currentTimeMillis();

      // accelerate to account for multiple finished tasks up-front
      synchronized (finishedCount) {
        long remaining =
          (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
        while (remaining > 0) {
          // sleeps for the wait time or
          // until there are *enough* empty slots to schedule tasks
          finishedCount.wait(remaining);

          // Recompute
          now = System.currentTimeMillis();
          remaining =
            (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
        }
        // Reset count
        finishedCount.set(0);
      }

      // If the TaskTracker is just starting up:
      // 1. Verify the buildVersion
      // 2. Get the system directory & filesystem
      if (justInited) {
        String jobTrackerBV = jobClient.getBuildVersion();
        if (!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
          String msg = "Shutting down. Incompatible buildVersion." +
            "\nJobTracker's: " + jobTrackerBV +
            "\nTaskTracker's: " + VersionInfo.getBuildVersion();
          LOG.error(msg);
          try {
            jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
          } catch (Exception e) {
            LOG.info("Problem reporting to jobtracker: " + e);
          }
          return State.DENIED;
        }

        String dir = jobClient.getSystemDir();
        if (dir == null) {
          throw new IOException("Failed to get system directory");
        }
        systemDirectory = new Path(dir);
        systemFS = systemDirectory.getFileSystem(fConf);
      }

      // Send the heartbeat and process the jobtracker's directives
      HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

      // Note the time when the heartbeat returned, use this to decide when to send the
      // next heartbeat
      lastHeartbeat = System.currentTimeMillis();

      // Check if the map-event list needs purging
      Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
      if (jobs.size() > 0) {
        synchronized (this) {
          // purge the local map events list
          for (JobID job : jobs) {
            RunningJob rjob;
            synchronized (runningJobs) {
              rjob = runningJobs.get(job);
              if (rjob != null) {
                synchronized (rjob) {
                  FetchStatus f = rjob.getFetchStatus();
                  if (f != null) {
                    f.reset();
                  }
                }
              }
            }
          }

          // Mark the reducers in shuffle for rollback
          synchronized (shouldReset) {
            for (Map.Entry<TaskAttemptID, TaskInProgress> entry
                 : runningTasks.entrySet()) {
              if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
                this.shouldReset.add(entry.getKey());
              }
            }
          }
        }
      }

  
  The returned heartbeatResponse carries the JobTracker's directives:
  



      TaskTrackerAction[] actions = heartbeatResponse.getActions();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
                  heartbeatResponse.getResponseId() + " and " +
                  ((actions != null) ? actions.length : 0) + " actions");
      }
      if (reinitTaskTracker(actions)) {
        return State.STALE;
      }

      // resetting heartbeat interval from the response.
      heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
      justStarted = false;
      justInited = false;
  
  Then the actions are executed:
  



      if (actions != null) {
        for (TaskTrackerAction action : actions) {
          if (action instanceof LaunchTaskAction) {
            // enqueue the new task for launching
            addToTaskQueue((LaunchTaskAction) action);
          } else if (action instanceof CommitTaskAction) {
            CommitTaskAction commitAction = (CommitTaskAction) action;
            if (!commitResponses.contains(commitAction.getTaskID())) {
              LOG.info("Received commit task action for " +
                       commitAction.getTaskID());
              commitResponses.add(commitAction.getTaskID());
            }
          } else {
            tasksToCleanup.put(action);
          }
        }
      }
      markUnresponsiveTasks();
      killOverflowingTasks();

      // we've cleaned up, resume normal operation
      if (!acceptNewTasks && isIdle()) {
        acceptNewTasks = true;
      }
      // The check below may not be required every iteration but we are
      // erring on the side of caution here. We have seen many cases where
      // the call to jetty's getLocalPort() returns different values at
      // different times. Being a real paranoid here.
      checkJettyPort(server.getPort());
    } catch (InterruptedException ie) {
      LOG.info("Interrupted. Closing down.");
      return State.INTERRUPTED;
    } catch (DiskErrorException de) {
      String msg = "Exiting task tracker for disk error:\n" +
        StringUtils.stringifyException(de);
      LOG.error(msg);
      synchronized (this) {
        // report the disk error to the JobTracker
        jobClient.reportTaskTrackerError(taskTrackerName,
                                         "DiskErrorException", msg);
      }
      return State.STALE;
    } catch (RemoteException re) {
      String reClass = re.getClassName();
      if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
        LOG.info("Tasktracker disallowed by JobTracker.");
        return State.DENIED;
      }
    } catch (Exception except) {
      String msg = "Caught exception: " +
        StringUtils.stringifyException(except);
      LOG.error(msg);
    }
  }

  return State.NORMAL;
}
  
  The TaskTracker sends the JobTracker a heartbeat every three seconds (the default; the JobTracker adjusts the interval as the cluster grows). The heartbeat is carried over Hadoop's RPC proxy mechanism, which is essentially remote method invocation: the caller invokes a method on a local proxy object and the call executes on the remote server. A toy illustration of the proxy idea follows; for the details of Hadoop RPC, consult other references.
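  The sketch below uses only the JDK's java.lang.reflect.Proxy; the Heartbeater interface is a made-up stand-in for InterTrackerProtocol, and where real Hadoop RPC would serialize the call and ship it over the network, this toy just prints it:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyDemo {
  // Stand-in for InterTrackerProtocol: a single "remote" method.
  interface Heartbeater {
    String heartbeat(String trackerName);
  }

  public static void main(String[] args) {
    // Plays the role of the RPC invoker: in Hadoop it would serialize the
    // method name and arguments and send them to the server.
    InvocationHandler rpcInvoker = new InvocationHandler() {
      public Object invoke(Object proxy, Method method, Object[] methodArgs) {
        System.out.println("would send RPC: " + method.getName() +
                           "(" + methodArgs[0] + ")");
        return "ack"; // in real RPC, the deserialized server response
      }
    };
    Heartbeater jobTrackerProxy = (Heartbeater) Proxy.newProxyInstance(
        Heartbeater.class.getClassLoader(),
        new Class<?>[] { Heartbeater.class },
        rpcInvoker);
    // The TaskTracker calls this as if it were a local object.
    System.out.println(jobTrackerProxy.heartbeat("tracker_host1:50060"));
  }
}

  This is, in spirit, how the jobClient object used in offerService stands in for the remote JobTracker.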
  
  Next, the JobTracker receives the heartbeat and assigns tasks to the TaskTracker. When a heartbeat arrives, the JobTracker invokes heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId), which returns a HeartbeatResponse object:
  



   
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean restarted,
                                                boolean initialContact,
                                                boolean acceptNewTasks,
                                                short responseId)
    throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Got heartbeat from: " + status.getTrackerName() +
              " (restarted: " + restarted +
              " initialContact: " + initialContact +
              " acceptNewTasks: " + acceptNewTasks + ")" +
              " with responseId: " + responseId);
  }

  // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
  if (!acceptTaskTracker(status)) {
    throw new DisallowedTaskTrackerException(status);
  }

  // First check if the last heartbeat response got through
  String trackerName = status.getTrackerName();
  long now = clock.getTime();
  if (restarted) {
    faultyTrackers.markTrackerHealthy(status.getHost());
  } else {
    faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
  }

  HeartbeatResponse prevHeartbeatResponse =
    trackerToHeartbeatResponseMap.get(trackerName);
  boolean addRestartInfo = false;

  if (initialContact != true) {
    // If this isn't the 'initial contact' from the tasktracker,
    // there is something seriously wrong if the JobTracker has
    // no record of the 'previous heartbeat'; if so, ask the
    // tasktracker to re-initialize itself.
    if (prevHeartbeatResponse == null) {
      // This is the first heartbeat from the old tracker to the newly
      // started JobTracker
      if (hasRestarted()) {
        addRestartInfo = true;
        // inform the recovery manager about this tracker joining back
        recoveryManager.unMarkTracker(trackerName);
      } else {
        // Jobtracker might have restarted but no recovery is needed
        // otherwise this code should not be reached
        LOG.warn("Serious problem, cannot find record of 'previous' " +
                 "heartbeat for '" + trackerName +
                 "'; reinitializing the tasktracker");
        return new HeartbeatResponse(responseId,
            new TaskTrackerAction[] {new ReinitTrackerAction()});
      }
    } else {
      // It is completely safe to not process a 'duplicate' heartbeat from a
      // {@link TaskTracker} since it resends the heartbeat when rpcs are
      // lost see {@link TaskTracker.transmitHeartbeat()};
      // acknowledge it by re-sending the previous response to let the
      // {@link TaskTracker} go forward.
      if (prevHeartbeatResponse.getResponseId() != responseId) {
        LOG.info("Ignoring 'duplicate' heartbeat from '" +
                 trackerName + "'; resending the previous 'lost' response");
        return prevHeartbeatResponse;
      }
    }
  }

  // Process this heartbeat
  short newResponseId = (short) (responseId + 1);
  status.setLastSeen(now);
  if (!processHeartbeat(status, initialContact, now)) {
    if (prevHeartbeatResponse != null) {
      trackerToHeartbeatResponseMap.remove(trackerName);
    }
    return new HeartbeatResponse(newResponseId,
        new TaskTrackerAction[] {new ReinitTrackerAction()});
  }

  // Initialize the response to be sent for the heartbeat
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
  // Check for new tasks to be executed on the tasktracker
  if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
    TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      if (tasks == null) {
        tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
      }
      if (tasks != null) {
        for (Task task : tasks) {
          expireLaunchingTasks.addNewTask(task.getTaskID());
          if (LOG.isDebugEnabled()) {
            LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
          }
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }

  // Check for tasks to be killed
  List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
  if (killTasksList != null) {
    actions.addAll(killTasksList);
  }

  // Check for jobs to be killed/cleanedup
  List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
  if (killJobsList != null) {
    actions.addAll(killJobsList);
  }

  // Check for tasks whose outputs can be saved
  List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
  if (commitTasksList != null) {
    actions.addAll(commitTasksList);
  }

  // calculate next heartbeat interval and put in heartbeat response
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));

  // check if the restart info is req
  if (addRestartInfo) {
    response.setRecoveredJobs(recoveryManager.getJobsToRecover());
  }

  // Update the trackerToHeartbeatResponseMap
  trackerToHeartbeatResponseMap.put(trackerName, response);

  // Done processing the hearbeat, now remove 'marked' tasks
  removeMarkedTasks(trackerName);

  return response;
}

  
  The scheduler is then brought into play. The default scheduler is JobQueueTaskScheduler, and its assignTasks method looks like this:



public synchronized List<Task> assignTasks(TaskTracker taskTracker)
    throws IOException {
  TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();

  // get the current status of the cluster
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  final int numTaskTrackers = clusterStatus.getTaskTrackers();
  final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
  final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

  Collection<JobInProgress> jobQueue =
    jobQueueJobInProgressListener.getJobQueue();

  //
  // Get map + reduce counts for the current tracker.
  //
  final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
  final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
  final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
  final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();

  // the tasks assigned in this heartbeat
  List<Task> assignedTasks = new ArrayList<Task>();

  //
  // Compute (running + pending) map and reduce task numbers across pool
  //
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
        if (job.scheduleReduces()) {
          remainingReduceLoad +=
            (job.desiredReduces() - job.finishedReduces());
        }
      }
    }
  }

  // Compute the 'load factor' for maps and reduces
  double mapLoadFactor = 0.0;
  if (clusterMapCapacity > 0) {
    mapLoadFactor = (double) remainingMapLoad / clusterMapCapacity;
  }
  double reduceLoadFactor = 0.0;
  if (clusterReduceCapacity > 0) {
    reduceLoadFactor = (double) remainingReduceLoad / clusterReduceCapacity;
  }

  final int trackerCurrentMapCapacity =
    Math.min((int) Math.ceil(mapLoadFactor * trackerMapCapacity),
             trackerMapCapacity);
  int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
  boolean exceededMapPadding = false;
  if (availableMapSlots > 0) {
    exceededMapPadding =
      exceededPadding(true, clusterStatus, trackerMapCapacity);
  }

  int numLocalMaps = 0;
  int numNonLocalMaps = 0;
  scheduleMaps:
  for (int i = 0; i < availableMapSlots; ++i) {
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }

        Task t = null;

        // Try to schedule a node-local or rack-local Map task
        t = job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,
                taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          ++numLocalMaps;

          // Don't assign map tasks to the hilt!
          // Leave some free slots in the cluster for future task-failures,
          // speculative tasks etc. beyond the highest priority job
          if (exceededMapPadding) {
            break scheduleMaps;
          }

          // Try all jobs again for the next Map task
          break;
        }

        // Try to schedule a non-local Map task
        t = job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          ++numNonLocalMaps;

          // We assign at most 1 off-switch or speculative task
          // This is to prevent TaskTrackers from stealing local-tasks
          // from other TaskTrackers.
          break scheduleMaps;
        }
      }
    }
  }
  int assignedMaps = assignedTasks.size();

  //
  // Same thing, but for reduce tasks
  // However we _never_ assign more than 1 reduce task per heartbeat
  //
  final int trackerCurrentReduceCapacity =
    Math.min((int) Math.ceil(reduceLoadFactor * trackerReduceCapacity),
             trackerReduceCapacity);
  final int availableReduceSlots =
    Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
  boolean exceededReducePadding = false;
  if (availableReduceSlots > 0) {
    exceededReducePadding = exceededPadding(false, clusterStatus,
                                            trackerReduceCapacity);
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }

        Task t = job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                     taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          break;
        }

        // Don't assign reduce tasks to the hilt!
        // Leave some free slots in the cluster for future task-failures,
        // speculative tasks etc. beyond the highest priority job
        if (exceededReducePadding) {
          break;
        }
      }
    }
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " +
              "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
              trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +
              (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
              assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +
              ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " +
              trackerCurrentReduceCapacity + "," + trackerRunningReduces +
              "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) +
              ", " + (assignedTasks.size() - assignedMaps) + "]");
  }

  return assignedTasks;
}
  
  
  When the JobTracker's heartbeat response contains an assigned LaunchTaskAction, the TaskTracker calls addToTaskQueue, which hands the action to either the MapLauncher or the ReduceLauncher object inside TaskTracker. Both are instances of TaskLauncher, an inner class of TaskTracker whose key data member is a queue of TaskTracker.TaskInProgress objects. If the task in the response is a map task, it goes into mapLauncher's queue; if it is a reduce task, it goes into reduceLauncher's taskToLaunch queue:



private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
    mapLauncher.addToTaskQueue(action);
  } else {
    reduceLauncher.addToTaskQueue(action);
  }
}

  
  
  Registering the task:



private TaskInProgress registerTask(LaunchTaskAction action,
                                    TaskLauncher launcher) {
  Task t = action.getTask();
  LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
           " task's state:" + t.getState());
  TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
  synchronized (this) {
    tasks.put(t.getTaskID(), tip);
    runningTasks.put(t.getTaskID(), tip);
    boolean isMap = t.isMapTask();
    if (isMap) {
      mapTotal++;
    } else {
      reduceTotal++;
    }
  }
  return tip;
}

private Path localizeJobConfFile(Path jobFile, String user,
                                 FileSystem userFs, JobID jobId)
    throws IOException {
  // Get sizes of JobFile and JarFile
  // sizes are -1 if they are not present.
  FileStatus status = null;
  long jobFileSize = -1;
  try {
    status = userFs.getFileStatus(jobFile);
    jobFileSize = status.getLen();
  } catch (FileNotFoundException fe) {
    jobFileSize = -1;
  }
  Path localJobFile =
    lDirAlloc.getLocalPathForWrite(getPrivateDirJobConfFile(user,
        jobId.toString()), jobFileSize, fConf);

  // Download job.xml
  userFs.copyToLocalFile(jobFile, localJobFile);
  return localJobFile;
}
  
  
  This performs a series of localization steps, copying the jar, the split file, and the xml configuration to the local machine. Once all the resources the task needs have been copied locally, TaskTracker's launchTaskForJob method is called, which in turn calls TaskTracker.TaskInProgress's launchTask function:



/**
 * Kick off the task execution
 */
public synchronized void launchTask(RunningJob rjob) throws IOException {
  if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
      this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
      this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
    localizeTask(task);
    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
    }
    setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
    this.runner.start();
    long now = System.currentTimeMillis();
    this.taskStatus.setStartTime(now);
    this.lastProgressReport = now;
  } else {
    LOG.info("Not launching task: " + task.getTaskID() +
             " since it's state is " + this.taskStatus.getRunState());
  }
}

  A TaskRunner object is then created and started; its run method does the real work:
public final void run() {
  String errorInfo = "Child Error";
  try {
    // before preparing the job localize
    // all the archives
    TaskAttemptID taskid = t.getTaskID();
    final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    // simply get the location of the workDir and pass it to the child. The
    // child will do the actual dir creation
    final File workDir =
      new File(new Path(localdirs[rand.nextInt(localdirs.length)],
          TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),
              taskid.toString(),
              t.isTaskCleanupTask())).toString());

    String user = tip.getUGI().getUserName();

    if (!prepare()) {
      return;
    }

    // Accumulates class paths for child.
    List<String> classPaths = getClassPaths(conf, workDir,
                                            taskDistributedCacheManager);

    long logSize = TaskLog.getTaskLogLength(conf);

    // Build exec child JVM args.
    Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);

    tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);

    // set memory limit using ulimit if feasible and necessary ...
    String setup = getVMSetupCmd();

    // Set up the redirection of the task's stdout and stderr streams
    File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
    File stdout = logFiles[0];
    File stderr = logFiles[1];
    tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
                                                             stderr);

    Map<String, String> env = new HashMap<String, String>();
    errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
                                 logSize);

    // flatten the env as a set of export commands
    List<String> setupCmds = new ArrayList<String>();
    for (Entry<String, String> entry : env.entrySet()) {
      StringBuffer sb = new StringBuffer();
      sb.append("export ");
      sb.append(entry.getKey());
      sb.append("=\"");
      sb.append(entry.getValue());
      sb.append("\"");
      setupCmds.add(sb.toString());
    }
    setupCmds.add(setup);

    launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
    tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
    if (exitCodeSet) {
      if (!killed && exitCode != 0) {
        if (exitCode == 65) {
          tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
        }
        throw new IOException("Task process exit with nonzero status of " +
                              exitCode + ".");
      }
    }
  } catch (FSError e) {
    LOG.fatal("FSError", e);
    try {
      tracker.fsErrorInternal(t.getTaskID(), e.getMessage());
    } catch (IOException ie) {
      LOG.fatal(t.getTaskID() + " reporting FSError", ie);
    }
  } catch (Throwable throwable) {
    LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
    Throwable causeThrowable = new Throwable(errorInfo, throwable);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    causeThrowable.printStackTrace(new PrintStream(baos));
    try {
      tracker.reportDiagnosticInfoInternal(t.getTaskID(), baos.toString());
    } catch (IOException e) {
      LOG.warn(t.getTaskID() + " Reporting Diagnostics", e);
    }
  } finally {
    // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
    // *false* since the task has either
    // a) SUCCEEDED - which means commit has been done
    // b) FAILED - which means we do not need to commit
    tip.reportTaskFinished(false);
  }
}
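  Inside run(), launchJvmAndWait is what finally forks the separate per-task JVM (via the JvmManager) and blocks until it exits. Stripped of the Hadoop plumbing, the mechanism amounts to the ProcessBuilder sketch below; the child main class is a hypothetical stand-in for Hadoop's Child class, and the work dir here is just the temp dir:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class ChildJvmSketch {
  public static void main(String[] args) throws Exception {
    List<String> cmd = new ArrayList<String>();
    cmd.add(new File(System.getProperty("java.home"), "bin/java").getPath());
    cmd.add("-Xmx200m");                            // per-task heap, cf. mapred.child.java.opts
    cmd.add("-cp");
    cmd.add(System.getProperty("java.class.path")); // the assembled task classpath
    cmd.add("org.example.ChildMain");               // hypothetical stand-in for the Child class

    ProcessBuilder pb = new ProcessBuilder(cmd);
    pb.directory(new File(System.getProperty("java.io.tmpdir"))); // stands in for the localized work dir
    pb.redirectErrorStream(true);                   // stdout/stderr would be routed to the task logs

    Process child = pb.start();
    int exitCode = child.waitFor();                 // the "AndWait" part: block until the task JVM exits
    System.out.println("child exited with " + exitCode);
  }
}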
  
  
  
