设为首页 收藏本站
查看: 961|回复: 0

[经验分享] Hadoop Combiner的几个调用时间点

[复制链接]

尚未签到

发表于 2016-12-12 07:33:12 | 显示全部楼层 |阅读模式

  Combiner是在Map端被执行,共有两个时机会被触发:

         ① 从环形缓冲器溢写分区文件的时候
         ② 合并溢写分区文件的时候
DSC0000.png
 


1. 初始化combinerRunner和combineCollector

MapTask.run()
  ┟ runNewMapper(job, split, umbilical, reporter);
     ┟ output = new NewOutputCollector(taskContext, job, umbilical, reporter);

if(job.getNumReduceTasks() == 0) {
output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
// 如果有reduce task,才会有Combiner task
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}


       ┟ collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
           ┟ 初始化combinerRunner和combineCollector

combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
} else {
combineCollector = null;
}


2. Combiner调用点1:磁盘溢写(spill)
  磁盘溢写会触发Combiner,有两个地方会触发溢写操作:


  • 输出Key-value到缓冲器
  • 关闭map函数输出流,执行flush方法时

2.1 输出Key-Value到缓冲器

MapTask.run()
  ┟ runNewMapper(job, split, umbilical, reporter);
  ┟ mapper.run(mapperContext);
  ┟ map(context.getCurrentKey(), context.getCurrentValue(), context); // map函数
  ┟ context.write((KEYOUT) key, (VALUEOUT) value); // map函数输出值
          ┟ NewOutputCollector.write()
  ┟ MapOutputBuffer.collect()
  ┟ startSpill();

if (kvstart == kvend && kvsoftlimit) {
LOG.info("Spilling map output: record full = " + kvsoftlimit);
startSpill(); //  缓冲器达到spill条件,溢写到磁盘
}
  ┟ spillReady.signal(); // 唤起spill线程
  SpillThread.run()
  ┟ sortAndSpill();

public void run() {
try {
...
while (kvstart == kvend) {
spillReady.await(); // 等待被唤醒
}
...
sortAndSpill();
...
  ┟ combinerRunner.combine(kvIter, combineCollector); // 运行combiner

int spstart = spindex; // spstart为kvoffet数组start, spindex为kvoffset数组end
while (spindex < endPosition &&
kvindices[kvoffsets[spindex % kvoffsets.length]
+ PARTITION] == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
// 如果start == end,说明该分区只有一条记录,则不进行combiner操作;否则执行combiner
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector);
}

2.2 map输出流flush方法

MapTask.run()
  ┟ runNewMapper(job, split, umbilical, reporter);
    ┟ output.close(mapperContext); // 关闭map输出流
      ┟ NewOutputCollector.close();
        ┟ collector.flush();
          ┟ MapOutputBuffer.flush()
            ┟ sortAndSpill(); 运行combiner,同上
 

3. Combiner调用点2:map端分区文件合并

MapTask.run()
  ┟ runNewMapper(job, split, umbilical, reporter);
    ┟ output.close(mapperContext); // 关闭map输出流
      ┟ NewOutputCollector.close();
        ┟ collector.flush();
          ┟ MapOutputBuffer.flush()
  ┟ mergeParts();

// minSpillsForCombine 在MapOutputBuffer构造函数内被初始化,
// numSpills 为mapTask已经溢写到磁盘spill文件数量
if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(kvIter, combineCollector); <- 执行combiner
}

  --end

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312871-1-1.html 上篇帖子: Hadoop map task中Partitioner执行时机 下篇帖子: hadoop 函数阅读笔记之releaseSlot()
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表