设为首页 收藏本站
查看: 1102|回复: 0

hadoop中mapreduce的常用类(一)

[复制链接]
累计签到:172 天
连续签到:1 天
发表于 2015-5-27 10:17:08 | 显示全部楼层 |阅读模式
hadoopmapreduce的常用类(一)
云智慧(北京)科技有限公司陈鑫
写这个文章的时候才意识到新旧API是同时存在于1.1.2的hadoop中的。以前还一直纳闷儿为什么有时候是jobClient提交任务,有时是Job...不管API是否更新,下面这些类也还是存在于API中的,经过自己跟踪源码,发现原理还是这些。只不过进行了重新组织,进行了一些封装,使得扩展性更好。所以还是把这些东西从记事本贴进来吧。
关于这些类的介绍以及使用,有的是在自己debug中看到的,多数为纯翻译API的注释,但是翻译的过程受益良多。

GenericOptionsParser
parseGeneralOptions(Options opts, Configuration conf, String[] args)解析命令行参数
      GenericOptionsParser是为hadoop框架解析命令行参数的工具类。它能够辨认标准的命令行参数,使app能够轻松指定namenode,jobtracker,以及额外的配置资源或信息等。它支持的功能有:
      -conf 指定配置文件;
      -D 指定配置信息;
      -fs      指定namenode
      -jt     指定jobtracker
      -files  指定需要copy到MR集群的文件,以逗号分隔
      -libjars指定需要copy到MR集群的classpath的jar包,以逗号分隔
      -archives指定需要copy到MR集群的压缩文件,以逗号分隔,会自动解压缩
  • String[] otherArgs = new GenericOptionsParser(job, args)  
  •      .getRemainingArgs();  
  • if (otherArgs.length != 2) {  
  •    System.err.println("Usage: wordcount <in> <out>");  
  •    System.exit(2);  
  • }  


ToolRunner
用来跑实现Tool接口的工具。它与GenericOptionsParser合作来解析命令行参数,只在此次运行中更改configuration的参数。
Tool
处理命令行参数的接口。Tool是MR的任何tool/app的标准。这些实现应该代理对标准命令行参数的处理。下面是典型实现:

  • public class MyApp extends Configured implements Tool {  
  •       
  •    public int run(String[] args) throws Exception {  
  •      // 即将被ToolRunner执行的Configuration  
  •      Configuration conf = getConf();  
  •       
  •      // 使用conf建立JobConf  
  •      JobConf job = new JobConf(conf, MyApp.class);  
  •       
  •      // 执行客户端参数  
  •      Path in = new Path(args[1]);  
  •      Path out = new Path(args[2]);  
  •       
  •      // 指定job相关的参数       
  •      job.setJobName("my-app");  
  •      job.setInputPath(in);  
  •      job.setOutputPath(out);  
  •      job.setMapperClass(MyApp.MyMapper.class);  
  •      job.setReducerClass(MyApp.MyReducer.class);  
  • *  
  •      // 提交job,然后监视进度直到job完成  
  •      JobClient.runJob(job);  
  •    }  
  •      
  •    public static void main(String[] args) throws Exception {  
  •      // ToolRunner 处理命令行参数   
  •      int res = ToolRunner.run(new Configuration(), new Sort(), args);  //这里封装了GenericOptionsParser解析args  
  •       
  •      System.exit(res);  
  •    }  
  • }  

MultipleOutputFormat
自定义输出文件名称或者说名称格式。在jobconf中setOutputFormat(MultipleOutputFormat的子类)就行了。而不是那种part-r-00000啥的了。。。并且可以分配结果到多个文件中。
       MultipleOutputFormat继承了FileOutputFormat, 允许将输出数据写进不同的输出文件中。有三种应用场景:
a. 最少有一个reducer的mapreduce任务。这个reducer想要根据实际的key将输出写进不同的文件中。假设一个key编码了实际的key和为实际的key指定的位置
b. 只有map的任务。这个任务想要把输入文件或者输入内容的部分名称设为输出文件名。
c. 只有map的任务。这个任务为输出命名时,需要依赖keys和输入文件名。
  • //这里是根据key生成多个文件的地方,可以看到还有valuename等参数  
  • @Override  
  • protected String generateFileNameForKeyValue(Text key,  
  •      IntWritable value, String name) {  
  •    char c = key.toString().toLowerCase().charAt(0);  
  •    if (c >= 'a' && c <= 'z') {  
  •      return c + ".txt";  
  •    }  
  •    return "result.txt";  
  • }  

DistributedCache
在集群中快速分发大的只读文件。DistributedCache是MR用来缓存app需要的诸如text,archive,jar等的文件的。app通过jobconf中的url来指定需要缓存的文件。它会假定指定的这个文件已经在url指定的对应位置上了。在job在node上执行之前,DistributedCache会copy必要的文件到这个slave node。它的功效就是为每个job只copy一次,而且copy到指定位置,能够自动解压缩。
DistributedCache可以用来分发简单的只读文件,或者一些复杂的例如archive,jar文件等。archive文件会自动解压缩,而jar文件会被自动放置到任务的classpath中(lib)。分发压缩archive时,可以指定解压名称如:dict.zip#dict。这样就会解压到dict中,否则默认是dict.zip中。
文件是有执行权限的。用户可以选择在任务的工作目录下建立指向DistributedCache的软链接。
  • DistributedCache.createSymlink(conf);   
  •     DistributedCache.addCacheFile(new Path("hdfs://host:port/absolute-path#link-name").toUri(), conf);      

DistributedCache.createSymlink(Configuration)方法让DistributedCache在当前工作目录下创建到缓存文件的符号链接。则在task的当前工作目录会有link-name的链接,相当于快捷方法,链接到expr.txt文件,在setup方法使用的情况则要简单许多。或者通过设置配置文件属性mapred.create.symlink为yes。分布式缓存会截取URI的片段作为链接的名字。例如,URI是hdfs://namenode:port/lib.so.1#lib.so,则在task当前工作目录会有名为lib.so的链接,它会链接分布式缓存中的lib.so.1
DistributedCache会跟踪修改缓存文件的timestamp。
下面是使用的例子, 为应用app设置缓存   
1. 将需要的文件copy到FileSystem中:  
  •   $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat   
  •   $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip   
  •   $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar  
  •   $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar  
  •   $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz  
  •   $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz  

2. 设置app的jobConf:  
  •   JobConf job = new JobConf();  
  •   DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),   
  •                                 job);  
  •   DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);  
  •   DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);  
  •   DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);  
  •   DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);  
  •   DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);  

3. 在mapper或者reducer中使用缓存文件:  
  •   public static class MapClass extends MapReduceBase   
  •   implements Mapper<K, V, K, V> {  
  •    
  •     private Path[] localArchives;  
  •     private Path[] localFiles;  
  •       
  •     public void configure(JobConf job) {  
  •       // 得到刚刚缓存的文件  
  •       localArchives = DistributedCache.getLocalCacheArchives(job);  
  •       localFiles = DistributedCache.getLocalCacheFiles(job);  
  •     }  
  •       
  •     public void map(K key, V value,   
  •                     OutputCollector<K, V>; output, Reporter reporter)   
  •     throws IOException {  
  •       // 使用缓存文件  
  •       // ...  
  •       // ...  
  •       output.collect(k, v);  
  •     }  
  •   }  


  它跟GenericOptionsParser的部分功能有异曲同工之妙。
PathFilter + 通配符。accept(Path path)筛选path是否通过。


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-71149-1-1.html 上篇帖子: 云智慧发布5月在线医疗行业网站性能监测报告 下篇帖子: hadoop中mapreduce的常用类(二)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表