鸬鹚洲 发表于 2015-11-11 11:15:56

hadoop 对数据流的压缩和解压缩

  实现本地文件的压缩上传,命令示例:hadoopcompressFromLocal   /home/uncle_bai/workspace/Testdatahdfs://localhost:9000/input/test   org.apache.io.compress.GzipCodec
  public class compressFromLocal {
public static void main(String[] args) throws IOException, ClassNotFoundException {
String scrString=args;    //本地文件的地址
String dststrString=args; //hdfs文件系统的文件地址
String codetypeString=args; //指定编码方式
Configuration configuration=new Configuration();
Class<?> codeClass=Class.forName(codetypeString);
CompressionCodec codec=(CompressionCodec)ReflectionUtils.newInstance(codeClass, configuration);
dststrString=dststrString+codec.getDefaultExtension();   //为生成的压缩文件自动添加后缀
FileSystem fS=FileSystem.get(URI.create(dststrString), configuration);
InputStream inputStream=new FileInputStream(new File(scrString));   
OutputStream outputStream=codec.createOutputStream(fS.create(new Path(dststrString)));
                //压缩文件要用codec.creatOutputStream对输出流进行包装,因为压缩解压缩的动作是在hadoop端进行的,而不是本地进行的
                IOUtils.copyBytes(inputStream, outputStream, configuration);
}
}
   实现hdfs文件的解压缩。命令如:hadoop decompressFromHdfs hdfs://localhost:9000/input/Testdata.gz
  public class decompressFromHdfs {
public static void main(String[] args) throws IOException {
String srcString=args;
String dString=null;
Configuration configuration=new Configuration();
FileSystem fsFileSystem=FileSystem.get(URI.create(srcString), configuration);
CompressionCodecFactory factory=new CompressionCodecFactory(configuration);
CompressionCodec codec=factory.getCodec(new Path(srcString));//由读取的路径命获取解码器
dString=CompressionCodecFactory.removeSuffix(srcString, codec.getDefaultExtension());//自动去除后缀名
InputStream inputStream=codec.createInputStream(fsFileSystem.open(new Path(srcString)));
OutputStream outputStream=fsFileSystem.create(new Path(dString));
IOUtils.copyBytes(inputStream, outputStream, configuration);
}
}   CodecPool允许反复使用压缩禾解压缩,分摊创建这些对象需要的开销。







版权声明:本文为博主原创文章,未经博主允许不得转载。
页: [1]
查看完整版本: hadoop 对数据流的压缩和解压缩