[Experience Sharing] An overview of the Hadoop mapred JobTracker source code

The previous section looked at how the TaskTracker launches a new task. Here we continue on the JobTracker side and see how it responds and schedules work. In Hadoop, tasks are handed out in a pull fashion: the TaskTracker asks for work in its heartbeat rather than having it pushed to it.
   
    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                               justStarted,
                                                               askForNewTask,
                                                               heartbeatResponseId);
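
Once this call returns, the TaskTracker walks the actions the JobTracker packed into the response and queues every LaunchTaskAction for execution, which is the launch path covered in the previous section. A condensed sketch of that consumer loop (simplified from TaskTracker.offerService(); treat the exact helper names as my reading of the 0.20-era code):

    TaskTrackerAction[] actions = heartbeatResponse.getActions();
    if (actions != null) {
      for (TaskTrackerAction action : actions) {
        if (action instanceof LaunchTaskAction) {
          // new work handed out by the JobTracker: queue it for the launcher threads
          addToTaskQueue((LaunchTaskAction) action);
        } else {
          // kill/commit and other housekeeping actions
          tasksToCleanup.put(action);
        }
      }
    }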

The heartbeat call above is where the TaskTracker reports to the JobTracker over RPC, so let's switch to the JobTracker and see how it is handled:
  
  public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                  boolean initialContact,
                                                  boolean acceptNewTasks,
                                                  short responseId)
      throws IOException {
    ...
    // If the tracker is willing to accept new tasks, let the JobTracker schedule;
    // this is where taskScheduler.assignTasks() gets called
    if (acceptNewTasks) {
      TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
      if (taskTrackerStatus == null) {
        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
      } else {
        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        // Here is where tasks are assigned; the configured scheduler decides how
        if (tasks == null) {
          tasks = taskScheduler.assignTasks(taskTrackerStatus);
        }
        if (tasks != null) {
          for (Task task : tasks) {
            expireLaunchingTasks.addNewTask(task.getTaskID());
            LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
            actions.add(new LaunchTaskAction(task));
          }
        }
      }
    }
    ...
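
The LaunchTaskAction objects accumulated in actions are what the TaskTracker sees on the other end. Roughly, the tail of heartbeat() packages them into the response, along the lines of the following sketch (simplified; constructor and setter names follow my reading of the 0.20-era source, so treat them as approximate):

    // package the collected actions into the HeartbeatResponse returned over RPC
    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
    response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));
    ...
    return response;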

Which scheduler is used is determined by the configuration key "mapred.jobtracker.taskScheduler"; the class it names is instantiated by reflection:

    taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
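
For reference, schedulerClass itself is resolved from the configuration roughly like this (paraphrased from the JobTracker constructor; the default shown is the stock FIFO scheduler):

    // resolve the scheduler class named by mapred.jobtracker.taskScheduler,
    // falling back to the built-in FIFO scheduler when the key is unset
    Class<? extends TaskScheduler> schedulerClass =
        conf.getClass("mapred.jobtracker.taskScheduler",
                      JobQueueTaskScheduler.class, TaskScheduler.class);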
When nothing is configured, the default is JobQueueTaskScheduler, Hadoop's built-in FIFO scheduler. Let's look at how this scheduler implements assignTasks:
  
  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
      throws IOException {
    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    int numTaskTrackers = clusterStatus.getTaskTrackers();
    Collection<JobInProgress> jobQueue =
        jobQueueJobInProgressListener.getJobQueue();

    //
    // Get map + reduce counts for the current tracker.
    //
    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
    int numMaps = taskTracker.countMapTasks();
    int numReduces = taskTracker.countReduceTasks();

    //
    // Compute average map and reduce task numbers across pool
    //
    int remainingReduceLoad = 0;
    int remainingMapLoad = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
          int totalMapTasks = job.desiredMaps();
          int totalReduceTasks = job.desiredReduces();
          remainingMapLoad += (totalMapTasks - job.finishedMaps());
          remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
        }
      }
    }

    // find out the maximum number of maps or reduces that we are willing
    // to run on any node.
    int maxMapLoad = 0;
    int maxReduceLoad = 0;
    if (numTaskTrackers > 0) {
      maxMapLoad = Math.min(maxCurrentMapTasks,
                            (int) Math.ceil((double) remainingMapLoad /
                                            numTaskTrackers));
      maxReduceLoad = Math.min(maxCurrentReduceTasks,
                               (int) Math.ceil((double) remainingReduceLoad /
                                               numTaskTrackers));
    }

    int totalMaps = clusterStatus.getMapTasks();
    int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
    int totalReduces = clusterStatus.getReduceTasks();
    int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();

    //
    // In the below steps, we allocate first a map task (if appropriate),
    // and then a reduce task if appropriate.  We go through all jobs
    // in order of job arrival; jobs only get serviced if their
    // predecessors are serviced, too.
    //

    //
    // We hand a task to the current taskTracker if the given machine
    // has a workload that's less than the maximum load of that kind of
    // task.
    //
    if (numMaps < maxMapLoad) {
      int totalNeededMaps = 0;
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }

          // This is where a map Task is actually obtained: we ask the job itself
          Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
                                        taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            return Collections.singletonList(t);
          }

          //
          // Beyond the highest-priority task, reserve a little
          // room for failures and speculative executions; don't
          // schedule tasks to the hilt.
          //
          totalNeededMaps += job.desiredMaps();
          int padding = 0;
          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
            padding = Math.min(maxCurrentMapTasks,
                               (int) (totalNeededMaps * padFraction));
          }
          if (totalMaps + padding >= totalMapTaskCapacity) {
            break;
          }
        }
      }
    }

    //
    // Same thing, but for reduce tasks
    //
    if (numReduces < maxReduceLoad) {
      int totalNeededReduces = 0;
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
              job.numReduceTasks == 0) {
            continue;
          }

          Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
                                           taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            return Collections.singletonList(t);
          }

          //
          // Beyond the highest-priority task, reserve a little
          // room for failures and speculative executions; don't
          // schedule tasks to the hilt.
          //
          totalNeededReduces += job.desiredReduces();
          int padding = 0;
          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
            padding = Math.min(maxCurrentReduceTasks,
                               (int) (totalNeededReduces * padFraction));
          }
          if (totalReduces + padding >= totalReduceTaskCapacity) {
            break;
          }
        }
      }
    }

    return null;
  }
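
To make the load cap concrete with a hypothetical example: on a cluster of 10 TaskTrackers with 4 map slots each, if the running jobs still have 25 unfinished map tasks in total, then maxMapLoad = min(4, ceil(25 / 10)) = 3, so the reporting tracker is only offered a new map task while it is currently running fewer than 3 maps. The cap spreads the remaining work across the cluster rather than filling one tracker up to its slot limit.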

Obtaining the actual Task therefore takes us into JobInProgress, via obtainNewMapTask / obtainNewReduceTask, which have to query and process the cluster state to decide which task to hand out.
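
Since the scheduler is pluggable through mapred.jobtracker.taskScheduler, it is worth seeing how small the contract actually is. Here is a do-nothing skeleton of a custom scheduler, written against my reading of the 0.20-era TaskScheduler API (the abstract method set and the package-private visibility may differ in other versions, so treat it as a sketch, not a drop-in implementation):

    // TaskScheduler and JobInProgress are package-private in this era of Hadoop,
    // so a custom scheduler typically lives in the same package (as FairScheduler does).
    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;

    // Illustration only: a scheduler that never hands out work. The JobTracker's
    // heartbeat() calls assignTasks() and launches whatever non-null list comes back.
    public class DoNothingTaskScheduler extends TaskScheduler {

      @Override
      public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
          throws IOException {
        // A real scheduler walks its job queue and calls obtainNewMapTask /
        // obtainNewReduceTask on a JobInProgress, as JobQueueTaskScheduler does above.
        return null;
      }

      @Override
      public Collection<JobInProgress> getJobs(String queueName) {
        return Collections.emptyList();
      }
    }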
