设为首页 收藏本站
查看: 1504|回复: 0

[经验分享] Hadoop MapReduce输入输出类型

[复制链接]

尚未签到

发表于 2017-12-16 23:55:43 | 显示全部楼层 |阅读模式
一、输入格式
  1、输入分片split
  一个分片对应一个map任务;
  一个分片包含一个表(整个文件)上的若干行,而一条记录(单行)对应一行;
  分片包含一个以字节为单位的长度 和 一组存储位置,分片不包含实际的数据;
  map处理时会用分片的大小来排序,优先处理最大的分片;
  hadoop中Java定义的分片为InputSplit抽象类:主要两个方法,涉及分片长度,分片起始位置
  

    public abstract>public abstract long getLength() throws IOException, InterruptedException;public abstract String[] getLocations() throws IOException, InterruptedException;  }
  

  InputSplit不需要手动去处理它,它是由InputFormat生成;InputFormat负责产生输入分片并将它们分割成记录:
  

    public abstract>public abstract List<InputSplit> getSplits( JobContext context) throws IOException, InterruptedException;public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;  }   
  

  InputFormat抽象类定义的两个方法:getSplits() 和 createRecordReader()
  运行作业的客户端会调用getSplits()来计算分片,然后将它们发送到jobtracker,jobtracker会使用其存储位置来调度map任务从而在tasktracker上来处理这个分片数据。在tasktracker上,map任务把输入分片传给InputFormat的getRecordReader()方法来获得这个分片的RecordReader。RecordReader就是一个集合迭代器,map任务用一个RecordReader来生成记录的键/值对,然后再传递给map函数。
  2、FileInputFormat类
  FileInputFormat类是所有指定数据源实现类的基类,它本身主要有两个功能:a. 指定输入文件位置;b. 输入文件生成分片的实现代码段,具体实现由子类完成;
  继承图:
DSC0000.jpg     

  设置输入文件位置:
  FileInputFormat.addInputPath(job, new Path("hdfs://fileClusters:9000/wordcount.txt"));
  或 FileInputFormat.setInputPaths(job, new Path("hdfs://fileClusters:9000/wordcount.txt"));      
  可添加文件过滤器, FileInputFormat 中静态方法:
  public static void setInputPathFilter(Job job,>  文件添加时,默认就会有一个过滤器,过滤掉"." 和 "_"开头的文件,会过滤掉隐藏文件;自定义的过滤器也是在默认过滤的基础上过滤;
  切分的分片大小:
  一个split的大小计算:max( minimumSize, min( maximumSize, blockSize ));
  minimumSize默认为1,maximumSize默认为Long.MAX_VALUE;
  所以通常 blockSize 在 minimumSize和maximumSize之间,所以一般分片大小就是块大小。
  设置不切分文件:
  两种方法:
  a. 设置minimumSize的大小为Long.MAX_VALUE;
  b. 在实现FileInputFormat的子类时,重写isSplitable()方法返回为false;
  在mapper中获取文件分片信息:
  在mapper中可以获取当前处理的分片的信息,可通过context.getInputSplit()方法来获取一个split;当输入的格式源于FileInputFormat时,该方法返回的InputSplit可以被强制转换化一个FileSplit(继承自InputSplit),可调用如下信息:
  a. getPath()  Path/String  文件的路径
  b. getStart() long
  c. getLength() long
  自定义一个输入格式,把整个文件作为一条记录: 

DSC0001.gif DSC0002.gif
  

// Example 7-2. An InputFormat for reading a whole file as a record  
class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
  return false;
  }
  

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
  throws IOException, InterruptedException {
  WholeFileRecordReader reader = new WholeFileRecordReader();
  reader.initialize(split, context);
  return reader;
  }
  
}
  

  
//主要是实现RecordReader类
  
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
  private FileSplit fileSplit;
  private Configuration conf;
  private BytesWritable value = new BytesWritable();
  private boolean processed = false;
  

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  this.fileSplit = (FileSplit) split;
  this.conf = context.getConfiguration();
  }
  

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!processed) {
  byte[] contents = new byte[(int) fileSplit.getLength()];
  Path file = fileSplit.getPath();
  FileSystem fs = file.getFileSystem(conf);
  FSDataInputStream in = null;
  try {
  in = fs.open(file);
  IOUtils.readFully(in, contents, 0, contents.length);
  value.set(contents, 0, contents.length);
  } finally {
  IOUtils.closeStream(in);
  }
  processed = true;
  return true;
  }
  return false;
  }
  

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
  return NullWritable.get();
  }
  

  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
  return value;
  }
  

  @Override
  public float getProgress() throws IOException {
  return processed ? 1.0f : 0.0f;
  }
  

  @Override
  public void close() throws IOException {
  // do nothing }
  
    }
  
}
  


View Code  整个文件作为一条记录的应用,把多个小文件合并为一个大文件:

  

public>static>private Text filenameKey;  

  @Override
protected void setup(Context context) throws IOException, InterruptedException {  InputSplit split
= context.getInputSplit();  Path path
= ((FileSplit) split).getPath();  filenameKey
= new Text(path.toString());  }
  

  @Override
protected void map(NullWritable key, BytesWritable value, Context context)throws IOException, InterruptedException {  context.write(filenameKey, value);
  }
  }
  

  @Override
