设为首页 收藏本站
查看: 1722|回复: 0

[经验分享] Hadoop源码解析之: TextInputFormat如何处理跨split的行

[复制链接]

尚未签到

发表于 2015-7-14 08:05:39 | 显示全部楼层 |阅读模式
  我们知道hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理:


  • 对输入数据进行切分,生成一组split,一个split会分发给一个mapper进行处理。
  • 针对每个split,再创建一个RecordReader读取Split内的数据,并按照的形式组织成一条record传给map函数进行处理。
  
最常见的FormatInput就是TextInputFormat,在split的读取方面,它是将给到的Split按行读取,以行首字节在文件中的偏移做key,以行数据做value传给map函数处理,这部分的逻辑是由它所创建并使用的RecordReader:LineRecordReader封装和实现的.关于这部分逻辑,在一开始接触hadoop时会有一个常见的疑问:如果一个行被切分到两个split里(这几乎是一定会发生的情况),TextInputFormat是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如word count就是一个例子).搞清楚这个问题还是需要从源码入手了解TextInputFormat的详细工作方式,这里简单地梳理记录如下(本文参考的是hadoop1.1.2的源码):

  
  1. LineRecordReader会创建一个org.apache.hadoop.util.LineReader实例,并依赖这个LineReader的readLine方法来读取一行记录,具体可参考org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text),Line 176),那么关键的逻辑就在这个readLine方法里了,下面是添加了额外中文注释的该方法源码.这个方法主要的逻辑归纳起来是3点:


  • 总是是从buffer里读取数据,如果buffer里的数据读完了,先加载下一批数据到buffer
  • 在buffer中查找"行尾",将开始位置至行尾处的数据拷贝给str(也就是最后的Value).如果为遇到"行尾",继续加载新的数据到buffer进行查找.
  • 关键点在于:给到buffer的数据是直接从文件中读取的,完全不会考虑是否超过了split的界限,而是一直读取到当前行结束为止
  

/**
* Read one line from the InputStream into the given Text.  A line
* can be terminated by one of the following: '\n' (LF) , '\r' (CR),
* or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated
* line.
*
* @param str the object to store the given line (without newline)
* @param maxLineLength the maximum number of bytes to store into str;
*  the rest of the line is silently discarded.
* @param maxBytesToConsume the maximum number of bytes to consume
*  in this call.  This is only a hint, because if the line cross
*  this threshold, we allow it to happen.  It can overshoot
*  potentially by as much as one buffer length.
*
* @return the number of bytes read including the (longest) newline
* found.
*
* @throws IOException if the underlying stream throws
*/
public int readLine(Text str, int maxLineLength,
int maxBytesToConsume) throws IOException {
/* We're reading data from in, but the head of the stream may be
* already buffered in buffer, so we have several cases:
* 1. No newline characters are in the buffer, so we need to copy
*    everything and read another buffer from the stream.
* 2. An unambiguously terminated line is in buffer, so we just
*    copy to str.
* 3. Ambiguously terminated line is in buffer, i.e. buffer ends
*    in CR.  In this case we copy everything up to CR to str, but
*    we also need to see what follows CR: if it's LF, then we
*    need consume LF as well, so next call to readLine will read
*    from after that.
* We use a flag prevCharCR to signal if previous character was CR
* and, if it happens to be at the end of the buffer, delay
* consuming it until we have a chance to look at the char that
* follows.
*/
str.clear();
int txtLength = 0; //tracks str.getLength(), as an optimization
int newlineLength = 0; //length of terminating newline
boolean prevCharCR = false; //true of prev char was CR
long bytesConsumed = 0;
do {
int startPosn = bufferPosn; //starting from where we left off the last time
//如果buffer中的数据读完了,先加载一批数据到buffer里
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
if (prevCharCR)
++bytesConsumed; //account for CR from previous read
bufferLength = in.read(buffer);
if (bufferLength  maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}//newlineLength == 0 就意味着始终没有读到行尾,程序会继续通过文件输入流继续从文件里读取数据。
//这里有一个非常重要的地方:in的实例创建自构造函数:org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit)
//第86行:FSDataInputStream fileIn = fs.open(split.getPath()); 我们看以看到:
//对于LineRecordReader:当它对取“一行”时,一定是读取到完整的行,不会受filesplit的任何影响,因为它读取是filesplit所在的文件,而不是限定在filesplit的界限范围内。
//所以不会出现“断行”的问题!
} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
if (bytesConsumed > (long)Integer.MAX_VALUE)
throw new IOException("Too many bytes before newline: " + bytesConsumed);   
return (int)bytesConsumed;
}
  
  
  2. 按照readLine的上述行为,在遇到跨split的行时,会到下一个split继续读取数据直至行尾,那么下一个split怎么判定开头的一行有没有被上一个split的LineRecordReader读取过从而避免漏读或重复读取开头一行呢?这方面LineRecordReader使用了一个简单而巧妙的方法:既然无法断定每一个split开始的一行是独立的一行还是被切断的一行的一部分,那就跳过每个split的开始一行(当然要除第一个split之外),从第二行开始读取,然后在到达split的结尾端时总是再多读一行,这样数据既能接续起来又避开了断行带来的麻烦.以下是相关的源码:
  在LineRecordReader的构造函数org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit) 108到113行确定start位置时,明确注明::会特别地忽略掉第一行!
  

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
  
相应地,在LineRecordReader判断是否还有下一行的方法:org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text) 170到173行中,while使用的判定条件是:当前位置小于
或等于split的结尾位置,也就说
:当当前以处于split的结尾位置上时,while依然会执行一次,这一次读到显然已经是下一个split的开始行了!
  
  

    // We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition()

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-86367-1-1.html 上篇帖子: eclipse中 unable to load native-hadoop library 下篇帖子: 转:hadoop启动问题(Unrecognized option:
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表