设为首页 收藏本站
查看: 946|回复: 0

[经验分享] [hadoop源码阅读][9]-mapreduce-job提交过程

[复制链接]

尚未签到

发表于 2015-7-11 10:27:45 | 显示全部楼层 |阅读模式
  1.从wordcount作为入口



public class WordCount
{
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "word count");//job类的主要工作就是设置各种参数
job.setJarByClass(WordCount.class);//将包含WordCount.class的jar找到,后面要上传的
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);//如果传入参数为ture则及时打印作业运作信息,否则只是等待作业结束
}
}


  
  2.job.waitForCompletion(true)函数内主要干活的是submit




public void submit() throws IOException, InterruptedException, ClassNotFoundException
{
ensureState(JobState.DEFINE);

//Job.setMapperClass(xxx.class):实际上设置的是mapreduce.map.class,即New。JobConf.setMapperClass(xxx.class):实际上设置的是mapred.mapper.class,即Old。可见不调用JobConf.setMapperClass,就应该是使用的NewAPI。


    setUseNewAPI();
info = jobClient.submitJobInternal(conf);//实际的job提交过程.info用来和jobtracker进行交互,对提交的job进行监控以及杀死等操作

state = JobState.RUNNING;
}
  
  3.在看jobClient.submitJobInternal(conf)函数之前,先看jobclient这个对象的构造过程:
  3.1先将mapred-site.xml和core-site.xml包含到conf中.static代码
  3.2 init函数




public void init(JobConf conf) throws IOException
{
String tracker = conf.get("mapred.job.tracker", "local");//如果没有设置,或者没有找到上面2个xml配置文件
if ("local".equals(tracker))
{
conf.setNumMapTasks(1);//local模式 reduce只能是1
this.jobSubmitClient = new LocalJobRunner(conf);
}
else
{//创建rpc
this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
}
}
//jobSubmitClient对应的类是JobSubmissionProtocol的实现之一(目前有两个实现,JobTracker和LocalJobRunner)
  3.3 submitJobInternal函数




public RunningJob submitJobInternal(JobConf job)
{
JobID jobId = jobSubmitClient.getNewJobId();
Path submitJobDir = new Path(getSystemDir(), jobId.toString());//conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system")
Path submitJarFile = new Path(submitJobDir, "job.jar");
Path submitSplitFile = new Path(submitJobDir, "job.split");
/*1.建立submitJobDir目录,2.将参数中指定的jars,files,archives放到分布式缓存中,
* 3.将main函数所在的jar包上传为submitJarFile
* 4.设置user,group,这个对hdfs的文件操作是有权限影响的,设置当前工作目录
* */
configureCommandLineOptions(job, submitJobDir, submitJarFile);
Path submitJobFile = new Path(submitJobDir, "job.xml");
int reduces = job.getNumReduceTasks();
JobContext context = new JobContext(job, jobId);
// 检测输出目录是否存在,如果存在是会报错的
org.apache.hadoop.mapreduce.OutputFormat >List getSplits(JobContext job)
  3.1---->>List listStatus(JobContext job)//过滤掉输入路径下,以_和.开头的路径,以及根据用户设置的mapred.input.pathFilter.class对文件进行过滤,得到文件列表
  3.2 如果文件是压缩的,也即是不可splitable的,那么整个文件作为一个split
  3.3 如果文件是splitable的,那么首先计算每个split的大小,mapred.min.split.size的,默认大小是1,
  mapred.max.split.size的默认值是Long.MAX_VALUE,blockSize的默认大小是64M,
  那么split的大小为Math.max(minSize, Math.min(maxSize, blockSize));从公式可以看出,
  如果maxSize设置大于blockSize,那么每个block就是一个分片,否则就会将一个block文件分隔为多个分片,
  如果block中剩下的一小段数据量小于splitSize,还是认为它是独立的分片。
  3.4 将每个split的路径,大小,下标,以及位置信息保存到split数组
  2.2 安装每个split的大小进行排序,将大的放在前面,然后序列化到文件中.
  
  
  参考文献
  http://langyu.iteye.com/blog/909170
  http://blog.iyunv.com/andyelvis/article/details/7706205
  http://www.iyunv.com/forfuture1978/archive/2010/11/19/1882268.html
  http://www.iyunv.com/spork/archive/2010/04/21/1717552.html
  http://baoshengdeer.sinaapp.com/?p=116

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85454-1-1.html 上篇帖子: 和远哥一起了解Hadoop的MapReduce是如何运行的 下篇帖子: [hadoop源码阅读][9]-mapreduce-从wordcount开始
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表