设为首页 收藏本站
查看: 1332|回复: 0

[经验分享] hadoop源码TaskAttemptID TaskTrackerAction JobTracker,FileOutputCommitter...

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-6-6 12:10:57 | 显示全部楼层 |阅读模式
hadoop源码TaskAttemptID TaskTrackerAction JobTracker,FileOutputCommitter相关
1,TaskAttemptID代表task attempt,一个task attempt就是一个map/reduce task 的一个实例taskid,而每个TaskAttemptID由两部分组成:TaskID+task序列号
eg:

attempt_200707121733_0003_m_000005_0

代表2007年07月12日17点33分启动的第0003号作业(job)的第00005号map任务的第0号task attempt
2,TaskTrackerAction的类型有,LaunchTaskAction(),KillTaskAction(),KillJobAction(),ReinitTrackerAction(),CommitTaskAction();
/**
   * Ennumeration of various 'actions' that the {@link JobTracker}
   * directs the {@link TaskTracker} to perform periodically.
   *
   */
  public static enum ActionType {
    /** Launch a new task. */
    LAUNCH_TASK,
   
    /** Kill a task. */
    KILL_TASK,
   
    /** Kill any tasks of this job and cleanup. */
    KILL_JOB,
   
    /** Reinitialize the tasktracker. */
    REINIT_TRACKER,

    /** Ask a task to save its output. */
    COMMIT_TASK
  };
  
  /**
   * A factory-method to create objects of given {@link ActionType}.
   * @param actionType the {@link ActionType} of object to create.
   * @return an object of {@link ActionType}.
   */
  public static TaskTrackerAction createAction(ActionType actionType) {
    TaskTrackerAction action = null;
   
    switch (actionType) {
    case LAUNCH_TASK:
      {
        action = new LaunchTaskAction();
      }
      break;
    case KILL_TASK:
      {
        action = new KillTaskAction();
      }
      break;
    case KILL_JOB:
      {
        action = new KillJobAction();
      }
      break;
    case REINIT_TRACKER:
      {
        action = new ReinitTrackerAction();
      }
      break;
    case COMMIT_TASK:
      {
        action = new CommitTaskAction();
      }
      break;
    }

    return action;
  }
3,JobTracker中相关:
           offerService()
                   |--taskScheduler.start();
                   |--recoveryManager.recover();
通过RecoveryManager的recover()方法恢复Job

                   |-- this.expireTrackersThread.start();
检测已经失效的TaskTracker节点。

// Used to expire TaskTrackers that have gone down

                   |-- this.retireJobsThread.start();
清除那些已经完成很长时间仍然存在队列中的job

                   |--expireLaunchingTaskThread.start();
停止那些在超时时间内未报告进度的Tasks。

A thread to timeout tasks that have been assigned to task trackers,
   * but that haven't reported back yet.

                   |--completedJobsStoreThread.start();
                   |--this.interTrackerServer.start();/ start the inter-tracker server once the jt is ready
  

  /**
   * Run forever
   */
  public void offerService() throws InterruptedException, IOException {
    // Prepare for recovery. This is done irrespective of the status of restart
    // flag.
    while (true) {
      try {
        recoveryManager.updateRestartCount();
        break;
      } catch (IOException ioe) {
        LOG.warn("Failed to initialize recovery manager. ", ioe);
        // wait for some time
        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
        LOG.warn("Retrying...");
      }
    }

    taskScheduler.start();
   
    //  Start the recovery after starting the scheduler
    try {
      recoveryManager.recover();
    } catch (Throwable t) {
      LOG.warn("Recovery manager crashed! Ignoring.", t);
    }
   
    this.expireTrackersThread = new Thread(this.expireTrackers,
                                          "expireTrackers");
    this.expireTrackersThread.start();
    this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
    this.retireJobsThread.start();
    expireLaunchingTaskThread.start();

    if (completedJobStatusStore.isActive()) {
      completedJobsStoreThread = new Thread(completedJobStatusStore,
                                            "completedjobsStore-housekeeper");
      completedJobsStoreThread.start();
    }

    // start the inter-tracker server once the jt is ready
    this.interTrackerServer.start();
   
    synchronized (this) {
      state = State.RUNNING;
    }

4, Job的五种状态
public static final int RUNNING = 1;
  public static final int SUCCEEDED = 2;
  public static final int FAILED = 3;
  public static final int PREP = 4;

  public static final int KILLED = 5;

5,jobStatus信息:
  this.jobid = jobid;
     this.setupProgress = setupProgress;
     this.mapProgress = mapProgress;
     this.reduceProgress = reduceProgress;
     this.cleanupProgress = cleanupProgress;
     this.runState = runState;
6,FileOutputCommitter相关
setupJob:Hadoop初始化时设置job的输出;
commitJob:当job完成时,清除job的输出,这个方法在反馈回来的job状态为SUCCEEDED时调用;
cleanupJob:job结束后清除job的输出;
// do the clean up of temporary directory
abortJob:当job的返回状态是FAILED或KILLED时,执行该函数,用于终止作业的输出;
setupTask:设置task的输出;
// FileOutputCommitter's setupTask doesn't do anything. Because the
       // temporary task directory is created on demand when the
       // task is writing.
needsTaskCommit:检测task是否需要提交;
commitTask:将task的输出移到作业的输出目录;
// Move the task outputs to their final place
// Delete the temporary task-specific output directory
abortTask:取消task的输出;



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-20289-1-1.html 上篇帖子: MapReduce错误任务失败处理 下篇帖子: hadoop源码之JobQueueTaskScheduler
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表