设为首页 收藏本站
查看: 1138|回复: 0

[经验分享] (转)【Hadoop代码笔记】Hadoop作业提交之客户端作业提交

[复制链接]

尚未签到

发表于 2016-12-10 10:46:19 | 显示全部楼层 |阅读模式
  1.      概要描述
仅仅描述向Hadoop提交作业的第一步,即调用Jobclient的submitJob方法,向Hadoop提交作业。
  2.      详细描述
Jobclient使用内置的JobSubmissionProtocol 实例jobSubmitClient 和JobTracker交互,最主要是提交作业、获取作业执行信息等。
  在JobClient中作业提交的主要过程如下:
  1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
5)计算作业的输入分片。通过InputFormat的getSplits(job)方法获得作业的split并将split序列化封装为RawSplit。返回split数目,也即代表有多个分片有多少个map。详细参见InputFormat获取Split的方法。
6)writeNewSplits 方法把输入分片写到JobTracker的job目录下。
7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
8) 使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。 JobTracker作业放入到内存队列中,由作业调度器进行调度。并初始化作业实例。JobTracker创建job成功后会给JobClient传回 一个JobStatus对象 用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个 JobStatus对象创建一个 NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。
DSC0000.gif

  引用下Hadoop: The Definitive Guide, Second Edition中的一张经典图。这里仅仅描述上图中的左上角第一个框部分内容,即本步骤的最终输出仅仅是将作业提交到JobTracker。其他后续文章会继续描述。
  3.      涉及主要类介绍:
  Jobclient :JobClient是向JobTracker提交作业的接口,可以理解为Hadoop的Mapreduce作业框架向用户开放的作业提交入口。可以提交作业,监视作业状态等
  JobSubmissionProtocol(为什么0.20.1的javadoc中找不到这个接口,虽然0.20.1 0.20.2代码中都是相同的用法,知道2.2.0貌似重命名为被ClientProtocol替换):JobClient和JobTracker进行通 信的一个协议。JobClient实际上是用这个句柄来提交锁业并且监视作业的执行状况。
  这个接口有两个实现:LocalJobRunner(conf)当mapred-site.xml中的mapred.job.tracker值为local是为此对象。表示在单机上执行;如果为一个地址的话则是 JobTracker的对象,表示分布式执 行。
  详细可参照JobClient中 的初始化代码:

DSC0001.gif


  /**
*如果是非local的就会 连接到指定的JobTracker  
*/
public void init(JobConf conf) throws IOException {
String tracker
= conf.get("mapred.job.tracker", "local");
if ("local".equals(tracker)) {
this.jobSubmitClient = new LocalJobRunner(conf);
}
else {
this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
}        
}
/*
* RPC不是本次主题重点,可参照后续发表的专题内容
*/
private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
Configuration conf)
throws IOException {
return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
NetUtils.getSocketFactory(conf, JobSubmissionProtocol.
class));
}






  InputFormat 重要,但暂不展开(此处会有链接)
  Split  重要,但暂不展开(此处会有链接)
  RowSplit 重要,但暂不展开(此处会有链接)
  4.     主要代码
通过代码来了解流程,了解如何调用JobClient向Hadoop集群提交作业。




  public RunningJob submitJob(JobConf job) throws FileNotFoundException,
IOException {
try {
return submitJobInternal(job);
}
catch (InterruptedException ie) {
throw new IOException("interrupted", ie);
}
catch (ClassNotFoundException cnfe) {
throw new IOException("class not found", cnfe);
}
}






  实际方法的执行是submitJobInternal方法。着重看下这个方法的内部执行。主要的逻辑部分比较详细的进行了注释。(有些想继续展开,感觉太细了,后面的文章中部分重要的会有涉及,不想深度遍历了,到时会回过头来互相链接) 



1 public RunningJob submitJobInternal(JobConf job)
2             throws FileNotFoundException, ClassNotFoundException,
3             InterruptedException, IOException {
4
5         // 1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
6         JobID jobId = jobSubmitClient.getNewJobId();
7         // 2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
8         // 3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
9
10         Path submitJobDir = new Path(getSystemDir(), jobId.toString());
11         Path submitJarFile = new Path(submitJobDir, "job.jar");
12         Path submitSplitFile = new Path(submitJobDir, "job.split");
13         configureCommandLineOptions(job, submitJobDir, submitJarFile);
14         Path submitJobFile = new Path(submitJobDir, "job.xml");
15         int reduces = job.getNumReduceTasks();
16         JobContext context = new JobContext(job, jobId);
17
18         // Check the output specification
19         // 4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
20
21         if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
22             org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
23                     .newInstance(context.getOutputFormatClass(), job);
24             output.checkOutputSpecs(context);
25         } else {
26             job.getOutputFormat().checkOutputSpecs(fs, job);
27         }
28
29         // 5)计算作业的输入分片。详细参见FormatInputFormat获取Split的方法。
30         // 6)writeNewSplits 方法把输入分片写到JobTracker的job目录下,名称是submitSplitFile
31         // job.split名称。
32         // 7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
33
34         // Create the splits for the job
35         LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
36         int maps;
37         if (job.getUseNewMapper()) {
38             maps = writeNewSplits(context, submitSplitFile);
39         } else {
40             maps = writeOldSplits(job, submitSplitFile);
41         }
42         job.set("mapred.job.split.file", submitSplitFile.toString());
43         job.setNumMapTasks(maps);
44
45         // Write job file to JobTracker's fs
46         FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
47                 new FsPermission(JOB_FILE_PERMISSION));
48
49         try {
50             job.writeXml(out);
51         } finally {
52             out.close();
53         }
54
55         // 8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker根据接收到的submitJob()方法调用后,把调用放入到内存队列中,由作业调度器进行调度。并初始化作业实例。
56
57         JobStatus status = jobSubmitClient.submitJob(jobId);
58         if (status != null) {
59             return new NetworkedJob(status);
60         } else {
61             throw new IOException("Could not launch job");
62         }
63     }







/**
* JobTracker.submitJob() kicks off a new job.  
*
* Create a 'JobInProgress' object, which contains both JobProfile
* and JobStatus.  Those two sub-objects are sometimes shipped outside
* of the JobTracker.  But JobInProgress adds info that's useful for
* the JobTracker alone.
*/
public synchronized JobStatus submitJob(JobID jobId) throws IOException {
if(jobs.containsKey(jobId)) {
//job already running, don't start twice
return jobs.get(jobId).getStatus();
}
JobInProgress job
= new JobInProgress(jobId, this, this.conf);
String queue
= job.getProfile().getQueueName();
if(!(queueManager.getQueues().contains(queue))) {      
new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
throw new IOException("Queue \"" + queue + "\" does not exist");        
}
// check for access
try {
checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
}
catch (IOException ioe) {
LOG.warn(
"Access denied for user " + job.getJobConf().getUser()
+ ". Ignoring job " + jobId, ioe);
new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
throw ioe;
}
return addJob(jobId, job);
}






  为了转载内容的一致性、可追溯性和保证及时更新纠错,转载时请注明来自:http://www.cnblogs.com/douba/p/hadoop_jobclient_submit.html。谢谢!

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312297-1-1.html 上篇帖子: (转)hadoop yarn 内存相关配置 下篇帖子: 用hadoop估算圆周率PI(3.1415926)的值
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表