
Hadoop TaskTracker Analysis

Posted on 2015-11-11 12:28:16
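  As introduced in an earlier post, the TaskTracker is responsible for maintaining, requesting, and monitoring Tasks, and it communicates with the JobTracker through heartbeats.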
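  The TaskTracker init process:
  1. Read the configuration files and parse the parameters.
  2. Delete the users' existing local files on the TaskTracker and create new directories and files.
  3. Reset the task map: Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
  4. Create the maps that track running work:
     this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();  records the running tasks (in insertion order)
     this.runningJobs = new TreeMap<JobID, RunningJob>();  records the running jobs, keyed by job ID
  5. Initialize the JVM managers:
     mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), true, tracker);
     reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(), false, tracker);
  6. Initialize RPC and obtain the JobTracker client used for heartbeat communication.
  7. Create a background (daemon) thread that listens for map-completion events: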

  
    this.mapEventsFetcher = new MapEventsFetcherThread();
    mapEventsFetcher.setDaemon(true);
    mapEventsFetcher.setName(
        "Map-events fetcher for all reduce tasks " + "on " + taskTrackerName);
    mapEventsFetcher.start();
  The background thread's run method is as follows:
    while (running) {
      try {
        List<FetchStatus> fList = null;
        synchronized (runningJobs) {
          while (((fList = reducesInShuffle()).size()) == 0) {
            try {
              runningJobs.wait();
            } catch (InterruptedException e) {
              LOG.info("Shutting down: " + this.getName());
              return;
            }
          }
        }
        // now fetch all the map task events for all the reduce tasks
        // possibly belonging to different jobs
        boolean fetchAgain = false; // flag signifying whether we want to fetch
                                    // immediately again.
        for (FetchStatus f : fList) {
          long currentTime = System.currentTimeMillis();
          try {
            // the method below will return true when we have not
            // fetched all available events yet
            if (f.fetchMapCompletionEvents(currentTime)) {
              fetchAgain = true;
            }
          } catch (Exception e) {
            LOG.warn("Ignoring exception that fetch for map completion" +
                " events threw for " + f.jobId + " threw: " +
                StringUtils.stringifyException(e));
          }
          if (!running) {
            break;
          }
        }
        synchronized (waitingOn) {
          try {
            if (!fetchAgain) {
              waitingOn.wait(heartbeatInterval);
            }
          } catch (InterruptedException ie) {
            LOG.info("Shutting down: " + this.getName());
            return;
          }
        }
      } catch (Exception e) {
        LOG.info("Ignoring exception " + e.getMessage());
      }
    }

  8. initializeMemoryManagement: initialize the TaskTracker's memory-management settings.
  9. Create the map and reduce TaskLauncher background threads, which are later used to spawn child JVMs that run the map and reduce tasks:
    mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
    reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
    mapLauncher.start();
    reduceLauncher.start();
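The MapEventsFetcherThread loop in step 7 is a guarded wait: it blocks on runningJobs until at least one reduce is in shuffle, does the fetch work outside that lock, and then goes back to waiting. The Java sketch below reproduces only that wait/notify skeleton under simplifying assumptions: FetcherSketch, enqueueReduce, awaitFetched and shutdownAndJoin are hypothetical names, "fetching" just records the reduce id, each reduce is consumed once, and the second timed wait on waitingOn is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the guarded-wait loop in MapEventsFetcherThread.
// None of these names are Hadoop APIs; "fetching" just records the reduce id.
class FetcherSketch extends Thread {
    private final Object lock = new Object();          // plays the role of runningJobs
    private final List<String> reducesInShuffle = new ArrayList<>();
    private final List<String> fetched = new ArrayList<>();
    private volatile boolean running = true;

    FetcherSketch() {
        setDaemon(true);                                // as with mapEventsFetcher.setDaemon(true)
        setName("Map-events fetcher sketch");
    }

    // Producer side: a reduce enters shuffle, so wake the waiting fetcher.
    void enqueueReduce(String reduceId) {
        synchronized (lock) {
            reducesInShuffle.add(reduceId);
            lock.notifyAll();
        }
    }

    List<String> fetchedSoFar() {
        synchronized (fetched) {
            return new ArrayList<>(fetched);
        }
    }

    // Polls until at least n reduces were handled or the timeout expires.
    boolean awaitFetched(int n, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (fetchedSoFar().size() < n && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                return false;
            }
        }
        return fetchedSoFar().size() >= n;
    }

    // Stops the loop the way the original does: clear the flag and interrupt the waiter.
    boolean shutdownAndJoin(long timeoutMs) {
        running = false;
        interrupt();
        try {
            join(timeoutMs);
        } catch (InterruptedException e) {
            return false;
        }
        return !isAlive();
    }

    @Override
    public void run() {
        while (running) {
            List<String> batch;
            synchronized (lock) {
                // Guarded wait: re-check the condition after every wake-up.
                while (reducesInShuffle.isEmpty()) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        return;                         // "Shutting down"
                    }
                }
                // Simplification: each reduce is consumed once.
                batch = new ArrayList<>(reducesInShuffle);
                reducesInShuffle.clear();
            }
            // The "fetch" runs outside the lock, so producers are not blocked by it.
            synchronized (fetched) {
                fetched.addAll(batch);
            }
        }
    }
}
```

Re-checking the condition in a `while` loop rather than an `if` is what makes the pattern safe against spurious wake-ups, which is also why the original wraps `runningJobs.wait()` in a loop over `reducesInShuffle()`.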
  Next, look at the run method of TaskRunner, the per-task thread that is ultimately started from the TaskLauncher path:
    // before preparing the job localize
    // all the archives
    TaskAttemptID taskid = t.getTaskID();
    final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    // simply get the location of the workDir and pass it to the child. The
    // child will do the actual dir creation
    final File workDir =
        new File(new Path(localdirs[rand.nextInt(localdirs.length)],
            TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),
                taskid.toString(),
                t.isTaskCleanupTask())).toString());
    String user = tip.getUGI().getUserName();
    // Set up the child task's configuration. After this call, no localization
    // of files should happen in the TaskTracker's process space. Any changes to
    // the conf object after this will NOT be reflected to the child.
    // setupChildTaskConfiguration(lDirAlloc);
    if (!prepare()) {
      return;
    }
    // Accumulates class paths for child.
    List<String> classPaths = getClassPaths(conf, workDir,
                                            taskDistributedCacheManager);
    long logSize = TaskLog.getTaskLogLength(conf);
    // Build exec child JVM args.
    Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);
    tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
    // set memory limit using ulimit if feasible and necessary ...
    String setup = getVMSetupCmd();
    // Set up the redirection of the task's stdout and stderr streams
    File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
    File stdout = logFiles[0];
    File stderr = logFiles[1];
    tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
        stderr);
    Map<String, String> env = new HashMap<String, String>();
    errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
                                 logSize);
    // flatten the env as a set of export commands
    List<String> setupCmds = new ArrayList<String>();
    for (Entry<String, String> entry : env.entrySet()) {
      StringBuffer sb = new StringBuffer();
      sb.append("export ");
      sb.append(entry.getKey());
      sb.append("=\"");
      sb.append(entry.getValue());
      sb.append("\"");
      setupCmds.add(sb.toString());
    }
    setupCmds.add(setup);
    launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
    tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
    if (exitCodeSet) {
      if (!killed && exitCode != 0) {
        if (exitCode == 65) {
          tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
        }
        throw new IOException("Task process exit with nonzero status of " +
            exitCode + ".");
      }
    }
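  The run method creates a child JVM for the current task: it sets up the file paths, environment, JVM launch arguments, and launch commands, then, via launchJvmAndWait, has the TaskController start the new JVM, which executes the corresponding Task.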
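Two of these steps are easy to isolate: flattening the env map into `export KEY="VALUE"` lines placed ahead of the setup command, and deciding whether the child's exit status counts as a failure. The sketch below restates only that logic in plain Java. ChildLaunchSketch, buildSetupCmds and checkExit are hypothetical names, not TaskRunner methods, and nothing is actually launched.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of two steps in TaskRunner.run: flattening the child
// environment into shell "export" lines, and checking the child's exit status.
class ChildLaunchSketch {

    // Builds: export KEY="VALUE" for every variable, then the setup (ulimit) command.
    static List<String> buildSetupCmds(Map<String, String> env, String setup) {
        List<String> setupCmds = new ArrayList<>();
        // A TreeMap is used only to make the order predictable in this sketch;
        // the original iterates a HashMap. Values are not escaped, as in the original.
        for (Map.Entry<String, String> entry : new TreeMap<>(env).entrySet()) {
            setupCmds.add("export " + entry.getKey() + "=\"" + entry.getValue() + "\"");
        }
        setupCmds.add(setup);
        return setupCmds;
    }

    // A child that exited with a non-zero code and was not killed is a failure.
    // The original also reports exit code 65 to its instrumentation
    // (taskFailedPing) before throwing; that metric call is omitted here.
    static void checkExit(boolean exitCodeSet, boolean killed, int exitCode) throws IOException {
        if (exitCodeSet && !killed && exitCode != 0) {
            throw new IOException("Task process exit with nonzero status of " + exitCode + ".");
        }
    }
}
```

Because values are wrapped in double quotes without escaping (as in the original loop), a value that itself contains `"` would break the generated shell line; a hardened version would need to escape it.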
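  The relationships among the classes involved are shown in the figure below:
  [Figure: class relationship diagram (DSC0000.gif)]
  The call chain ends at TaskController's launchTask.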


  10. Finally, start the node health monitor: startHealthMonitor(this.fConf);

  Now back to TaskLauncher's run method: it loops continuously, picking up new tasks assigned to the TaskTracker and calling startNewTask for each one.
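This slot gating can be pictured as a producer/consumer loop: a task is taken from the queue, the thread waits until a slot is free, and only then is the task started. The Java sketch below is a simplified model under stated assumptions: SlotLauncherSketch, enqueue, releaseSlot and awaitStarted are hypothetical names, "starting" a task only records its id, and the extra bookkeeping the real TaskLauncher does (for example for tasks killed while still queued) is omitted.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical sketch of a slot-gated launcher in the spirit of TaskLauncher.
// startNewTask here only records the task id; nothing is launched.
class SlotLauncherSketch extends Thread {
    private final Object lock = new Object();
    private final LinkedList<String> tasksToLaunch = new LinkedList<>();
    private final List<String> started = new ArrayList<>();
    private int freeSlots;

    SlotLauncherSketch(int maxSlots) {
        this.freeSlots = maxSlots;      // like maxMapSlots / maxReduceSlots
        setDaemon(true);
    }

    // A heartbeat response assigned a new task of this type.
    void enqueue(String taskId) {
        synchronized (lock) {
            tasksToLaunch.addLast(taskId);
            lock.notifyAll();
        }
    }

    // A running task finished and gave its slot back.
    void releaseSlot() {
        synchronized (lock) {
            freeSlots++;
            lock.notifyAll();
        }
    }

    List<String> startedSoFar() {
        synchronized (lock) {
            return new ArrayList<>(started);
        }
    }

    // Polls until at least n tasks were started or the timeout expires.
    boolean awaitStarted(int n, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (startedSoFar().size() < n && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                return false;
            }
        }
        return startedSoFar().size() >= n;
    }

    @Override
    public void run() {
        while (true) {
            String task;
            synchronized (lock) {
                try {
                    while (tasksToLaunch.isEmpty()) {
                        lock.wait();            // 1. wait for a task to arrive
                    }
                    task = tasksToLaunch.removeFirst();
                    while (freeSlots == 0) {
                        lock.wait();            // 2. wait until a slot frees up
                    }
                    freeSlots--;
                } catch (InterruptedException e) {
                    return;
                }
            }
            startNewTask(task);                 // 3. launch outside the lock
        }
    }

    private void startNewTask(String taskId) {
        synchronized (lock) {
            started.add(taskId);
        }
    }
}
```

With one slot, a second queued task stays pending until releaseSlot is called, which mirrors why a TaskTracker never runs more map or reduce tasks than its configured slot count.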
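  The startNewTask path eventually reaches TaskInProgress.launchTask, which localizes the task, marks an UNASSIGNED task as RUNNING, and creates and starts its TaskRunner:
    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
        this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
        this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
      localizeTask(task);
      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
        this.taskStatus.setRunState(TaskStatus.State.RUNNING);
      }
      setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
      this.runner.start();
      long now = System.currentTimeMillis();
      this.taskStatus.setStartTime(now);
      this.lastProgressReport = now;
    }

  TaskTracker's own run method keeps up the heartbeat with the JobTracker, through which it receives new tasks and requests to kill tasks. The key part of the heartbeat exchange (in TaskTracker.transmitHeartBeat) is: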
    synchronized (this) {
      askForNewTask =
          ((status.countOccupiedMapSlots() < maxMapSlots ||
            status.countOccupiedReduceSlots() < maxReduceSlots) &&
           acceptNewTasks);
      localMinSpaceStart = minSpaceStart;
    }
    if (askForNewTask) {
      askForNewTask = enoughFreeSpace(localMinSpaceStart);
      long freeDiskSpace = getFreeSpace();
      long totVmem = getTotalVirtualMemoryOnTT();
      long totPmem = getTotalPhysicalMemoryOnTT();
      long availableVmem = getAvailableVirtualMemoryOnTT();
      long availablePmem = getAvailablePhysicalMemoryOnTT();
      long cumuCpuTime = getCumulativeCpuTimeOnTT();
      long cpuFreq = getCpuFrequencyOnTT();
      int numCpu = getNumProcessorsOnTT();
      float cpuUsage = getCpuUsageOnTT();
      status.getResourceStatus().setAvailableSpace(freeDiskSpace);
      status.getResourceStatus().setTotalVirtualMemory(totVmem);
      status.getResourceStatus().setTotalPhysicalMemory(totPmem);
      status.getResourceStatus().setMapSlotMemorySizeOnTT(
          mapSlotMemorySizeOnTT);
      status.getResourceStatus().setReduceSlotMemorySizeOnTT(
          reduceSlotSizeMemoryOnTT);
      status.getResourceStatus().setAvailableVirtualMemory(availableVmem);
      status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
      status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
      status.getResourceStatus().setCpuFrequency(cpuFreq);
      status.getResourceStatus().setNumProcessors(numCpu);
      status.getResourceStatus().setCpuUsage(cpuUsage);
    }
    // add node health information
    TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
    synchronized (this) {
      if (healthChecker != null) {
        healthChecker.setHealthStatus(healthStatus);
      } else {
        healthStatus.setNodeHealthy(true);
        healthStatus.setLastReported(0L);
        healthStatus.setHealthReport("");
      }
    }
    //
    // Xmit the heartbeat
    //
    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                              justStarted,
                                                              justInited,
                                                              askForNewTask,
                                                              heartbeatResponseId);

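  This method reports the TaskTracker's resource and health metrics to the JobTracker by calling its heartbeat method, then parses the returned response. The heartbeat mechanism will be analyzed in detail in the next post.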
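The askForNewTask flag at the top of the heartbeat excerpt combines three checks: a free map or reduce slot, acceptNewTasks, and enough local disk space. Below is a minimal Java sketch of just that decision. The class and parameter names are hypothetical, and the disk check assumes that enoughFreeSpace treats a minSpaceStart of 0 as "no limit" and otherwise requires strictly more free space than the threshold.

```java
// Hypothetical sketch of the askForNewTask decision made before each heartbeat.
// Parameters mirror the variables in the excerpt; the disk rule is an assumption
// about enoughFreeSpace (0 means "no limit", otherwise free space must exceed it).
final class HeartbeatDecisionSketch {
    private HeartbeatDecisionSketch() {}

    static boolean askForNewTask(int occupiedMapSlots, int maxMapSlots,
                                 int occupiedReduceSlots, int maxReduceSlots,
                                 boolean acceptNewTasks,
                                 long freeDiskBytes, long minSpaceStart) {
        // 1. At least one free map or reduce slot, and the tracker accepts work.
        boolean ask = (occupiedMapSlots < maxMapSlots
                || occupiedReduceSlots < maxReduceSlots) && acceptNewTasks;
        // 2. Only then is local disk space checked (enoughFreeSpace in the original).
        if (ask) {
            ask = minSpaceStart == 0 || freeDiskBytes > minSpaceStart;
        }
        return ask;
    }
}
```

For example, a tracker whose map and reduce slots are all occupied gets false regardless of disk space, so its heartbeat still carries its status but does not request new work.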



Copyright notice: this is an original article by the blogger and may not be reproduced without the blogger's permission.

Post URL: https://www.yunweiku.com/thread-137887-1-1.html