longpan 发表于 2018-10-30 08:27:24

学习日志---hadoop的join处理

package org.robby.join;  

  
import java.io.BufferedReader;
  
import java.io.IOException;
  
import java.io.InputStreamReader;
  
import java.net.URI;
  
import java.util.HashMap;
  
import org.apache.hadoop.conf.Configuration;
  
import org.apache.hadoop.fs.FSDataInputStream;
  
import org.apache.hadoop.fs.FileSystem;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.*;
  
import org.apache.hadoop.mapreduce.Job;
  
import org.apache.hadoop.mapreduce.Mapper;
  
import org.apache.hadoop.mapreduce.Reducer;
  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  

  
public class MapJoinWithCache {
  
    public static class Map extends
  
            Mapper {
  
      private CombineValues combineValues = new CombineValues();
  
      private Text flag = new Text();
  
      private Text key = new Text();
  
      private Text value = new Text();
  
      private String[] keyValue = null;
  
      //这个keyMap就是存文件数据供map共享的
  
      private HashMap keyMap = null;
  

  
      @Override
  
      //这个map每行都会调用一次,传入数据
  
      //每次都会访问keyMap集合
  
      //因为setup方法处理了input1文件,因此这里只需要处理input2就行
  
      protected void map(LongWritable key, Text value, Context context)
  
                throws IOException, InterruptedException {
  
            keyValue = value.toString().split(",", 2);
  

  
            String name = keyMap.get(keyValue);
  

  
            this.key.set(keyValue);
  

  
            String output = name + "," + keyValue;
  
            this.value.set(output);
  
            context.write(this.key, this.value);
  
      }
  

  
      @Override
  
      //这个setup方法是在mapper类初始化运行的方法
  
      protected void setup(Context context) throws IOException,
  
                InterruptedException {
  
            //context传入文件路径
  
            URI[] localPaths = context.getCacheFiles();
  

  
            keyMap = new HashMap();
  
            for(URI url : localPaths){
  
               //通过uri打开hdfs文件系统
  
               FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop1:9000"), context.getConfiguration());
  
               FSDataInputStream in = null;
  
               //打开hdfs的对应文件,需要path类创建并传入,获取流对象
  
               in = fs.open(new Path(url.getPath()));
  
               BufferedReader br=new BufferedReader(new InputStreamReader(in));
  
               String s1 = null;
  
               while ((s1 = br.readLine()) != null)
  
               {
  
                     keyValue = s1.split(",", 2);
  

  
                     keyMap.put(keyValue, keyValue);
  
                     System.out.println(s1);
  
               }
  
               br.close();
  
            }
  
      }
  
    }
  

  
    public static class Reduce extends Reducer {
  

  
      //处理都在mpper中进行,reduce迭代分组后的数据就行
  
      @Override
  
      protected void reduce(Text key, Iterable values,
  
                Context context) throws IOException, InterruptedException {
  

  
            for(Text val : values)
  
                context.write(key, val);
  

  
      }
  

  
    }
  

  
    public static void main(String[] args) throws Exception {
  
      Configuration conf = new Configuration();
  
      Job job = Job.getInstance(conf);
  

  
      job.setJarByClass(MapJoinWithCache.class);
  
      job.setMapperClass(Map.class);
  
      job.setReducerClass(Reduce.class);
  

  
      job.setMapOutputKeyClass(Text.class);
  
      job.setMapOutputValueClass(Text.class);
  

  
      job.setOutputKeyClass(Text.class);
  
      job.setOutputValueClass(Text.class);
  

  
      job.setInputFormatClass(TextInputFormat.class);
  
      job.setOutputFormatClass(TextOutputFormat.class);
  

  
      Path outputPath = new Path(args);
  
      FileInputFormat.addInputPath(job, new Path(args));
  
      FileOutputFormat.setOutputPath(job, outputPath);
  
      outputPath.getFileSystem(conf).delete(outputPath, true);
  

  
      //其他都一样,这里在job中加入了要传入的文件路径,用作cache
  
      //可以传入多个文件,文件全路径
  
      job.addCacheFile(new Path(args).toUri());
  

  
      System.exit(job.waitForCompletion(true) ? 0 : 1);
  
    }
  
}


页: [1]
查看完整版本: 学习日志---hadoop的join处理