设为首页 收藏本站
查看: 669|回复: 0

[经验分享] 《Hadoop技术内幕》读书笔记——Task运行过程分析

[复制链接]

尚未签到

发表于 2016-12-12 08:54:42 | 显示全部楼层 |阅读模式
  本文是董西成的Hadoop技术内幕一书的读书章节总结。
  
DSC0000.jpeg
 

第八章 Task运行过程分析

 
  所有Task需要周期性地向TaskTracker汇报最新进度和计数器值,而这正是由Reporter组件实现的,其中Reporter汇报的信息中包含两个部分:任务执行进度以及任务计数器值。

任务执行进度

hadoop采用简单的线性模型计算每个阶段的进度值,对于Map Task而言,作为一个大阶段不再分解,一般实用RecordReader中的getProgress()方法划定执行进度;对于Reduce Task而言,可以分成三个阶段:Shuffle,Sort和Reduce,每个阶段占任务总进度的1/3。

对于任务的Reporter而言,并不会总是每隔一段时间汇报进度和计数器值,而是仅当发现以下两种情况之一时才会汇报:


  • 任务执行进度发生变化;
  • 任务的某个计数器值发生变化;
  某个时间间隔内,如果任务执行进度和计数器值均未发生变化,则Task只会RPC调用ping函数探测TaskTracker是否alive,如果一直发生这种情况,TaskTracker认为这个任务处于悬挂状态,直接kill掉。因此我怀疑我们的任务被杀,也是因为这个原因:

 

AttemptID:attempt_1412848624484_2765_m_000032_0 Timed out after 1200 secsContainer killed by the ApplicationMaster. Container killed on request. Exit code is 143
  

为了避免这个问题,可以采用下面两种方法:


  • 每隔一段时间调用一次TaskReporter.progress()函数,告诉TaskTracker自己仍然活着;
  • 增大任务超时参数mapreduce.task.timeout参数(默认10分钟,用毫秒表示)。
  
任务计数器

任务计数器(Counter)是hadoop提供的,用于实现跟踪任务运行进度的全局计数功能,用户可以在自己的应用程序中添加计数器,计数器包括两个部分<name, value>,name表示计数器名称,value表示计数器值,hadoop规定一个作业最多包含120个计数器(修改mapreduce.job.counters.limit),50个计数器组。

Job的计数器分成两类:内置计数器和用户自定义计数器。

用户可以自定义计数器来进行MR程序的一些数据的统计,在老版本中需要自定义一个枚举,新版本中只需要提供对应的字符串变量即可:

reporter.getCounter(groupName, counterName).increment(1);
reporter.incrCounter(groupName, counterName, 1);
  

Map Task整体流程

Map Task总共分成5个阶段:


  • Read阶段:通过用户编写的RecordReader,从输入InputSplit中解析出一个key/value;
  • Map阶段:将解析出来的key/value交给用户编写的map函数处理,并产生一系列新的key/value;
  • Collect阶段:当Map阶段处理完数据之后,一般会调用OutputCollector.collect()输出结果,该函数内部会将生成的key/value通过Partitioner分片,写入一个环形缓冲区中;
  • Spill阶段:当缓冲区满之后,MapReduce会将数据写到本地磁盘中,生成临时文件,在将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并,压缩等操作;
  • Combine阶段:当所有的数据处理完成后,Map Task会对所有临时文件进行一次合并,确保最终只会生成一个数据文件。
  
DSC0001.png
 


Reduce Task整体流程


Reduce Task也分为5个阶段:


  • Shuffle阶段:也称为Copy阶段,Reduce Task从各个Map Task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一点阈值,写到磁盘上,否则直接在内存中操作;
  • Merge阶段:在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,防止内存实用过多或磁盘上的文件过多;
  • Sort阶段:用户编写的Reduce函数输入数据是按照key进行聚集的一组数据,与Map的数据有所不同。为了将key相同的数据聚集在一起,hadoop会先进行排序。由于事先在Map Task中的输出数据已经局部有序,因此Reduce Task只需要再对所有数据进行一次归并排序即可;
  • Reduce阶段:该阶段中,Reduce Task将每组数据依次交给用户编写的reduce函数处理;
  • Write阶段:OutputCollector.collect(key, value)会将计算结果写到HDFS上。
  
