在本节中,我们特别来学习一些有心跳(HeartBeat)的相关知识,这包括用途、心跳的发送、接收和应答。
JobTracker和TaskTracker之间是通过心跳来进行信息沟通的,TaskTracker通过周期性的通过心跳向JobTracker汇报该节点和任务的状态。心跳实际上就是一个RPC函数,在Hadoop中,心跳主要有三个作用:
1)、判断TaskTracker是否还活着;
2)、JobTracker及时获得各个TaskTracker节点上资源的使用情况和任务运行状态;
3)、给TaskTracker分配任务。
那么,心跳是由谁发起的呢?JobTracker从不会主动的向TaskTracker发送任何的信息,而是由TaskTracker节点主动通过心跳来向JobTracker获取属于自己的信息,JobTracker只能通过心跳应答的形式为各个TaskTracker分配任务。
TaskTracker周期性的调用RPC函数hearbeat()向JobTracker汇报信息和领取任务,该函数定义如下:
/**
* The periodic heartbeat mechanism between the {@link TaskTracker} and
* the {@link JobTracker}.
*
* The {@link JobTracker} processes the status information sent by the
* {@link TaskTracker} and responds with instructions to start/stop
* tasks or jobs, and also 'reset' instructions during contingencies.
* @Param status: 封装了TaskTracker上的各种状态信息
* @Param restarted: TaskTracker是否刚启动
* @Param initialContact: TaskTracker是否第一次连接JobTracker
* @Param acceptNewTasks: 是否接收新任务
* @Param responseId: 心跳响应编号,用于防止重复发送心跳,每接受一次心跳后,该值都加1
*/
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
boolean initialContact,
boolean acceptNewTasks,
short responseId) throws IOException
TaskTracker发送心跳后,会领取JobTracker给TaskTracker下达的一些命令信息即HeartbeatResponse对象。在该方法内部,主要涉及两个业务逻辑:更新状态和下达命令。JobTracker首先将TaskTracker汇报的该接点的最新的运行状态做更新,然后根据这些状态和其他需求给TaskTracker下相应的操作命令。
下面我们主要看看JobTracker给TaskTracker下达的封装在HeartbeatResponse对象中都主要有什么信息? HeartbeatResponse对象主要有两部分的信息
1)、下达给TaskTracker的命令。
JobTracker将给TaskTracker的命令封装成TaskTrackerAction类,我们来看一看该类中主要相关信息:
abstract class TaskTrackerAction implements Writable {
/**
* 命令类型
* Ennumeration of various 'actions' that the {@link JobTracker}
* directs the {@link TaskTracker} to perform periodically.
*
*/
public static enum ActionType {
/** 运行新任务Launch a new task. */
LAUNCH_TASK,
/** 杀死任务Kill a task. */
KILL_TASK,
/** 杀死作业Kill any tasks of this job and cleanup. */
KILL_JOB,
/** 重新初始化Reinitialize the tasktracker. */
REINIT_TRACKER,
/** 提交任务Ask a task to save its output. */
COMMIT_TASK
};
/**
* A factory-method to create objects of given {@link ActionType}.
* @param actionType the {@link ActionType} of object to create.
* @return an object of {@link ActionType}.
*/
public static TaskTrackerAction createAction(ActionType actionType) {
TaskTrackerAction action = null;
switch (actionType) {
case LAUNCH_TASK:
{
action = new LaunchTaskAction();
}
break;
case KILL_TASK:
{
action = new KillTaskAction();
}
break;
case KILL_JOB:
{
action = new KillJobAction();
}
break;
case REINIT_TRACKER:
{
action = new ReinitTrackerAction();
}
break;
case COMMIT_TASK:
{
action = new CommitTaskAction();
}
break;
}
return action;
}
...
}