设为首页 收藏本站
查看: 1138|回复: 0

[经验分享] Hadoop深入学习:MapReduce中的心跳机制

[复制链接]

尚未签到

发表于 2016-12-10 10:42:31 | 显示全部楼层 |阅读模式
在本节中,我们特别来学习一些有心跳(HeartBeat)的相关知识,这包括用途、心跳的发送、接收和应答。
        JobTracker和TaskTracker之间是通过心跳来进行信息沟通的,TaskTracker通过周期性的通过心跳向JobTracker汇报该节点和任务的状态。心跳实际上就是一个RPC函数,在Hadoop中,心跳主要有三个作用:
        1)、判断TaskTracker是否还活着;
        2)、JobTracker及时获得各个TaskTracker节点上资源的使用情况和任务运行状态;
        3)、给TaskTracker分配任务。
        那么,心跳是由谁发起的呢?JobTracker从不会主动的向TaskTracker发送任何的信息,而是由TaskTracker节点主动通过心跳来向JobTracker获取属于自己的信息,JobTracker只能通过心跳应答的形式为各个TaskTracker分配任务。
        TaskTracker周期性的调用RPC函数hearbeat()向JobTracker汇报信息和领取任务,该函数定义如下:
/**
* The periodic heartbeat mechanism between the {@link TaskTracker} and
* the {@link JobTracker}.
*
* The {@link JobTracker} processes the status information sent by the
* {@link TaskTracker} and responds with instructions to start/stop
* tasks or jobs, and also 'reset' instructions during contingencies.
* @Param status: 封装了TaskTracker上的各种状态信息
* @Param restarted: TaskTracker是否刚启动
* @Param initialContact: TaskTracker是否第一次连接JobTracker
* @Param acceptNewTasks: 是否接收新任务
* @Param responseId: 心跳响应编号,用于防止重复发送心跳,每接受一次心跳后,该值都加1
*/
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
boolean initialContact,
boolean acceptNewTasks,
short responseId) throws IOException

        TaskTracker发送心跳后,会领取JobTracker给TaskTracker下达的一些命令信息即HeartbeatResponse对象。在该方法内部,主要涉及两个业务逻辑:更新状态和下达命令。JobTracker首先将TaskTracker汇报的该接点的最新的运行状态做更新,然后根据这些状态和其他需求给TaskTracker下相应的操作命令。
        下面我们主要看看JobTracker给TaskTracker下达的封装在HeartbeatResponse对象中都主要有什么信息? HeartbeatResponse对象主要有两部分的信息
        1)、下达给TaskTracker的命令。
        JobTracker将给TaskTracker的命令封装成TaskTrackerAction类,我们来看一看该类中主要相关信息:
abstract class TaskTrackerAction implements Writable {
/**
* 命令类型
* Ennumeration of various 'actions' that the {@link JobTracker}
* directs the {@link TaskTracker} to perform periodically.
*
*/
public static enum ActionType {
/** 运行新任务Launch a new task. */
LAUNCH_TASK,
/** 杀死任务Kill a task. */
KILL_TASK,
/** 杀死作业Kill any tasks of this job and cleanup. */
KILL_JOB,
/** 重新初始化Reinitialize the tasktracker. */
REINIT_TRACKER,
/** 提交任务Ask a task to save its output. */
COMMIT_TASK
};
/**
* A factory-method to create objects of given {@link ActionType}.
* @param actionType the {@link ActionType} of object to create.
* @return an object of {@link ActionType}.
*/
public static TaskTrackerAction createAction(ActionType actionType) {
TaskTrackerAction action = null;
switch (actionType) {
case LAUNCH_TASK:
{
action = new LaunchTaskAction();
}
break;
case KILL_TASK:
{
action = new KillTaskAction();
}
break;
case KILL_JOB:
{
action = new KillJobAction();
}
break;
case REINIT_TRACKER:
{
action = new ReinitTrackerAction();
}
break;
case COMMIT_TASK:
{
action = new CommitTaskAction();
}
break;
}
return action;
}
...
}

        我们再来看一下这几个命令:
        (1)、ReinitTrackerAction
        JobTracker收到TaskTracker发送的心跳后,先检查一致性,如果发现有异常则会要求TaskTracker重新对自己进行初始化操作,已恢复到一致状态。这些以执行包括丢失上次心跳应答信息和丢失TaskTracker的状态信息。
        (2)、LaunchTaskAction
        LaunchTaskAction封装了JobTracker给TaskTracker分配的新任务。
        (3)、KillTaskAction
        KillTaskAction封装了JobTracker需要杀死的任务。JobTracker接到该命令后会杀死对应的任务、清理工作目录和释放任务占有的slot资源。
        (4)、KillJobAction
        KillJobAction封装了JobTracker待清理的作业。
        (5)、CommitTaskAction
        CommitTaskAction类封装了JobTracker需要提交的任务。为了防止同一个TaskInProgress的两个同时运行的Task Attempt(推测式执行)同时向一个文件写数据而发生冲突,Hadoop让每一个Task Attempt写到${mapred.output.dir}/_temporary下的一个单独的文件中,当某个Task Attempt运行完成后,再将运行结果移到最终的输出目录${mapred.output.dir}中。
        Hadoop讲一个成功运行完成的Task Attempt的结果文件从临时目录移动到最终输出目录的过程,叫“任务提交”。
         2)、下次汇报心跳的时间。
         TaskTracker的心跳发送时间是由JobTracker决定的,在心跳应答中下达给TaskTracker,心跳之间的时间间隔并不是固定不变的,会随着集群规模而动态调整。
         在MapReduce中,只有JobTracker直到某一时刻的集群的规模,因此由JobTracker为每一个TaskTracker计算下一次的心跳汇报时间,并通过心跳机制告诉TaskTracker。心跳间隔应该大小适度:如果太小则JobTracker需要处理更高的并发心跳连接请求,这必然会给JobTracker找出不晓得并发压力,如果太大,则空闲资源不能被充分利用,进而降低系统吞吐率。
         JobTracker允许通过参数来设置心跳时间的加速比,即通过mapred.heartbeats.in.second和mapreduce.jobtracker.heartbeats.scaling.factor两个参数来调整心跳时间间隔时间。同时为了防止用户参数设置不合理而对JobTracker产生较大负载,间隔时间至少为3秒。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312291-1-1.html 上篇帖子: hadoop源码研读之路(四)----IPC.RPC 下篇帖子: [实验]hadoop例子 trackinfo数据清洗
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表