[Experience Sharing] Hadoop Source Code Analysis: The MapReduce Job Submission Process

  The command being traced is:
  hadoop_debug jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount /user/admin/in/yellow.txt /user/admin/out/555
  (hadoop_debug is presumably the author's debug-enabled wrapper around the stock hadoop script.) The jar subcommand first invokes org.apache.hadoop.util.RunJar.main:
   public static void main(String[] args) throws Throwable {
     // Load the jar: /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar
     JarFile jarFile = new JarFile(fileName);

     // The META-INF manifest names the main class: org.apache.hadoop.examples.ExampleDriver
     Manifest manifest = jarFile.getManifest();
     if (manifest != null) {
       mainClassName = manifest.getMainAttributes().getValue("Main-Class");
     }

     // Create the local temp directory: /tmp/hadoop-admin
     File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
     tmpDir.mkdirs();

     // Create the local working directory: /tmp/hadoop-admin/hadoop-unjar4705742737164408087
     final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
     workDir.delete();
     workDir.mkdirs();

     // Delete the working directory when the JVM exits
     Runtime.getRuntime().addShutdownHook(new Thread() {
       public void run() {
         try {
           FileUtil.fullyDelete(workDir);
         } catch (IOException e) {
         }
       }
     });

     // Unpack the jar into the working directory
     unJar(file, workDir);

     // Add the working directory, the jar itself, workDir/classes/ and
     // everything under workDir/lib/ to the classpath
     classPath.add(new File(workDir + "/").toURL());
     classPath.add(file.toURL());
     classPath.add(new File(workDir, "classes/").toURL());
     File[] libs = new File(workDir, "lib").listFiles();
     if (libs != null) {
       for (int i = 0; i < libs.length; i++) {
         classPath.add(libs[i].toURL());
       }
     }

     // Finally, look up the jar's main method via reflection and invoke it
     // with the remaining arguments (main and newArgs are set up in the
     // parts elided here)
     main.invoke(null, new Object[] { newArgs });
   }
  Back in the user code, the driver sets the job properties (the comments show the corresponding configuration keys):
job.setJarByClass(WordCount.class);          // mapred.jar
job.setMapperClass(WordCountMap.class);      // mapreduce.map.class
job.setReducerClass(WordCountReduce.class);  // mapreduce.reduce.class
job.setCombinerClass(WordCountReduce.class); // mapreduce.combine.class
job.setMapOutputKeyClass(Text.class);        // mapred.mapoutput.key.class
job.setMapOutputValueClass(IntWritable.class); // mapred.mapoutput.value.class
job.setOutputKeyClass(Text.class);             // mapred.output.key.class
job.setOutputValueClass(IntWritable.class);    // mapred.output.value.class
job.setJobName("WordCount");                  // mapred.job.name
 
FileInputFormat.addInputPath(job, input);     // mapred.input.dir
FileOutputFormat.setOutputPath(job, output);  // mapred.output.dir
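
  For context, these setters normally live in a driver main() like the following sketch (a minimal reconstruction, not the exact hadoop-examples source; WordCountMap and WordCountReduce are the Mapper/Reducer classes assumed above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordCountMap.class);
    job.setCombinerClass(WordCountReduce.class);
    job.setReducerClass(WordCountReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setJobName("WordCount");
    FileInputFormat.addInputPath(job, new Path(args[0]));   // /user/admin/in/yellow.txt
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // /user/admin/out/555
    job.submit();  // asynchronous; job.waitForCompletion(true) would block and print progress
  }
}

  The last call, job.submit(), kicks off the submission path traced below.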
 
 
The implementation of Job.submit():
 
  public void submit() throws IOException, InterruptedException,
                              ClassNotFoundException {
    ......
    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    ......
   }
 
 
Connecting to the JobTracker:
 
private void connect() throws IOException, InterruptedException {
        ......
        jobClient = new JobClient((JobConf) getConfiguration());   
        ......
       
  }
 
  Where:
  public JobClient(JobConf conf) throws IOException {
    ......
    init(conf);
  }

  public void init(JobConf conf) throws IOException {
    ......
    this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
  }

  private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
      Configuration conf) throws IOException {
    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
        JobSubmissionProtocol.versionID, addr,
        UserGroupInformation.getCurrentUser(), conf,
        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
  }
   
  The client now holds an object implementing JobSubmissionProtocol whose method calls go out as RPCs, i.e. a proxy for the JobTracker.
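
  Mechanically, RPC.getProxy returns a java.lang.reflect.Proxy whose invocation handler marshals each interface method call and ships it to the remote server. A stripped-down illustration of the pattern in plain JDK code (not Hadoop's actual RPC.Invoker, which also handles Writable serialization and connection pooling):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

interface Greeter { String greet(String name); }

public class ProxyDemo {
  public static void main(String[] args) {
    InvocationHandler handler = new InvocationHandler() {
      public Object invoke(Object proxy, Method method, Object[] methodArgs) {
        // A real RPC invoker would serialize method/methodArgs, send them to
        // the server address, and deserialize the response here.
        return "remote result of " + method.getName();
      }
    };
    Greeter g = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(), new Class<?>[] { Greeter.class }, handler);
    System.out.println(g.greet("JobTracker"));  // prints: remote result of greet
  }
}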
   
Obtaining the job staging area:
 
Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
              jobCopy);
  RPC call: JobSubmissionProtocol.getStagingAreaDir()
  Returns: hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging

  RPC call: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)
  Returns: org.apache.hadoop.hdfs.protocol.HdfsFileStatus@5521691b, i.e. the directory exists

  RPC call: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)
  Returns: org.apache.hadoop.hdfs.protocol.HdfsFileStatus@726c554, used to check its permissions
   
  Obtaining a new JobID:
  JobID jobId = jobSubmitClient.getNewJobId();

  RPC call: JobSubmissionProtocol.getNewJobId()
  Returns: job_201404010621_0004
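
  The ID encodes the JobTracker start timestamp (201404010621) and a per-JobTracker job sequence number (0004). It can be parsed back with the public JobID API, e.g.:

import org.apache.hadoop.mapreduce.JobID;

public class JobIdDemo {
  public static void main(String[] args) {
    // JobID.forName throws IllegalArgumentException on malformed IDs
    JobID id = JobID.forName("job_201404010621_0004");
    System.out.println(id.getJtIdentifier()); // "201404010621", the JobTracker start time
    System.out.println(id.getId());           // 4, this job's sequence number
  }
}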
   
  Creating the job submission directory:
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());

  i.e. hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004
   
  Copying the jar to HDFS:
  copyAndConfigureFiles(jobCopy, submitJobDir);
   
  RPC call: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004)
  Returns: null

  RPC call: ClientProtocol.mkdirs(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004, rwxr-xr-x)
  Returns: true

  RPC call: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004, rwx------)
  Returns: null

  RPC call: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar)
  Returns: null, i.e. the file does not exist yet
   
  RPC call: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)
  Returns: an output stream

  RPC call: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, DFSClient_-1317833261, null)
  Returns: org.apache.hadoop.hdfs.protocol.LocatedBlock@1a9b701
  Block: blk_6689254996395759186_2720
  BlockToken: Ident: , Pass: , Kind: , Service:
  DataNodes: [10.1.1.103:50010, 10.1.1.102:50010]

  RPC call: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, DFSClient_-1317833261)
  Returns: true

  RPC call: ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, 10)
  Returns: true (the replication factor for submitted job files comes from mapred.submit.replication, default 10)

  RPC call: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, rw-r--r--)
  Returns: null

  RPC call: ClientProtocol.renewLease(DFSClient_-1317833261)
  Returns: null
  From this point on, a daemon thread keeps sending renewLease requests.

  The local file /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar has now been copied to the HDFS file /tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar.
   
  Number of reduce tasks:
   int reduces = jobCopy.getNumReduceTasks();
  Here the reduce count is 2 (set by the driver, e.g. via job.setNumReduceTasks(2)).
   
  Checking the output directory:
  RPC call: ClientProtocol.getFileInfo(/user/admin/out/555)
  Returns: null, i.e. the directory does not exist (if it did, the output format's checkOutputSpecs would fail the job immediately)
   
  Computing the input split information:
   int maps = writeSplits(context, submitJobDir);
  Where:
  private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

  and the InputFormat's getSplits:
  public List<InputSplit> getSplits(JobContext job
                                    ) throws IOException {
    ...........
  }
   
  RPC call: ClientProtocol.getFileInfo(/user/admin/in/yellow.txt)
  Returns: path="hdfs://server1:9000/user/admin/in/yellow.txt", length=201000000, isdir=false, block_replication=3, blocksize=67108864, permission=rw-r--r--, owner=Admin, group=supergroup

  RPC call: ClientProtocol.getBlockLocations(/user/admin/in/yellow.txt, 0, 201000000)
  Returns: 3 BlockLocations
  offset={0},         length={67108864}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010, /default-rack/10.1.1.102:50010]}
  offset={67108864},  length={67108864}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010, /default-rack/10.1.1.102:50010]}
  offset={134217728}, length={66782272}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010, /default-rack/10.1.1.102:50010]}
    
  The final split information, 3 FileSplits:
  FileSplit file={hdfs://server1:9000/user/admin/in/yellow.txt}, hosts={[server3, server2]}, length={67108864}, start={0}
  FileSplit file={hdfs://server1:9000/user/admin/in/yellow.txt}, hosts={[server3, server2]}, length={67108864}, start={67108864}
  FileSplit file={hdfs://server1:9000/user/admin/in/yellow.txt}, hosts={[server3, server2]}, length={66782272}, start={134217728}

  The number of map tasks is therefore 3:
  jobCopy.setNumMapTasks(maps);
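
  These numbers follow directly from FileInputFormat's splitting rule: splitSize = max(minSize, min(maxSize, blockSize)), which is the 67108864-byte block size here, and a final chunk is only split off while the remaining bytes exceed SPLIT_SLOP (1.1) times the split size. A self-contained sketch that reproduces the three splits above:

public class SplitMathDemo {
  private static final double SPLIT_SLOP = 1.1; // same constant as FileInputFormat

  public static void main(String[] args) {
    long length = 201000000L;    // yellow.txt
    long splitSize = 67108864L;  // max(minSize, min(maxSize, blockSize))
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.println("start=" + (length - bytesRemaining) + " length=" + splitSize);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      System.out.println("start=" + (length - bytesRemaining) + " length=" + bytesRemaining);
    }
    // Output, matching the FileSplits above:
    // start=0 length=67108864
    // start=67108864 length=67108864
    // start=134217728 length=66782272
  }
}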
   
  Creating the split files:
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);

  RPC call: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)
  Returns: an output stream

  RPC call: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, rw-r--r--)
  Returns: null

  RPC call: ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, 10)
  Returns: true

  RPC call: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, DFSClient_-1317833261, null)
  Returns: a LocatedBlock object:
  Block: blockid=-921399365952861077, generationStamp=2714, numBytes=0
  BlockTokenIdentifier: Ident: , Pass: , Kind: , Service:
  DatanodeInfo[]: [10.1.1.103:50010, 10.1.1.102:50010]
  offset: 0

  RPC call: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, DFSClient_-1317833261)
  Returns: true
   
  The SplitMetaInfo entries written (the first start-offset is 7 because job.split starts with a 3-byte "SPL" header plus a 4-byte version int):
  [data-size: 67108864  start-offset: 7    locations: server3 server2]
  [data-size: 67108864  start-offset: 116  locations: server2 server3]
  [data-size: 66782272  start-offset: 225  locations: server2 server3]
   
  RPC call: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)
  Returns: an output stream

  RPC call: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, rw-r--r--)
  Returns: null

  RPC call: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, DFSClient_-1317833261, null)
  Returns: a LocatedBlock object:
  Block: blockid=789965327875207186, generationStamp=2715, numBytes=0
  BlockTokenIdentifier: Ident: , Pass: , Kind: , Service:
  DatanodeInfo[]: [10.1.1.103:50010, 10.1.1.102:50010]
  offset: 0

  RPC call: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, DFSClient_-1317833261)
  Returns: true
  
  Setting up access control:
  RPC call: JobSubmissionProtocol.getQueueAdmins(default)
  Returns: All users are allowed
   
  Writing the job file to the JobTracker's filesystem:
  RPC call: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)
  Returns: an output stream

  RPC call: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, rw-r--r--)
  Returns: null

  RPC call: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, DFSClient_-1317833261, null)
  Returns: a LocatedBlock object:
  Block: blockid=-7725157033540829125, generationStamp=2716, numBytes=0
  BlockTokenIdentifier: Ident: , Pass: , Kind: , Service:
  DatanodeInfo[]: [10.1.1.103:50010, 10.1.1.102:50010]
  offset: 0

  RPC call: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, DFSClient_-1317833261)
  Returns: true
   
  At this point job.xml, containing all of the job's configuration, has been written under "/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/". That HDFS directory now contains:
  -rw-r--r--  10 admin  supergroup  142465 2014-04-08 00:20  job.jar
  -rw-r--r--  10 admin  supergroup     334 2014-04-08 00:45  job.split
  -rw-r--r--   3 admin  supergroup      80 2014-04-08 00:50  job.splitmetainfo
  -rw-r--r--   3 admin  supergroup   20416 2014-04-08 00:55  job.xml
  job.jar is the jar to run, job.split holds the serialized FileSplit objects, job.splitmetainfo holds the SplitMetaInfo objects, and job.xml is the job's configuration file.
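
  The same listing can be reproduced with the HDFS client API; a sketch, using the NameNode address and staging path from this trace (each listStatus call is itself a ClientProtocol RPC to the NameNode):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListStagingDir {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://server1:9000"); // fs.defaultFS in Hadoop 2.x
    FileSystem fs = FileSystem.get(conf);
    Path stagingDir = new Path(
        "/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004");
    for (FileStatus st : fs.listStatus(stagingDir)) {
      System.out.printf("%s %d %s %s %d %s%n", st.getPermission(), st.getReplication(),
          st.getOwner(), st.getGroup(), st.getLen(), st.getPath().getName());
    }
  }
}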
   
  Submitting the job:
  status = jobSubmitClient.submitJob(
                jobId, submitJobDir.toString(), jobCopy.getCredentials());

  RPC call: JobSubmissionProtocol.submitJob(job_201404010621_0004, hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004, org.apache.hadoop.security.Credentials@70677770)
  Returns: JobStatus: setupProgress=0, mapProgress=0, reduceProgress=0, cleanupProgress=0, runState=4 (PREP), priority=NORMAL ...

  RPC call: JobSubmissionProtocol.getJobProfile(job_201404010621_0004)
  Returns: JobProfile: jobFile=hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, jobID=job_201404010621_0004, name=WordCount, queue=default, url=http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004, user=Admin

  Combining the JobStatus and JobProfile, the client prints:
  Job: job_201404010621_0004
  file: hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.xml
  tracking URL: http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004
  map() completion: 0.0
  reduce() completion: 0.0
   
  Monitoring the job status:
   jobClient.monitorAndPrintJob(conf, info);
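
  monitorAndPrintJob is essentially a polling loop over getJobStatus, as the two RPC calls below show. A hand-rolled equivalent against the old RunningJob API might look like this sketch (the real method also reports task-completion events and prints the counters):

import org.apache.hadoop.mapred.RunningJob;

public class PollJob {
  // Each isComplete()/mapProgress()/reduceProgress() call below translates
  // into a JobSubmissionProtocol.getJobStatus RPC like the ones traced here.
  public static void pollToCompletion(RunningJob job) throws Exception {
    while (!job.isComplete()) {
      System.out.printf("map %.0f%% reduce %.0f%%%n",
          job.mapProgress() * 100, job.reduceProgress() * 100);
      Thread.sleep(1000); // the real loop polls on a configurable interval
    }
    System.out.println(job.isSuccessful() ? "Job succeeded" : "Job failed");
  }
}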
   
  RPC call: JobSubmissionProtocol.getJobStatus(job_201404010621_0004)
  Returns: setupProgress=1, mapProgress=1, reduceProgress=0.22222224, cleanupProgress=1, runState=1 (RUNNING), priority=NORMAL

  RPC call: JobSubmissionProtocol.getJobStatus(job_201404010621_0004)
  Returns: setupProgress=1, mapProgress=1, reduceProgress=1, cleanupProgress=1, runState=2 (SUCCEEDED), priority=NORMAL

  map 100% reduce 100%
  Many more JobSubmissionProtocol.getJobStatus(job_201404010621_0004) calls are sent in between.
   
  RPC call: JobSubmissionProtocol.getTaskCompletionEvents(job_201404010621_0004, 0, 10)
  Returns:
  Task Id: attempt_201404010621_0004_m_000004_0, Status: SUCCEEDED
  Task Id: attempt_201404010621_0004_m_000002_0, Status: SUCCEEDED
  Task Id: attempt_201404010621_0004_m_000000_0, Status: SUCCEEDED
  Task Id: attempt_201404010621_0004_m_000001_0, Status: SUCCEEDED
  Task Id: attempt_201404010621_0004_m_000000_1, Status: KILLED
  Task Id: attempt_201404010621_0004_r_000000_0, Status: SUCCEEDED
  Task Id: attempt_201404010621_0004_r_000001_0, Status: SUCCEEDED
  Task Id: attempt_201404010621_0004_m_000003_0, Status: SUCCEEDED
  (With only 3 splits, the extra map attempts m_000003/m_000004 are likely the job setup and cleanup tasks, which Hadoop 1.x runs as map tasks; attempt m_000000_1 was a killed speculative attempt.)
 
 
RPC call: JobSubmissionProtocol.getJobCounters(job_201404010621_0004)
Returns (an ObjectWritable wrapping the Counters object): OW[class=class org.apache.hadoop.mapred.Counters, value=Counters: 29
       Job Counters
              Launched reduce tasks=2
              SLOTS_MILLIS_MAPS=293879
              Total time spent by all reduces waiting after reserving slots (ms)=0
              Total time spent by all maps waiting after reserving slots (ms)=0
              Launched map tasks=4
              Data-local map tasks=4
              SLOTS_MILLIS_REDUCES=74342
       File Output Format Counters
              Bytes Written=933
       FileSystemCounters
              FILE_BYTES_READ=316152
              HDFS_BYTES_READ=201008521
              FILE_BYTES_WRITTEN=370366
              HDFS_BYTES_WRITTEN=933
       File Input Format Counters
              Bytes Read=201008194
       Map-Reduce Framework
              Map output materialized bytes=2574
              Map input records=15600000
              Reduce shuffle bytes=2574
              Spilled Records=23025
              Map output bytes=356000000
              Total committed heap usage (bytes)=378023936
              CPU time spent (ms)=158350
              Combine input records=41011850
              SPLIT_RAW_BYTES=327
              Reduce input records=225
              Reduce input groups=75
              Combine output records=12075
              Physical memory (bytes) snapshot=650371072
              Reduce output records=75
              Virtual memory (bytes) snapshot=5300277248
              Map output records=41000000]
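
  Once the job has finished, the same counters can be read programmatically. A sketch against the old Counters API (the group/counter names below are the Hadoop 1.x framework names; treat them as an assumption to verify against your version):

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.RunningJob;

public class CounterDemo {
  public static void printSelectedCounters(RunningJob job) throws Exception {
    // Fetched via JobSubmissionProtocol.getJobCounters under the hood
    Counters counters = job.getCounters();
    long mapInput = counters.findCounter(
        "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getCounter();
    long reduceOutput = counters.findCounter(
        "org.apache.hadoop.mapred.Task$Counter", "REDUCE_OUTPUT_RECORDS").getCounter();
    System.out.println("Map input records: " + mapInput);        // 15600000 in this run
    System.out.println("Reduce output records: " + reduceOutput); // 75 in this run
  }
}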
