设为首页 收藏本站
查看: 955|回复: 0

[经验分享] Hadoop Map/Reduce执行全流程关键代码

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-6-6 12:07:47 | 显示全部楼层 |阅读模式

    Hadoop Map/Reduce 执行流程关键代码  
      
    JobClient.runJob(conf) | 运行job  
    |-->JobClient jc = new JobClient(job);  
    |-->RunningJob rj = jc.submitJob(job);  
        |-->submitJobInternal(job);  
            |-->int reduces = job.getNumReduceTasks();  
            |-->JobContext context = new JobContext(job, jobId);  
            |-->maps = writeOldSplits(job, submitSplitFile);  
            |-->job.setNumMapTasks(maps);  
            |-->job.writeXml(out);  
            |-->JobStatus status = jobSubmitClient.submitJob(jobId);
    ----------------------------------------------------------------------------------------------

      
    JobTracker.submitJob(JobId) |提交job  
    |-->JobInProgress job = new JobInProgress(jobId, this, this.conf);  
    |-->checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);  |检查权限  
    |-->checkMemoryRequirements(job);  |检查内存需求  
    |-->addJob(jobId, job);  |添加至job队列  
        |-->jobs.put(job.getProfile().getJobID(), job);  
        |--> for (JobInProgressListener listener : jobInProgressListeners) |添加至监听器,供调度使用  
            |-->listener.jobAdded(job);  
     
    ------------------------------------------------------------------------------------------------------
    JobTracker.heartbeat()  |JobTracker启动后供TaskTracker以RPC方式来调用,返回Response集合  
    |-->List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();  
    |-->tasks = taskScheduler.assignTasks(taskTrackerStatus);  |通过调度器选择合适的tasks  
    |-->for (Task task : tasks)  
        |-->expireLaunchingTasks.addNewTask(task.getTaskID());  
        |-->actions.add(new LaunchTaskAction(task));  |实际actions还会添加commmitTask等  
    |-->response.setHeartbeatInterval(nextInterval);  
    |-->response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));  
    |-->return response;  
      
     
    ---------------------------------------------------------------------------------------------------------
     
    TaskTracker.offerService |TaskTracker启动后通过offerservice()不断发心跳至JobTracker中  
    |-->transmitHeartBeat()  
        |-->HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited,askForNewTask, heartbeatResponseId);  
    |-->TaskTrackerAction[] actions = heartbeatResponse.getActions();  
    |-->for(TaskTrackerAction action: actions)  
        |-->if (action instanceof LaunchTaskAction)  
            |-->addToTaskQueue((LaunchTaskAction)action);  |添加至执行Queue,根据map/reduce task分别添加  
                |-->if (action.getTask().isMapTask()) {  
                    |-->mapLauncher.addToTaskQueue(action);  
                        |-->TaskInProgress tip = registerTask(action, this);  
                        |-->tasksToLaunch.add(tip);  
                        |-->tasksToLaunch.notifyAll();  |唤醒阻塞进程  
                |-->else   
                    |-->reduceLauncher.addToTaskQueue(action);  
     
    --------------------------------------------------------------------------------------------------------
     
    TaskLauncher.run()  
    |--> while (tasksToLaunch.isEmpty())   
                 |-->tasksToLaunch.wait();  
    |-->tip = tasksToLaunch.remove(0);  
    |-->startNewTask(tip);  
        |-->localizeJob(tip);  
            |-->launchTaskForJob(tip, new JobConf(rjob.jobConf));   
                |-->tip.setJobConf(jobConf);  
                |-->tip.launchTask();  |TaskInProgress.launchTask()  
                    |-->this.runner = task.createRunner(TaskTracker.this, this); |区分map/reduce  
                    |-->this.runner.start();
    ----------------------------------------------------------------------------------------
    MapTaskRunner.run()  |执行MapTask  
    |-->File workDir = new File(lDirAlloc.getLocalPathToRead()  |准备执行路径  
    |-->String jar = conf.getJar();  |准备jar包  
    |-->File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java");  |获取jvm  
    |-->vargs.add(Child.class.getName());  |添加参数,Child类作为main主函数启动  
    |-->tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile);  |添加至内存管理  
    |-->jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,  |统一纳入jvm管理器当中并启动  
                    workDir, env, pidFile, conf));  
            |-->mapJvmManager.reapJvm(t, env);  |区分map/reduce操作  
     
    ---------------------------------------------------------------------------------------------------------------
    JvmManager.reapJvm()  |  
    |--> while (jvmIter.hasNext())  
        |-->JvmRunner jvmRunner = jvmIter.next().getValue();  
        |-->JobID jId = jvmRunner.jvmId.getJobId();  
        |-->setRunningTaskForJvm(jvmRunner.jvmId, t);  
    |-->spawnNewJvm(jobId, env, t);  
        |-->JvmRunner jvmRunner = new JvmRunner(env,jobId);  
            |-->jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);  
        |-->jvmRunner.start();   |执行JvmRunner的run()方法  
            |-->jvmRunner.run()  
                |-->runChild(env);  
                    |-->List<String> wrappedCommand =  TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,  
                             env.logSize, env.pidFile);  |选取main函数  
                    |-->shexec.execute();  |执行  
                    |-->int exitCode = shexec.getExitCode(); |获取执行状态值  
                    |--> updateOnJvmExit(jvmId, exitCode, killed); |更新Jvm状态  
     
    ---------------------------------------------------------------------------------------------------------
    Child.main() 执行Task(map/reduce)  
    |-->JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);  
    |-->TaskUmbilicalProtocol umbilical = (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,  
            TaskUmbilicalProtocol.versionID, address, defaultConf);  
    |--> while (true)   
        |-->JvmTask myTask = umbilical.getTask(jvmId);  
        |-->task = myTask.getTask();  
        |-->taskid = task.getTaskID();  
        |-->TaskRunner.setupWorkDir(job);  
        |-->task.run(job, umbilical);   |以maptask为例  
            |-->TaskReporter reporter = new TaskReporter(getProgress(), umbilical);  
            |-->if (useNewApi)  
                |-->runNewMapper(job, split, umbilical, reporter);  
            |-->else  
                |-->runOldMapper(job, split, umbilical, reporter);  
                    |-->inputSplit = (InputSplit) ReflectionUtils.newInstance(job.getClassByName(splitClass), job);  
                    |-->MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =  ReflectionUtils.newInstance(job.getMapRunnerClass(), job);  
                    |-->runner.run(in, new OldOutputCollector(collector, conf), reporter);  
     ------------------------------------------------------------------------------------------------------------------------
    MapRunner.run()  
    |--> K1 key = input.createKey();  
    |-->V1 value = input.createValue();  
    |-->while (input.next(key, value))   
        |-->mapper.map(key, value, output, reporter);  
        |--> if(incrProcCount)   
            |-->reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,   
                    |-->SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);  
    |-->mapper.close();  


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-20285-1-1.html 上篇帖子: Namenode源代码分析 下篇帖子: Hadoop中DataNode与NameNode之间的心跳机制
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表