hadoop作业调度-资料
Code:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,boolean restarted, boolean initialContact,
boolean acceptNewTasks, short responseId) throws IOException {
...
// If this heartbeat round allows task assignment, the reporting taskTracker can accept
// new tasks (the acceptNewTasks argument is true), and it is not blacklisted, then
// proceed to scheduling.
// NOTE(review): isBlacklisted, trackerName and actions are not declared in this excerpt;
// presumably they are set up in the elided part of the method - confirm.
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
if (taskTrackerStatus == null) {
// No status was found for this tracker name: log a warning and assign nothing.
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
// Ask for setup/cleanup tasks first; the scheduler is consulted only when this returns null.
List tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null) {
// Call the concrete taskScheduler implementation to assign tasks to the current taskTracker.
tasks = taskScheduler.assignTasks(taskTrackerStatus);
}
if (tasks != null) {
// Register each task with expireLaunchingTasks and add a LaunchTaskAction for it to actions.
for (Task task : tasks) {
expireLaunchingTasks.addNewTask(task.getTaskID());
LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
actions.add(new LaunchTaskAction(task));
}
}
}
}
...
}
页:
[1]