
[Experience Sharing] Multi-file output in Hadoop MapReduce

Posted on 2016-12-12 07:26:19
  Over the past couple of days I came across an online post about multi-file output in MapReduce: http://blog.csdn.net/inkfish. It is well written.

  
I gave it a try myself, also splitting the code into three files. My three files differ slightly from the original author's: some of the classes are ones I had written earlier and copied over, hence the small differences.

  
My_LineRead.java

  
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class My_LineRead<K, V> extends RecordWriter<K, V> {

    private static final String utf8 = "UTF-8";
    private static final String colon = "----";   // separator between key and value
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);        // note: "\n", not "/n"
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public My_LineRead(DataOutputStream out) {
        this(out, colon);   // delegate to the constructor below
    }

    public My_LineRead(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException e) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        out.close();
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
        // write the key if it is present (note "||", not "&&")
        if (!(key == null || key instanceof NullWritable)) {
            if (key instanceof Text) {
                Text to = (Text) key;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(key.toString().getBytes(utf8));
            }
            out.write(keyValueSeparator);
        }
        // write the value if it is present
        if (!(value == null || value instanceof NullWritable)) {
            if (value instanceof Text) {
                Text to = (Text) value;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(value.toString().getBytes(utf8));
            }
            out.write(newline);
        }
    }
}
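My_LineRead simply writes the key, the "----" separator, the value, and a newline, skipping the key or value when it is null or a NullWritable. As a quick sanity check, that formatting rule can be modelled in plain Java without any Hadoop classes; the class and method names below are my own, not part of the code above:

```java
// Plain-Java model of the line format My_LineRead emits: key----value\n
// (hypothetical names; no Hadoop dependencies)
public class LineFormatSketch {
    static final String SEPARATOR = "----";

    // null stands in for "null or NullWritable" from the real RecordWriter
    static String formatRecord(String key, String value) {
        StringBuilder sb = new StringBuilder();
        if (key != null) {
            sb.append(key).append(SEPARATOR);
        }
        if (value != null) {
            sb.append(value).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(formatRecord("hadoop", "3"));   // hadoop----3
        System.out.print(formatRecord(null, "3"));       // just the value and newline
    }
}
```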

  
MyMultipleOutputFormat.java  // I added some comments to this class to make it easier to follow

  
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

// Abstract class: the driver implements generateFileNameForKeyValue to pick the file name.
public abstract class MyMultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {

    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // writer is null on the first call
        if (writer == null) {
            // getTaskOutputPath returns the task's output directory
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /** Determine the output file name (with extension) from key, value, and conf. */
    // The return value is the file name; it can be decided from the key and value.
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** Cache of RecordWriters, one per file name */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** Output directory */
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        // Several files may be open, so close them in a loop.
        // recordWriters.values() holds the writers returned by getBaseRecordWriter.
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // Determine the output file name
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            // If no writer exists for this file name yet, create one; otherwise reuse it.
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);   // cache it in the HashMap
            }
            rw.write(key, value);
        }

        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            // Check whether output compression is enabled
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new My_LineRead<K, V>(
                        new DataOutputStream(codec.createOutputStream(fileOut)),
                        keyValueSeparator);
            } else {
                // No compression codec in use
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                // recordWriter = new My_LineRead<K, V>(fileOut, keyValueSeparator);
                // Here I use my own RecordWriter with its default "----" separator
                recordWriter = new My_LineRead<K, V>(fileOut);
            }
            return recordWriter;
        }
    }
}
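The core idea of MultiRecordWriter is the HashMap cache: the first record routed to a given file name lazily creates a writer, and later records for the same name reuse it; the routing itself is whatever generateFileNameForKeyValue returns. Here is a plain-Java sketch of that cache-and-route pattern, with no Hadoop dependencies; the route-by-first-letter rule and all names are my own illustration, not from the class above:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of MultiRecordWriter's per-file-name writer cache.
// A StringBuilder stands in for an open RecordWriter (hypothetical names).
public class WriterCacheSketch {
    private final Map<String, StringBuilder> writers = new HashMap<>();

    // Stand-in for generateFileNameForKeyValue: route by the key's first letter.
    static String fileNameFor(String key) {
        return key.isEmpty() ? "other.txt" : key.substring(0, 1) + ".txt";
    }

    void write(String key, String value) {
        // First record for a file name creates the "writer"; later ones reuse it.
        StringBuilder w = writers.computeIfAbsent(fileNameFor(key),
                name -> new StringBuilder());
        w.append(key).append("----").append(value).append('\n');
    }

    String contentOf(String fileName) {
        StringBuilder w = writers.get(fileName);
        return w == null ? "" : w.toString();
    }

    public static void main(String[] args) {
        WriterCacheSketch cache = new WriterCacheSketch();
        cache.write("apple", "1");
        cache.write("ant", "2");
        cache.write("bee", "1");
        // "apple" and "ant" share a.txt; "bee" goes to b.txt
        System.out.print(cache.contentOf("a.txt"));
    }
}
```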

  
Finally, the test driver, WordCount_MulFileOut.java

  
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount_MulFileOut {

    public static class wordcountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreElements()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class wordcountReduce
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable str : values) {
                sum += str.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static class MyMultiple extends MyMultipleOutputFormat {
        @Override
        protected String generateFileNameForKeyValue(WritableComparable key,
                Writable value, Configuration conf) {
            // Send every record to the same file here; in a real job this
            // would be decided per key/value.
            return "other.txt";
        }
    }

    public static void main(String args[]) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount_MulFileOut.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MyMultiple.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(wordcountMapper.class);
        job.setReducerClass(wordcountReduce.class);
        job.setCombinerClass(wordcountReduce.class);
        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.waitForCompletion(true);
    }
}
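Since MyMultiple always returns "other.txt", every word count lands in a single file; varying the returned name by key or value is what produces multiple output files. Independently of Hadoop, the map/reduce logic of this driver boils down to tokenize-then-sum, which can be checked in plain Java (class and method names are mine):

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Plain-Java model of what wordcountMapper + wordcountReduce compute together:
// tokenize each input line, then sum the per-word counts (no Hadoop classes).
public class WordCountSketch {
    static Map<String, Integer> countWords(String... lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);  // same tokenizer as the mapper
            while (itr.hasMoreTokens()) {
                counts.merge(itr.nextToken(), 1, Integer::sum);  // the reduce-side sum
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("hello hadoop", "hello world"));
    }
}
```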

Thread URL: https://www.yunweiku.com/thread-312864-1-1.html