设为首页 收藏本站
查看: 878|回复: 0

[经验分享] Hadoop源码解读-JobTracker处理HeartBeat

[复制链接]

尚未签到

发表于 2016-12-10 10:33:08 | 显示全部楼层 |阅读模式
JobTracker会接受TaskTracker的心跳,并处理。不多说,直接上源码

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
boolean initialContact,
boolean acceptNewTasks,
short responseId)


1 首先检查heartbeat是否来自自己的host列表,否则抛出异常。
  如果不再Host列表或者在排除Host列表中,退出心跳处理。
  
return (inHostsList(status) && !inExcludedHostsList(status));

2 判断是否在黑名单、灰名单、默认名单,并从这些名单中删除
  黑名单、灰名单主要是Hadoop的容错机制,在此不做过多解释,可以单写一篇文章。
  
faultyTrackers.markTrackerHealthy(status.getHost());

3 根据trackerName获取上一个heartbeat response
  
HeartbeatResponse prevHeartbeatResponse =
trackerToHeartbeatResponseMap.get(trackerName);

4 如果上一个heartbeat 为null,让Tasktracker重新初始化  如果是第一个 response 从recoveryMap中移除
   
if (prevHeartbeatResponse == null) {
// This is the first heartbeat from the old tracker to the newly
// started JobTracker
if (hasRestarted()) {
addRestartInfo = true;
// inform the recovery manager about this tracker joining back
recoveryManager.unMarkTracker(trackerName);
} else {
// Jobtracker might have restarted but no recovery is needed
// otherwise this code should not be reached
LOG.warn("Serious problem, cannot find record of 'previous' " +
"heartbeat for '" + trackerName +
"'; reinitializing the tasktracker");
return new HeartbeatResponse(responseId,
new TaskTrackerAction[] {new ReinitTrackerAction()});
}
}

   如果重发的responseId,丢弃掉。
   
if (prevHeartbeatResponse.getResponseId() != responseId) {
LOG.info("Ignoring 'duplicate' heartbeat from '" +
trackerName + "'; resending the previous 'lost' response");
return prevHeartbeatResponse;
}

5 处理heartbeat  首先 updateTaskTrackerStatus 如果是被遗忘的tasktracker 加入队列中;更新任务状态;更新健康节点状态;
   
private synchronized boolean processHeartbeat(
TaskTrackerStatus trackerStatus,
boolean initialContact,
long timeStamp) throws UnknownHostException {
//主要集中在此不分析那么详细了
}

6 检查新Task是否执行,如果没有执行,加入执行队列
   
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null ) {
tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
}
//添加Task
if (tasks != null) {
for (Task task : tasks) {
expireLaunchingTasks.addNewTask(task.getTaskID());
if(LOG.isDebugEnabled()) {
LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
}
actions.add(new LaunchTaskAction(task));
}
}
}
}

7 检查Task是否杀死
  List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
if (killTasksList != null) {
actions.addAll(killTasksList);
}

8 检查 task 是否cleanup
  List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
if (killJobsList != null) {
actions.addAll(killJobsList);
}
9 检查task 的output是否可以save
   
List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
if (commitTasksList != null) {
actions.addAll(commitTasksList);
}

10 计算下一次heartbeat的时间间隔
   
int nextInterval = getNextHeartbeatInterval();
response.setHeartbeatInterval(nextInterval);
response.setActions(
actions.toArray(new TaskTrackerAction[actions.size()]));

11 更新heartbeatMap,并remove掉Marked已经处理掉的heartbeat
   
// 更新Map
trackerToHeartbeatResponseMap.put(trackerName, response);
//清除处理完成的心跳
removeMarkedTasks(trackerName);


  不对之处欢迎讨论。
=================参考====
hadoop源码。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312276-1-1.html 上篇帖子: Hadoop中map数的计算 下篇帖子: 转载--淘宝hadoop升级遇到的问题
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表