设为首页 收藏本站
查看: 684|回复: 0

[经验分享] Hadoop之TaskTraker分析(转)

[复制链接]

尚未签到

发表于 2016-12-10 06:20:11 | 显示全部楼层 |阅读模式
TaskTracker的工作职责之前已经和大家提过,主要负责维护,申请和监控Task,通过heartbeat和JobTracker进行通信。
     TaskTracker的init过程:
     1.读取配置文件,解析参数
     2.将TaskTraker上原有的用户local files删除并新建新的dir和file
     3. Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>(); 清除map
     4.    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();记录task的链表
            this.runningJobs = new TreeMap<JobID, RunningJob>();记录job的id信息

     5.初始化JVMManager:
[java] view plaincopyprint? 



  • mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(),   
  •       true, tracker);  
  •   reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),  
  •       false, tracker);  

       6.初始化RPC,获取JobTracker client用于heartbeat通信;
 
     7.new一个 后台线程用于监听map完成的事件
[java] view plaincopyprint? 



  • this.mapEventsFetcher = new MapEventsFetcherThread();  
  • mapEventsFetcher.setDaemon(true);  
  • mapEventsFetcher.setName(  
  •                          "Map-events fetcher for all reduce tasks " + "on " +   
  •                          taskTrackerName);  
  • mapEventsFetcher.start();  

      后台线程的run方法如下:
 
 
[java] view plaincopyprint? 



  • while (running) {  
  •        try {  
  •          List <FetchStatus> fList = null;  
  •          synchronized (runningJobs) {  
  •            while (((fList = reducesInShuffle()).size()) == 0) {  
  •              try {  
  •                runningJobs.wait();  
  •              } catch (InterruptedException e) {  
  •                LOG.info("Shutting down: " + this.getName());  
  •                return;  
  •              }  
  •            }  
  •          }  
  •          // now fetch all the map task events for all the reduce tasks  
  •          // possibly belonging to different jobs  
  •          boolean fetchAgain = false//flag signifying whether we want to fetch  
  •                                      //immediately again.  
  •          for (FetchStatus f : fList) {  
  •            long currentTime = System.currentTimeMillis();  
  •            try {  
  •              //the method below will return true when we have not   
  •              //fetched all available events yet  
  •              if (f.fetchMapCompletionEvents(currentTime)) {  
  •                fetchAgain = true;  
  •              }  
  •            } catch (Exception e) {  
  •              LOG.warn(  
  •                       "Ignoring exception that fetch for map completion" +  
  •                       " events threw for " + f.jobId + " threw: " +  
  •                       StringUtils.stringifyException(e));   
  •            }  
  •            if (!running) {  
  •              break;  
  •            }  
  •          }  
  •          synchronized (waitingOn) {  
  •            try {  
  •              if (!fetchAgain) {  
  •                waitingOn.wait(heartbeatInterval);  
  •              }  
  •            } catch (InterruptedException ie) {  
  •              LOG.info("Shutting down: " + this.getName());  
  •              return;  
  •            }  
  •          }  
  •        } catch (Exception e) {  
  •          LOG.info("Ignoring exception "  + e.getMessage());  
  •        }  
  •      }  
  •    }   

  8.initializeMemoryManagement,初始化每个TrackTask的内存设置
 
9.new一个Map和Reducer的Launcher后台线程
 
[java] view plaincopyprint? 



  • mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);  
  •  reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);  
  •  mapLauncher.start();  
  •  reduceLauncher.start();  

  用于后面创建子JVM来执行map、reduce task
 
看一下
[java] view plaincopyprint? 



  • TaskLauncher的run方法:  
  •  //before preparing the job localize   
  •       //all the archives  
  •       TaskAttemptID taskid = t.getTaskID();  
  •       final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");  
  •       //simply get the location of the workDir and pass it to the child. The  
  •       //child will do the actual dir creation  
  •       final File workDir =  
  •       new File(new Path(localdirs[rand.nextInt(localdirs.length)],   
  •           TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),   
  •           taskid.toString(),  
  •           t.isTaskCleanupTask())).toString());  
  •         
  •       String user = tip.getUGI().getUserName();  
  •         
  •       // Set up the child task's configuration. After this call, no localization  
  •       // of files should happen in the TaskTracker's process space. Any changes to  
  •       // the conf object after this will NOT be reflected to the child.  
  •       // setupChildTaskConfiguration(lDirAlloc);  
  •   
  •       if (!prepare()) {  
  •         return;  
  •       }  
  •         
  •       // Accumulates class paths for child.  
  •       List<String> classPaths = getClassPaths(conf, workDir,  
  •                                               taskDistributedCacheManager);  
  •   
  •       long logSize = TaskLog.getTaskLogLength(conf);  
  •         
  •       //  Build exec child JVM args.  
  •       Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);  
  •         
  •       tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);  
  •   
  •       // set memory limit using ulimit if feasible and necessary ...  
  •       String setup = getVMSetupCmd();  
  •       // Set up the redirection of the task's stdout and stderr streams  
  •       File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());  
  •       File stdout = logFiles[0];  
  •       File stderr = logFiles[1];  
  •       tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,  
  •                  stderr);  
  •         
  •       Map<String, String> env = new HashMap<String, String>();  
  •       errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,  
  •                                    logSize);  
  •         
  •       // flatten the env as a set of export commands  
  •       List <String> setupCmds = new ArrayList<String>();  
  •       for(Entry<String, String> entry : env.entrySet()) {  
  •         StringBuffer sb = new StringBuffer();  
  •         sb.append("export ");  
  •         sb.append(entry.getKey());  
  •         sb.append("=\"");  
  •         sb.append(entry.getValue());  
  •         sb.append("\"");  
  •         setupCmds.add(sb.toString());  
  •       }  
  •       setupCmds.add(setup);  
  •         
  •       launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);  
  •       tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());  
  •       if (exitCodeSet) {  
  •         if (!killed && exitCode != 0) {  
  •           if (exitCode == 65) {  
  •             tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());  
  •           }  
  •           throw new IOException("Task process exit with nonzero status of " +  
  •               exitCode + ".");  
  •         }  
  •       }  
  •     }  

  run方法为当前task new一个child JVM,为其设置文件路径,上下文环境,JVM启动参数和启动命令等信息,然后调用TaskControll方法启动新的JVM执行对应的Task工作。
 