DSC0002.png
 


MapReduce的排序

排序是MapReduce框架中最重要的操作之一,Map Task和Reduce Task均会对数据按照key进行排序,这种排序操作属于Hadoop的默认行为,任何应用程序中的数据均会被排序。

Map Task会讲处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定的阀值之后,对数据进行一次排序(内存中),并将这些数据以IFile的文件形式写到磁盘上。当全部的数据处理完成后,会对磁盘上的所有文件进行一次合并并排序成一个大的有序文件。

Reduce Task从每个Map Task上远程拷贝相应的数据文件,如果文件大小超过一定的阈值,放到磁盘上,否则暂时放到内存中,如果磁盘上的文件数目达到一定阈值,进行一次合并以声称一个更大的文件。当所有的数据都拷贝完成后,Reduce Task会统一对内存和磁盘上的所有数据进行一次合并。

Hadoop中的文件合并采用了多轮递归合并的方法,每轮选取最小的前io.sort.factor个文件进行合并,并将产生的文件重新加入待合并列表中,直到剩下的文件数量小于io.sort.factor。

在合并的过程中,采用的方式为小顶堆,在小顶堆中选取io.sort.factor,在排序完成后,将结果继续放到该小顶堆中,继续直到剩下的数量小于io.sort.factor,排序到只剩下一个文件。

Reduce端的Shuffle和Merge阶段

Reduce Task中Shuffle阶段和Merge阶段是并行进行的,当远程拷贝数据量达到一定的阈值后,便会出发相应的合并线程对数据进行合并,这个阶段也可以进一步划分为三个子阶段:


  • 准备运行完成的Map Task列表,GetMapEventsThread线程周期性地通过RPC从TaskTracker获取已完成的Map Task列表,为防止网络热点,Reduce Task通过对所有TaskTracker Host进行混洗操作以打乱数据拷贝顺序,并将调整后的Map Task输出数据位置保存到scheduleCopies列表中。
  • 远程拷贝数据,Reduce Task同时启动多个MapOutputCopier线程,这些线程从scheduleCopies列表中获取Map Task输出位置,并通过HTTP Get远程拷贝数据,对于获取的数据分片,如果大小超过一定阈值,则存放到磁盘上,否则直接放到内存中。
  • 合并内存文件和磁盘文件,为了防止内存或者磁盘上的文件过多,Reduce Task启动两个线程分别对磁盘和内存中的文件进行合并。
  Reduce端的Sort和Reduce阶段

所有的数据拷贝完成后,数据可能放在内存中或者磁盘上,还不能将数据直接交给用户编写的reduce函数处理,根据MapReduce的语义,Reduce Task需将key值相同的数据聚集在一起,并按组将数据交给reduce函数处理。

为此,Hadoop采用了基于排序的数据聚集策略,各个Map Task已经事先对自己的输出分派呢进行了局部排序,因此,Reduce Task只需要进行一次归并排序即可保证数据整体有序。为了提高效率,Hadoop将Sort阶段和Reduce阶段并行化。

在Sort阶段,Reduce Task为内存和磁盘中的文件建立了小顶堆,保存了指向该小顶堆节点的迭代器,且该迭代器保证了磁盘上的文件数目小于io.sort.factor, 当Reduce阶段开始时,内存中数据量小于最大可用内存(JVM MaxHeapSize)的mapred.job.reduce.input.buffer.precent。

在Reduce阶段,Reduce Task不断地移动迭代器,以将key相同的数据顺次地交给reduce函数处理,期间移动迭代器的过程实际上就是不断调整小顶堆的过程,这样Sort,Reduce就可以并行执行。 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-313010-1-1.html 上篇帖子: Cloudera Impala: Real-Time Queries in Apache Hadoop, For Real 下篇帖子: [Hadoop] 新API容易遇到的一个问题: expected LongWritable recieved Text
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表