Combiner是在Map端被执行,共有两个时机会被触发:
① 从环形缓冲器溢写分区文件的时候
② 合并溢写分区文件的时候
1. 初始化combinerRunner和combineCollector
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output = new NewOutputCollector(taskContext, job, umbilical, reporter);
if(job.getNumReduceTasks() == 0) {
output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
// 如果有reduce task,才会有Combiner task
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
┟ collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
┟ 初始化combinerRunner和combineCollector
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
} else {
combineCollector = null;
}
2. Combiner调用点1:磁盘溢写(spill)
磁盘溢写会触发Combiner,有两个地方会触发溢写操作:
输出Key-value到缓冲器
关闭map函数输出流,执行flush方法时
2.1 输出Key-Value到缓冲器
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ mapper.run(mapperContext);
┟ map(context.getCurrentKey(), context.getCurrentValue(), context); // map函数
┟ context.write((KEYOUT) key, (VALUEOUT) value); // map函数输出值
┟ NewOutputCollector.write()
┟ MapOutputBuffer.collect()
┟ startSpill();
if (kvstart == kvend && kvsoftlimit) {
LOG.info("Spilling map output: record full = " + kvsoftlimit);
startSpill(); // 缓冲器达到spill条件,溢写到磁盘
}
┟ spillReady.signal(); // 唤起spill线程
SpillThread.run()
┟ sortAndSpill();
public void run() {
try {
...
while (kvstart == kvend) {
spillReady.await(); // 等待被唤醒
}
...
sortAndSpill();
...
┟ combinerRunner.combine(kvIter, combineCollector); // 运行combiner
int spstart = spindex; // spstart为kvoffet数组start, spindex为kvoffset数组end
while (spindex < endPosition &&
kvindices[kvoffsets[spindex % kvoffsets.length]
+ PARTITION] == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
// 如果start == end,说明该分区只有一条记录,则不进行combiner操作;否则执行combiner
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector);
}
2.2 map输出流flush方法
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output.close(mapperContext); // 关闭map输出流
┟ NewOutputCollector.close();
┟ collector.flush();
┟ MapOutputBuffer.flush()
┟ sortAndSpill(); 运行combiner,同上
3. Combiner调用点2:map端分区文件合并
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output.close(mapperContext); // 关闭map输出流
┟ NewOutputCollector.close();
┟ collector.flush();
┟ MapOutputBuffer.flush()
┟ mergeParts();
// minSpillsForCombine 在MapOutputBuffer构造函数内被初始化,
// numSpills 为mapTask已经溢写到磁盘spill文件数量
if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(kvIter, combineCollector); <- 执行combiner
}
--end
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com