设为首页 收藏本站
查看: 1343|回复: 0

[经验分享] 自定义hadoop map/reduce输入文件切割InputFormat

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-6-6 09:43:15 | 显示全部楼层 |阅读模式
hadoop会对原始输入文件进行文件切割,然后把每个split传入mapper程序中进行处理,FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类进行实现的。        那么,FileInputFormat是怎样将他们划分成splits的呢?FileInputFormat只划分比HDFS block大的文件,所以如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。
       hadoop默认的InputFormat是TextInputFormat,重写了FileInputFormat中的createRecordReader和isSplitable方法。该类使用的reader是LineRecordReader,即以回车键(CR = 13)或换行符(LF = 10)为行分隔符。
      但大多数情况下,回车键或换行符作为输入文件的行分隔符并不能满足我们的需求,通常用户很有可能会输入回车键、换行符,所以通常我们会定义不可见字符(即用户无法输入的字符)为行分隔符,这种情况下,就需要新写一个InputFormat。
      又或者,一条记录的分隔符不是字符,而是字符串,这种情况相对麻烦;还有一种情况,输入文件的主键key已经是排好序的了,需要hadoop做的只是把相同的key作为一个数据块进行逻辑处理,这种情况更麻烦,相当于免去了mapper的过程,直接进去reduce,那么InputFormat的逻辑就相对较为复杂了,但并不是不能实现。
    1、改变一条记录的分隔符,不用默认的回车或换行符作为记录分隔符,甚至可以采用字符串作为记录分隔符。
     1)自定义一个InputFormat,继承FileInputFormat,重写createRecordReader方法,如果不需要分片或者需要改变分片的方式,则重写isSplitable方法,具体代码如下:
public class FileInputFormatB extends FileInputFormat<LongWritable, Text> {
   @Override
   public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) {
        return new SearchRecordReader("\b");
    }
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
         // 输入文件不分片
        return false;
     }
}
   2)关键在于定义一个新的SearchRecordReader继承RecordReader,支持自定义的行分隔符,即一条记录的分隔符。标红的地方为与hadoop默认的LineRecordReader不同的地方。
public class IsearchRecordReader extends RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(IsearchRecordReader.class);

private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
//行分隔符,即一条记录的分隔符
private byte[] separator = {'\b'};
private int sepLength = 1;
? public IsearchRecordReader(){
}
public IsearchRecordReader(String seps){
  this.separator = seps.getBytes();
  sepLength = separator.length;
}
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
  this.start = split.getStart();
  this.end = (this.start + split.getLength());
  Path file = split.getPath();
  this.compressionCodecs = new CompressionCodecFactory(job);
  CompressionCodec codec = this.compressionCodecs.getCodec(file);
  // open the file and seek to the start of the split
  FileSystem fs = file.getFileSystem(job);
  FSDataInputStream fileIn = fs.open(split.getPath());
  boolean skipFirstLine = false;
  if (codec != null) {
   this.in = new LineReader(codec.createInputStream(fileIn), job);
   this.end = Long.MAX_VALUE;
  } else {
   if (this.start != 0L) {
    skipFirstLine = true;
    this.start -= sepLength;
    fileIn.seek(this.start);
   }
   this.in = new LineReader(fileIn, job);
  }
  if (skipFirstLine) { // skip first line and re-establish "start".
   int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));
   
   if(newSize > 0){
    start += newSize;
   }
  }
  this.pos = this.start;
}
public boolean nextKeyValue() throws IOException {
  if (this.key == null) {
   this.key = new LongWritable();
  }
  this.key.set(this.pos);
  if (this.value == null) {
   this.value = new Text();
  }
  int newSize = 0;
  while (this.pos < this.end) {
   newSize = this.in.readLine(this.value, this.maxLineLength, Math.max(
(int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength));
   if (newSize == 0) {
    break;
   }
   this.pos += newSize;
   if (newSize < this.maxLineLength) {
    break;
   }
   LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));
  }
  if (newSize == 0) {
   //读下一个buffer
   this.key = null;
   this.value = null;
   return false;
  }
  //读同一个buffer的下一个记录
  return true;
}
public LongWritable getCurrentKey() {
  return this.key;
}
public Text getCurrentValue() {
  return this.value;
}
public float getProgress() {
  if (this.start == this.end) {
   return 0.0F;
  }
  return Math.min(1.0F, (float) (this.pos - this.start) / (float) (this.end - this.start));
}
public synchronized void close() throws IOException {
  if (this.in != null)
   this.in.close();
}
}
   3)重写SearchRecordReader需要的LineReader,可作为SearchRecordReader内部类。特别需要注意的地方就是,读取文件的方式是按指定大小的buffer来读,必定就会遇到一条完整的记录被切成两半,甚至如果分隔符大于1个字符时分隔符也会被切成两半的情况,这种情况一定要加以拼接处理。
public class LineReader {
  //回车键(hadoop默认)
  //private static final byte CR = 13;
  //换行符(hadoop默认)
  //private static final byte LF = 10;
   