各个类关系图如下所示:
DSC0000.gif
最后以TaskController的launchTask截至
10.然后开始  startHealthMonitor(this.fConf);
 
 
再来看看TaskLauncher的run方法,就是不停的循环去获取TaskTracker中新的task,然后调用startNewTask方法
 
[java] view plaincopyprint? 



  • if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||  
  •          this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||  
  •          this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {  
  •        localizeTask(task);  
  •        if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {  
  •          this.taskStatus.setRunState(TaskStatus.State.RUNNING);  
  •        }  
  •        setTaskRunner(task.createRunner(TaskTracker.thisthis, rjob));  
  •        this.runner.start();  
  •        long now = System.currentTimeMillis();  
  •        this.taskStatus.setStartTime(now);  
  •        this.lastProgressReport = now;  

  TaskTracker的run方法:通过维护心跳和JobTracker通信,以获取、杀掉新的Task,重点看一下heartBeat通信过程:
 
 
[java] view plaincopyprint? 



  • synchronized (this) {  
  •      askForNewTask =   
  •        ((status.countOccupiedMapSlots() < maxMapSlots ||   
  •          status.countOccupiedReduceSlots() < maxReduceSlots) &&   
  •         acceptNewTasks);   
  •      localMinSpaceStart = minSpaceStart;  
  •    }  
  •    if (askForNewTask) {  
  •      askForNewTask = enoughFreeSpace(localMinSpaceStart);  
  •      long freeDiskSpace = getFreeSpace();  
  •      long totVmem = getTotalVirtualMemoryOnTT();  
  •      long totPmem = getTotalPhysicalMemoryOnTT();  
  •      long availableVmem = getAvailableVirtualMemoryOnTT();  
  •      long availablePmem = getAvailablePhysicalMemoryOnTT();  
  •      long cumuCpuTime = getCumulativeCpuTimeOnTT();  
  •      long cpuFreq = getCpuFrequencyOnTT();  
  •      int numCpu = getNumProcessorsOnTT();  
  •      float cpuUsage = getCpuUsageOnTT();  
  •   
  •      status.getResourceStatus().setAvailableSpace(freeDiskSpace);  
  •      status.getResourceStatus().setTotalVirtualMemory(totVmem);  
  •      status.getResourceStatus().setTotalPhysicalMemory(totPmem);  
  •      status.getResourceStatus().setMapSlotMemorySizeOnTT(  
  •          mapSlotMemorySizeOnTT);  
  •      status.getResourceStatus().setReduceSlotMemorySizeOnTT(  
  •          reduceSlotSizeMemoryOnTT);  
  •      status.getResourceStatus().setAvailableVirtualMemory(availableVmem);   
  •      status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);  
  •      status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);  
  •      status.getResourceStatus().setCpuFrequency(cpuFreq);  
  •      status.getResourceStatus().setNumProcessors(numCpu);  
  •      status.getResourceStatus().setCpuUsage(cpuUsage);  
  •    }  
  •    //add node health information  
  •      
  •    TaskTrackerHealthStatus healthStatus = status.getHealthStatus();  
  •    synchronized (this) {  
  •      if (healthChecker != null) {  
  •        healthChecker.setHealthStatus(healthStatus);  
  •      } else {  
  •        healthStatus.setNodeHealthy(true);  
  •        healthStatus.setLastReported(0L);  
  •        healthStatus.setHealthReport("");  
  •      }  
  •    }  
  •    //  
  •    // Xmit the heartbeat  
  •    //  
  •    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,   
  •                                                              justStarted,  
  •                                                              justInited,  
  •                                                              askForNewTask,   
  •                                                              heartbeatResponseId);  

  
该方法主要将TaskTracker上的各种性能参数信息反馈给JobTraker,调用其heartbeat方法然后解析返回的结果,下篇详细分析heartBeat机制

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-311969-1-1.html 上篇帖子: hadoop实现demo 下篇帖子: hadoop如何封装shell脚本
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表