设为首页 收藏本站
查看: 698|回复: 0

[经验分享] hadoop中LineReader的readLine方法解析

[复制链接]

尚未签到

发表于 2016-12-11 08:41:08 | 显示全部楼层 |阅读模式
  Hadoop默认的读取一条数据,使用的就是LineReader的readLine方法,这个方法具体怎么工作,可以直接看源码,因为比较复杂,所以加上一些注释:

    /**
* Read one line from the InputStream into the given Text. A line can be
* terminated by one of the following: '\n' (LF) , '\r' (CR), or '\r\n'
* (CR+LF). EOF also terminates an otherwise unterminated line.
*
* @param str the object to store the given line (without newline)
* @param maxLineLength the maximum number of bytes to store into str; the
*        rest of the line is silently discarded.
* @param maxBytesToConsume the maximum number of bytes to consume in this
*        call. This is only a hint, because if the line cross this
*        threshold, we allow it to happen. It can overshoot potentially by
*        as much as one buffer length.
*
* @return the number of bytes read including the (longest) newline found.
*
* @throws IOException if the underlying stream throws
*/
public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException
{
/*
* We're reading data from in, but the head of the stream may be already
* buffered in buffer, so we have several cases:
*
* 缓存的大小为默认的64k,在请求获取下一条数据的时候,有可能请求的数据已经在缓存中存在,
* 一条数据的大小有可能没有64k,这个情况是存在的,所以就分为下面几种情况:
*
* 1. 缓存中的数据没有新的一行的标记,如\n 或者 \r,那么就将这个缓存中的所有数据拷贝出来,
* 并且从另外一个缓存中读取一条记录的后半部分
*
* 2. 如果缓存中有不规则的结束行,则将这行赋给str,这一条什么意思 ?
*
* 3. 如果缓存中存在规则的结束行,如\r,那么将缓存中\r之前的数据赋值给str,但同时,也需要
* 看与\r紧挨着的字符是什么。如果是\n,那么在处理这条记录的同时,也需要将缓存中的这个\n消除,
* 这样下次读取一行的时候会从\n后面开始读。 使用标志prevCharCR 来标志前面一个字符是否为\r
* 如果缓冲区中的最后一个字符正好是\r,需要寻找紧挨这个字符的字符是什么,根据紧挨着的字符来判断
* 继续的动作
*
* 1. No newline characters are in the buffer,
* so we need to copy everything and read
* another buffer from the stream.
*
* 2. An unambiguously terminated line
* is in buffer, so we just copy to str.
*
* 3. Ambiguously terminated line
* is in buffer, i.e. buffer ends in CR. In this case we copy everything
* up to CR to str, but we also need to see what follows CR: if it's LF,
* then we need consume LF as well, so next call to readLine will read
* from after that. We use a flag prevCharCR to signal if previous
* character was CR and, if it happens to be at the end of the buffer,
* delay consuming it until we have a chance to look at the char that
* follows.
*/
// 清理str已有的数据
str.clear();
int txtLength = 0; // tracks str.getLength(), as an optimization
int newlineLength = 0; // length of terminating newline
boolean prevCharCR = false; // true of prev char was CR
long bytesConsumed = 0;
do
{
//
// bufferPosn存储缓冲区当前的位置,表示上一次读取数据到达的位置
// 如果上一次读取到400位置的字符,那么现在冲401开始读取
//
int startPosn = bufferPosn; // starting from where we left off the
// last time
// 如果当前的位置超过了缓冲区的长度,这种情况的出现是因为
// 在当前的缓冲区中为读取一行结束的标识,所以的继续读入新的
// 数据填充缓冲区,以便继续寻找
if (bufferPosn >= bufferLength)
{
// 充值bufferPosn参数,从 0 开始
startPosn = bufferPosn = 0;
// 判断上一个字符是否为 \r
if (prevCharCR)
++bytesConsumed; // account for CR from previous read
// 冲输入流in中读取数据存储到buffer中,读取的数据长度为bufferLength
bufferLength = in.read(buffer);
// 如果没有读取到数据,则跳出循环
if (bufferLength <= 0)
break; // EOF
}
// 从bufferPosn位置开始往后读取
for (; bufferPosn < bufferLength; ++bufferPosn)
{
// search for newline
// 寻找新的一行的标志 \n
if (buffer[bufferPosn] == LF)
{
/***********************************
*     ‹p›          ‹p›
*      |            |
*      0080abcdk\r\ngabld008924\r\n
*
***********************************/
// 找到\n后,看前一个字符是否为\r,如果为\r
// 返回newlineLength = 2,否则为 newlineLength = 1
newlineLength = (prevCharCR) ? 2 : 1;
++bufferPosn; // at next invocation proceed from following
// byte
break;
}
/***********************************
*     ‹p›        ‹p›
*      |          |
*      0080abcdk\rgabld008924\r\n
*
***********************************/
// 到字符 g 的时候,发现前面一个字符是\r,newlineLength = 1
if (prevCharCR)
{
// CR + notLF, we are at notLF
newlineLength = 1;
break;
}
// 读取到\r的时候,prevCharCR被设置为true,下一轮就进入了前面的判断,
// 设置newlineLength = 1,跳出循环
prevCharCR = (buffer[bufferPosn] == CR);
}
// 获取此次读取的数据长度
int readLength = bufferPosn - startPosn;
// 处理第三种情况,正好读取缓冲区的最末尾,而且正好是\r
/***********************************
*     ‹p›       ‹p›
*      |       |←|
*      0080abcdk\r
*
***********************************/
if (prevCharCR && newlineLength == 0)
--readLength; // CR at the end of the buffer
bytesConsumed += readLength;
int appendLength = readLength - newlineLength;
// 如果当前这条记录的长度超过Text允许存储的数据长度maxLineLength
// 将appendLength赋值为 maxLineLength - txtLength
if (appendLength > maxLineLength - txtLength)
{
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0)
{
// 将缓冲区buffer从startPson位置开始的长度为appendLength的数据赋值为str
str.append(buffer, startPosn, appendLength);
// 处理记录长度超过maxLineLength的情况,分多次赋值
txtLength += appendLength;
}
}
// newlineLength = 0 处理记录长度超过maxLineLength的情况,分多次赋值
while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
// 一条记录的长度太长了,跑出异常
if (bytesConsumed > (long) Integer.MAX_VALUE)
throw new IOException("Too many bytes before newline: "
+ bytesConsumed);
return (int) bytesConsumed;
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312559-1-1.html 上篇帖子: HBase如何从Hadoop读取数据,DFSInputStream 下篇帖子: 006_hadoop中MapReduce详解_3
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表