  //按buffer进行文件读取
  private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;
  private int bufferSize = DEFAULT_BUFFER_SIZE;
  private InputStream in;
  private byte[] buffer;
  private int bufferLength = 0;
  private int bufferPosn = 0;
  
  LineReader(InputStream in, int bufferSize) {
   this.bufferLength = 0;
    this.bufferPosn = 0;
      
   this.in = in;
   this.bufferSize = bufferSize;
   this.buffer = new byte[this.bufferSize];
  }
  public LineReader(InputStream in, Configuration conf) throws IOException {
   this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
  }
  public void close() throws IOException {
   in.close();
  }
public int readLine(Text str, int maxLineLength) throws IOException {
   return readLine(str, maxLineLength, Integer.MAX_VALUE);
  }
  public int readLine(Text str) throws IOException {
   return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
  }
  //以下是需要改写的部分_start,核心代码
  public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
   str.clear();
   Text record = new Text();
   int txtLength = 0;
   long bytesConsumed = 0L;
   boolean newline = false;
   int sepPosn = 0;
   
   do {
    //已经读到buffer的末尾了,读下一个buffer
    if (this.bufferPosn >= this.bufferLength) {
     bufferPosn = 0;
     bufferLength = in.read(buffer);
     
     //读到文件末尾了,则跳出,进行下一个文件的读取
     if (bufferLength <= 0) {
      break;
     }
    }
   
    int startPosn = this.bufferPosn;
    for (; bufferPosn < bufferLength; bufferPosn ++) {
     //处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)
     if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){
      sepPosn = 0;
     }
     
     //遇到行分隔符的第一个字符
     if (buffer[bufferPosn] == separator[sepPosn]) {
      bufferPosn ++;
      int i = 0;
      
      //判断接下来的字符是否也是行分隔符中的字符
      for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){
      
       //buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半
       if(bufferPosn + i >= bufferLength){
        bufferPosn += i - 1;
        break;
       }
      
       //一旦其中有一个字符不相同,就判定为不是分隔符
       if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){
        sepPosn = 0;
        break;
       }
      }
      
      //的确遇到了行分隔符
      if(sepPosn == sepLength){
       bufferPosn += i;
       newline = true;
       sepPosn = 0;
       break;
      }
     }
    }
   
    int readLength = this.bufferPosn - startPosn;
    bytesConsumed += readLength;
    //行分隔符不放入块中
    //int appendLength = readLength - newlineLength;
    if (readLength > maxLineLength - txtLength) {
     readLength = maxLineLength - txtLength;
    }
    if (readLength > 0) {
     record.append(this.buffer, startPosn, readLength);
     txtLength += readLength;
     
     //去掉记录的分隔符
     if(newline){
      str.set(record.getBytes(), 0, record.getLength() - sepLength);
     }
    }
   } while (!newline && (bytesConsumed < maxBytesToConsume));
   if (bytesConsumed > (long)Integer.MAX_VALUE) {
    throw new IOException("Too many bytes before newline: " + bytesConsumed);
   }
   
   return (int) bytesConsumed;
  }

  //以下是需要改写的部分_end
//以下是hadoop-core中LineReader的源码_start
public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
    str.clear();
    int txtLength = 0;
    int newlineLength = 0;
    boolean prevCharCR = false;
    long bytesConsumed = 0L;
    do {
      int startPosn = this.bufferPosn;
      if (this.bufferPosn >= this.bufferLength) {
        startPosn = this.bufferPosn = 0;
        if (prevCharCR)  bytesConsumed ++;
        this.bufferLength = this.in.read(this.buffer);
        if (this.bufferLength <= 0)  break;
      }
      for (; this.bufferPosn < this.bufferLength; this.bufferPosn ++) {
        if (this.buffer[this.bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          this.bufferPosn ++;
          break;
        }
        if (prevCharCR) {
          newlineLength = 1;
          break;
        }
        prevCharCR = this.buffer[this.bufferPosn] == CR;
      }
      int readLength = this.bufferPosn - startPosn;
      if ((prevCharCR) && (newlineLength == 0))
        --readLength;
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(this.buffer, startPosn, appendLength);
        txtLength += appendLength; }
    }
    while ((newlineLength == 0) && (bytesConsumed < maxBytesToConsume));
    if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed);
    return (int)bytesConsumed;
  }
//以下是hadoop-core中LineReader的源码_end
}
2、已经按主键key排好序了,并保证相同主键key一定是在一起的,假设每条记录的第一个字段为主键,那么如果沿用上面的LineReader,需要在核心方法readLine中对前后两条记录的id进行equals判断,如果不同才进行split,如果相同继续下一条记录的判断。代码就不再贴了,但需要注意的地方,依旧是前后两个buffer进行交接的时候,非常有可能一条记录被切成了两半,一半在前一个buffer中,一半在后一个buffer中。
     这种方式的好处在于少去了reduce操作,会大大地提高效率,其实mapper的过程相当的快,费时的通常是reduce。


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-20234-1-1.html 上篇帖子: eclipse编译hadoop源码 下篇帖子: mapreduce的文件拆分,FileInputFormat reduce 切割
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表