6.4.3 优化洗牌(shuffle)和排序阶段
洗牌和排序阶段都很耗费资源。洗牌需要在map和reduce任务之间传输数据,会导致过大的网络消耗。排序和合并操作的消耗也是很显著的。这一节将介绍一系列的技术来缓解洗牌和排序阶段的消耗。
技术46 规避使用reduce
Reduce在用于连接数据集的时候将会产生大量的网络消耗。
问题
需要考虑在MapReduce规避reduce的使用。
方案
通过将MapReduce参数setNumReduceTasks设置为0来创建一个只有map的作业。
讨论
洗牌和排序阶段一般都是用来连接数据集。但连接操作并不一定需要洗牌和排序,正如第4章中所介绍的。满足一定条件的连接可以只在map端运行。那么就只需要只有map的作业了。设置只有map的作业的命令如下。
job.setNumReduceTasks(0);
小结
一个只有map的作业的OutputFormat是和普通作业中reduce的OutputFormat一样。如图6.39所示。
如果无法规避reduce,那么就要尽量减小它对你的作业执行时间的影响。
技术47 过滤和投影
Map到Reduce之间传输数据要通过网络,这个成本很高。
问题
需要减少被洗牌的数据。
方案
减少map输出的每条记录的大小,并尽可能地减少map输出的数据量。
讨论
过滤和投影是关系运算中的概念,用以减少需要处理的数据。这些概念也可以用到MapReduce中减少map任务需要输出的数据。以下是过滤和投影的简明定义:
过滤是减少map输出的数据量。
投影是减少map输出的每条记录的大小。
以下是上述概念的演示代码:
1 Text outputKey = new Text();
2 Text outputValue = new Text();
3
4 @Override
5 public void map(LongWritable key, Text value,
6 OutputCollector output,
7 Reporter reporter) throws IOException {
8
9 String v = value.toString();
10
11 if (!v.startsWith("10.")) {
12 String[] parts = StringUtils.split(v, ".", 3);
13 outputKey.set(parts[0]);
14 outputValue.set(parts[1]);
15 output.collect(outputKey, outputValue);
16 }
17 }
小结
过滤和投影是在需要显著减少MapReduce作业运行时间时最容易的方法中的两种。
如果已经应用了这两种方法,但还需要进一步减少运行时间。那么就可以考虑combine。
技术48 使用combine
Combine可以在map阶段进行聚合操作来减少需要发送到reduce的数据。它是一个map端的优化工具,以map的输出作为输入。
问题
需要在过滤和投影后进一步减少运行时间。
方案
定义一个combine。在作业代码中使用setCombinerClass来调用它。
讨论
在map输出数据到磁盘的过程中,有两个子过程:溢洒(spill)子过程,合并子过程。Combine在这两个子过程中都会被调用,如图6.40所示。为了让combine在分组数据中效率最大,可以在两个子过程调用combine之前进行初步(precursory)的排序。
与设置map类类似,作业使用setCombinClass来设置combine。
job.setCombinerClass(Combine.class);
Combine的实现必须严格遵从reduce的规格说明。这里将假定使用技术39种的map。将map的输出中的记录按照下述条件合并:第二个八进制数相同。代码如下。
1 public static class Combine implements Reducer {
2
3 @Override
4 public void reduce(Text key, Iterator values,
5 OutputCollector output,
7 Reporter reporter) throws IOException {
8
9 Text prev = null;
10 while (values.hasNext()) {
11 Text t = values.next();
12 if (!t.equals(prev)) {
13 output.collect(key, t);
14 }
15 prev = ReflectionUtils.copy(job, t, prev);
16 }
17 }
18 }
Combine函数必须是可分布的(distributive)。如图6.40(在前面)所示,combine要被调用多次处理多个具有相同输入键的记录。这些记录的顺序是不可预测的。可分布函数是指,不论输入数据的顺序如何,最终的结果都一样。
小结
在MapReduce中combine非常有用,它能够减少map和reduce之间的网络传输数据和网络负载。下一个减少执行时间的有用工具就是二进制比较器。
技术49 用Comparator进行超快排序
MapReduce默认使用RawComparator对map的输出键进行比较排序。内置的Writable类(例如Text和IntWritable)是字节级实现。这样不用将字节形式的类解排列(unmarshal)成类对象。如果要通过WritableComparable实现自定义Writable,就有可能延长洗牌和排序阶段的时间,因为它需要进行解排列。
问题
存在自定义的Writable。需要减少作业的排序时间。
方案
实现字节级的Comparator来优化排序中的比较过程。
讨论
在MapReduce中很多阶段,排序是通过比较输出键来进行的。为了加快键排序,所有的map输出键必须实现WritableComparable接口。
1 public interface WritableComparable extends Writable, Comparable {
2
3 }
如果对4.2.1中的Person类进行改造,实现代码如下。
1 public class Person implements WritableComparable {
2 private String firstName;
3 private String lastName;
4
5 @Override
6 public int compareTo(Person other) {
7 int cmp = this.lastName.compareTo(other.lastName);
8 if (cmp != 0) {
9 return cmp;
10 }
11 return this.firstName.compareTo(other.firstName);
12 }
13 ...
这个Comparator的问题在于,如果要进行比较,就需要将字节形式的map的中间结果数据解排列成Writable形式。解排列要重新创建对象,因此成本很高。
Hadoop中的自带的各种Writable类不但扩展了WritableComparable接口,也提供了基于WritableComparator类的自定义Comparator。代码如下。
1 public class WritableComparator implements RawComparator {
2
3 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
4
5 try {
6 buffer.reset(b1, s1, l1);
7 key1.readFields(buffer);
8
9 buffer.reset(b2, s2, l2);
10 key2.readFields(buffer);
11 } catch (IOException e) {
12 throw new RuntimeException(e);
13 }
14 return compare(key1, key2);
15 }
16
17 /** Compare two WritableComparables.
18 *
19 * The default implementation uses the natural ordering,
20 * calling {@link
21 * Comparable#compareTo(Object)}. */
22 @SuppressWarnings("unchecked")
23 public int compare(WritableComparable a, WritableComparable b) {
24 return a.compareTo(b);
25 }
26 ...
27 }
要实现字节级的Comparator,需要重载compare方法。这里先学习一下IntWritable类如何实现这个方法。
1 public class IntWritable implements WritableComparable {
2
3 public static class Comparator extends WritableComparator {
4
5 public Comparator() {
6 super(IntWritable.class);
7 }
8
9 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
10 int thisValue = readInt(b1, s1);
11 int thatValue = readInt(b2, s2);
12 return (thisValue
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com