HDFS
以下代码给出了如何压缩一个已经在HDFS中的文件:
1 Configuration config = new Configuration();
2 FileSystem hdfs = FileSystem.get(config);
3 Class<?> codecClass = Class.forName(args[2]);
4 CompressionCodec codec = (CompressionCodec)
5 ReflectionUtils.newInstance(codecClass, config);
6 InputStream is = hdfs.open(new Path(args[0]));
7 OutputStream os = hdfs.create(new Path(args[0] + codec.getDefaultExtension()));
8 OutputStream cos = codec.createOutputStream(os);
9 IOUtils.copyBytes(is, cos, config, true);
10 IOUtils.closeStream(os);
11 IOUtils.closeStream(is);
缓存编码器
压缩编码器的创建成本很高。使用Hadoop的ReflectionUtils类可以缓存创建的实例,以加速后续的编码器实例创建。一个更好的选项是使用CompressionCodecFactory。它提供了编码器的缓存功能。
以下代码实现了读取HDFS中的压缩文件:
1 InputStream is = hdfs.open(new Path(args[0]));
2 Class<?> codecClass = Class.forName(args[1]);
3 CompressionCodec codec = (CompressionCodec)
4 ReflectionUtils.newInstance(codecClass, config);
5 InputStream cis = codec.createInputStream(is);
6 IOUtils.copyBytes(cis, System.out, config, true);
7 IOUtils.closeStream(is);
接下来介绍如何在MapReduce中使用压缩文件。
MAPREDUCE
在MapReduce中使用压缩文件,需要设置作业的一些参数。简单起见,假定map和reduce之间没有任何过滤和转换。
1 Class<?> codecClass = Class.forName(args[2]);
2 conf.setBoolean("mapred.output.compress", true); // Compress the reducer output.
3 conf.setBoolean("mapred.compress.map.output", true); // Compress the mapper output
4 conf.setClass("mapred.output.compression.codec", codecClass, CompressionCodec.class);
5 // The compression codec for compressing mapper output.
在MapReduce作业中处理压缩文件和非压缩文件的区别在于上述代码中的三行注释。
不仅仅是作业的输入和输入可以被压缩,中间过程的map输出也可以被压缩。因为map先输出到磁盘上,然后通过网络传输给reduce。压缩map的输出的效率的高低取决于被压缩对象的数据类型。一般来说,数据压缩都可以带来一定程度的效率提升。
为什么在代码中不需要指定输入文件的压缩编码器?FileInputFormat类使用CompressionCodecFactory来根据输入文件扩展名决定使用相匹配的已注册编码器。
MapReduce如何知道使用哪个编码器?相应的配置文件时mapred-site.xml。以下代码说明了如何注册前述编码器。注意,除了gzip,Deflate和bzip2,其他的编码器在注册前都需要手动编译并存放在集群上。
1 <property>
2 <name>io.compression.codecs</name>
3 <value>
4 org.apache.hadoop.io.compress.GzipCodec,
5 org.apache.hadoop.io.compress.DefaultCodec,
6 org.apache.hadoop.io.compress.BZip2Codec,
7 com.hadoop.compression.lzo.LzoCodec,
8 com.hadoop.compression.lzo.LzopCodec,
9 org.apache.hadoop.io.compress.SnappyCodec
10 </value>
11 </property>
12 <property>
13 <name>
14 io.compression.codec.lzo.class
15 </name>
16 <value>
17 com.hadoop.compression.lzo.LzoCodec
18 </value>
19 </property>
到这里就是全部的如何在MapReduce中处理压缩文件了。Pig和Hive是高层次语言,抽象了MapReduce中的一些底层细节。接下来介绍如何在Pig和Hive中使用压缩文件。
PIG
和MapReduce不同的是,Pig并不需要额外的代码来处理压缩文件。在使用Pig的本地库时,要保证Pig的JVM的java.library.path包括了本地库的地址。下列脚本用于为Pig设置正确的路径以读取本地库:
$ bin/pig-native-opts.sh
export PIG_OPTS="$PIG_OPTS -Djava.library.path=/usr/lib/..."
$ export PIG_OPTS="$PIG_OPTS -Djava.library.path=/usr/..."
为了让Pig能够处理压缩文件,还需要为压缩文件添加正确的扩展名。以下例子给出了如何压缩文件并读取到Pig中:
$ gzip -c /etc/passwd > passwd.gz
$ hadoop fs -put passwd.gz passwd.gz
$ pig
grunt> A = load 'passwd.gz' using PigStorage(':');
grunt> B = foreach A generate $0 as id;
grunt> DUMP B;
(root)
(bin)
(daemon)
...
Pig中输出到gzip文件的过程类似,同样需要指定文件扩展名。以下例子将Pig的关系B的结果存储到HDFS的一个文件中,并把它们复制到本地文件系统中以查看内容:
grunt> STORE B INTO 'passwd-users.gz';
# Ctrl+C to break out of Pig shell
$ hadoop fs -get passwd-users.gz/part-m-00000.gz .
$ gunzip -c part-m-00000.gz
root
bin
daemon
...
很简单。希望Hive中也是一样。
HIVE
和Pig中一样,需要做的就是在定义文件名时制定和编码器相匹配的扩展名:
hive> CREATE TABLE apachelog (...);
hive> LOAD DATA INPATH /user/aholmes/apachelog.txt.gz OVERWRITE INTO TABLE apachelog;
上述例子讲gzip压缩文件装载到了Hive中。在这个例子中,Hive将装载的文件移动到了Hive的仓库目录中,并使用raw文件作为表的存储。如何创建另外一个表,并制定表必须被压缩。以下代码通过Hive的配置启用了MapReduce压缩:(MapReduce作业将在最后一个语句中被调用来读取新表)
hive> SET hive.exec.compress.output=true;
hive> SET hive.exec.compress.intermediate = true;
hive> SET mapred.output.compression.codec = org.apache.hadoop.io.compress.GzipCodec;
hive> CREATE TABLE apachelog_backup (...);
hive> INSERT OVERWRITE TABLE apachelog_backup SELECT * FROM apachelog;
通过以下脚本可以检测Hive中apachelog_backup表的压缩结果(在HDFS上):
$ hadoop fs -ls /user/hive/warehouse/apachelog_backup
/user/hive/warehouse/apachelog_backup/000000_0.gz
需要注意的是,Hive推荐使用SequenceFile作为表的输出格式,因为SequenceFile可以分块压缩。
小结
这个技术提供了在Hadoop中压缩了的简便方法。对于不大的文件,这也是一个相对透明的方法。
如果压缩文件大小大于HDFS块的大小,就需要参考技术27中的分块压缩技术了。
(译注:本节原文较长,剩余部分将在下一篇文章翻译。)