public int run(String[] args) throws Exception {  Job job
= JobBuilder.parseInputAndOutput(this, getConf(), args);if (job == null) {return -1;  }
  job.setInputFormatClass(WholeFileInputFormat.
class);  job.setOutputFormatClass(SequenceFileOutputFormat.
class);  job.setOutputKeyClass(Text.
class);  job.setOutputValueClass(BytesWritable.
class);  job.setMapperClass(SequenceFileMapper.
class);return job.waitForCompletion(true) ? 0 : 1;  }
  

public static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);  System.exit(exitCode);
  }
  
}
  


View Code  文本输入:
  a. TextInputFormat  行首偏移量:行内容
  扩展:如何处理跨行Block和InputSplit
  b. KeyValueTextInputFormat  以tab划分一行的key value
  c. NLineInputFormat  让每个map收到定义的相同行数,每个分片只包含N行
  二进制输入:
  Hadoop的MapReduce不只是可以处理文本信息,还可以处理二进制格式,通过会用以下几个类:
  SequenceFileInputFormat,处理SequenceFile 和 MapFile的文件类型;
  SequenceFileAsTextInputFormat 是 SequenceFileInputFormat的扩展,它将SequenceFile的键值转换为Text对象,这个转化是通过键和值上调用toString()方法实现。
  SequenceFileAsBinaryInputFormat 也是SequenceFileInputFormat的扩展,它将SequenceFile的键值作为二进制对象。它们被封装为BytesWritable对象,因而可以任意解释这些字节数组。
  多输入MultipleInputs:
   它可为每条输入路径指定InputForamt 和 Mapper:       
  

MutipleInputs.addInputPath(job , ncdcInputPath, TextInputFormat.class, MaxTemperatureMapper.class);  
MutipleInputs.addInputPath(job ,metofficeInputPath, TextInputFormat.
class, MetofficeMaxTemperatureMapper.class);  

  

//MutipleInputs还有一个重载,当只用一个Mapper时  
public static void addInputPath(Job job, Path path,>  

  它取代了FileInputFormat.addInputPath() 和 job.setMapperClass()的调用。
  合成输入:CompositeInputFormat ( join )、CombineFileInputFormat ( 多个小文件合并成一个split输入,是一个抽象类 )

二、输出格式
  继承图:  

     DSC0003.jpg
  文体输出TextOutputFormat:
  默认的输出是文本输出TextOutputFormat,它把每条记录写为文本行,它调用toString()方法把key value转化为字符串。
  与之对应的输入为KeyValueTextInputFormat;
  二进制输出:与输入对应。
  多输出:
  默认一个reducer生成一个输出文件,命名为part-r-00000;
  有时需要对输出的文件名进行控制 或 让每个redeucer输出多个文件,可利用 MultipleOutputFormat 类;
  范例:按气象站来区分气象数据,各个气象站输出到不同的文件中:
  方法一:可利用每个reducer创建一个输出文件的特点,通过设置多个分区,来输出到各个文件,这样做有两点不好:
  a. 分区个数必须预先就知道;可能有空reducer,可能有的获取不到气象站信息导致值丢失;
  b. 每个reducer处理一个气象站,可能需要过多的reducer,也会有严重的数据倾斜问题;
  方法二:使用 MutipleOutputs 类:


  

public>static>private NcdcRecordParser parser = new NcdcRecordParser();  

  @Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  parser.parse(value);
  context.write(
new Text(parser.getStationId()), value);  }
  }
  

static>private MultipleOutputs<NullWritable, Text> multipleOutputs;  

  @Override
protected void setup(Context context) throws IOException, InterruptedException {  multipleOutputs
= new MultipleOutputs<NullWritable, Text>(context);  }
  

  @Override
protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {for (Text value : values) {  multipleOutputs.write(NullWritable.get(), value, key.toString());
  }
  }
  

  @Override
protected void cleanup(Context context) throws IOException, InterruptedException {  multipleOutputs.close();
  }
  }
  

  @Override
public int run(String[] args) throws Exception {  Job job
= JobBuilder.parseInputAndOutput(this, getConf(), args);if (job == null) {return -1;  }
  job.setMapperClass(StationMapper.
class);  job.setMapOutputKeyClass(Text.
class);  job.setReducerClass(MultipleOutputsReducer.
class);  job.setOutputKeyClass(NullWritable.
class);return job.waitForCompletion(true) ? 0 : 1;  }
  

public static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputs(), args);  System.exit(exitCode);
  }
  
}
  


View Code  输出文件结果如下:          
  output/010010-99999-r-00027
  output/010050-99999-r-00013
  output/010100-99999-r-00015
  output/010280-99999-r-00014
  方法三:MultipleOutputFormat(旧API已移弃)的使用    

  

public static>private TextOutputFormat<Text, IntWritable> output = null;  

  @Override
protected RecordWriter<Text, IntWritable> getBaseRecordWriter(FileSystem fs, JobConf job, String name,  Progressable arg3)
throws IOException {if (output == null) {  output
= new TextOutputFormat<Text, IntWritable>();  }
return output.getRecordWriter(fs, job, name, arg3);  }
  

  @Override
protected String generateFileNameForKeyValue(Text key, IntWritable value, String name) {char c = key.toString().toLowerCase().charAt(0);if (c >= 'a' && c <= 'z') {return c + ".txt";  }
return "result.txt";  }
  
}
  


View Code

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-424879-1-1.html 上篇帖子: Hadoop Windows IDEA 下篇帖子: Hadoop 安装流程
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表