设为首页 收藏本站
查看: 671|回复: 0

[经验分享] Hadoop关于大量小压缩文件的问题和解决方法

[复制链接]

尚未签到

发表于 2016-12-12 09:08:36 | 显示全部楼层 |阅读模式
之前一段时间偶尔会收到 hadoop 集群的 nagios 监控报警,具体报警是几个 resource-manager 节点一直负载超过阀值.找了个空闲时间分析了一下 job-history,发现是一个小伙伴的 job在段时间内创建了大量 map-task 导致的,在解决问题后做个笔记备忘
 

首先分析 job-history 的统计数据

DSC0000.png




  • 可以发现 map 任务执行的时间很短,但是同时会有大量的 map 任务
  • 与小伙伴沟通后,了解到他的 job 是根据运营侧需求,本周都在应用埋点日志中提取指定按钮的点击计数
  • 应用埋点的日志记录了每个用户的所有访问路径和参数
    1) log-agent 通过 logback 将日志记录到本地
    2)每小时生成一个 gz 压缩包,并上传至 hdfs 指定目录(根据应用标识+ip 生产目录规则)

根据 hadoop 的 map split 机制我们可以得出如下结论


  • 每个 inputfile 会对应多个 map split(根据 hdfs 的 block zise切分)
  • 每个map split会对应一个 map task
  • 由于每个小时生成的 gz 文件均未超过hdfs block zise(128m)
  • 小伙伴要统计的集群中有三个应用节点,排除凌晨时段没有日志产出的情况,大概一周的日志文件树=24*7*3~=350

解决问题


  • 我们知道 hadoop中是可以利用CombineFileInputFormat来合并大量小文件输入,提高 map 性能的.
  • 但默认实现只提供了CombineSequenceFileInputFormat和CombineTextFileInputFormat,没有压缩文件的支持.
  • 所以这里要实现自定义的CombineFileInputFormat来解决该问题

自定义CompressedCombineFileInputFormat

package ctu.components.amada.hadoop.usertrace;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import java.io.IOException;
/**
* Created by pippo on 14/12/23.
*/
public class CompressedCombineFileInputFormat extends CombineFileInputFormat<CompressedRecordKey, Text> {
public RecordReader<CompressedRecordKey, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException {
return new CombineFileRecordReader<>((CombineFileSplit) split,
context,
CompressedCombineFileRecordReader.class);
}
}
 

 CompressedCombineFileRecordReader

package ctu.components.amada.hadoop.usertrace;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Created by pippo on 14/12/23.
*/
public class CompressedCombineFileRecordReader extends RecordReader<CompressedRecordKey, Text> {
private long offset;
private long end;
private long pos;
private CompressedRecordKey key;
private Text value = new Text();
private CompressTrunk trunk;
private LineReader reader;
public CompressedCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
throws IOException {
/*多个压缩文件合并为一个ombine file, 那么实际的压缩文件就是file中的一个trunk*/
this.trunk = new CompressTrunk(context.getConfiguration(), split.getPath(index));
/*trunk在combine 中的起始位置*/
this.offset = split.getOffset(index);
/* trunk在combine file中的结束位置*/
this.end = offset + (trunk.isCompress() ? trunk.getFileLength() : split.getLength(index));
boolean skipFirstLine = false;
FSDataInputStream in = trunk.open();
if (offset != 0) {
skipFirstLine = true;
--offset;
in.seek(offset);
}
reader = new LineReader(trunk.open());
// skip first line and re-establish "offset".
if (skipFirstLine) {
offset += reader.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - offset));
}
this.pos = offset;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
}
@Override
public void close() throws IOException {
trunk.close();
}
@Override
public float getProgress() throws IOException {
if (offset == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - offset) / (float) (end - offset));
}
}
@Override
public boolean nextKeyValue() throws IOException {
initKey();
initValue();
//指定当前记录的读取起始位置
key.offset = pos;
int readed = 0;
//读取一条记录
if (pos < end) {
readed = reader.readLine(value);
pos += readed;
}
//如果没有读到任何记录,说明当前 trunk 已经没有更多记录了
if (readed == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
private void initKey() {
if (key == null) {
key = new CompressedRecordKey();
key.fileName = trunk.getFileName();
}
}
private void initValue() {
if (value == null) {
value = new Text();
}
}
@Override
public CompressedRecordKey getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
public static class CompressTrunk {
public CompressTrunk(Configuration configuration, Path compressFile) throws IOException {
this.configuration = configuration;
this.compressFile = compressFile;
this.fs = compressFile.getFileSystem(configuration);
this.factory = new CompressionCodecFactory(configuration);
this.codec = factory.getCodec(compressFile);
prepareReadableFile();
}
/*将trunk解压缩到一个临时目录,并提供inputStream供读取*/
protected void prepareReadableFile() throws IOException {
if (!isCompress()) {
readableFile = compressFile;
return;
}
String _readFile = CompressionCodecFactory.removeSuffix(compressFile.toString(),
codec.getDefaultExtension());
readableFile = new Path(_readFile);
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(compressFile));
out = fs.create(readableFile);
IOUtils.copyBytes(in, out, configuration);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
private Configuration configuration;
//源文件
private Path compressFile;
//解压后的文件
private Path readableFile;
private FileSystem fs;
private CompressionCodecFactory factory;
private CompressionCodec codec;
public boolean isCompress() {
return codec != null;
}
public String getFileName() {
return readableFile.getName();
}
public long getFileLength() throws IOException {
return fs.getFileStatus(readableFile).getLen();
}
private FSDataInputStream in;
public FSDataInputStream open() throws IOException {
if (in == null) {
in = fs.open(readableFile);
}
return in;
}
//处理完毕后删除临时文件
public void close() throws IOException {
if (in != null) {
in.close();
}
if (isCompress()) {
fs.delete(readableFile, false);
}
}
}
}
 

CompressedRecordKey 

package ctu.components.amada.hadoop.usertrace;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Created by pippo on 14/12/23.
*/
public class CompressedRecordKey implements WritableComparable {
//记录所在的文件
public String fileName;
//记录在文件中所在的位置
public long offset;
public CompressedRecordKey() {
super();
}
@Override
public void readFields(DataInput in) throws IOException {
this.offset = in.readLong();
this.fileName = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(offset);
out.writeUTF(fileName);
}
@Override
public int compareTo(Object o) {
CompressedRecordKey that = (CompressedRecordKey) o;
int f = this.fileName.compareTo(that.fileName);
if (f == 0) {
return (int) Math.signum((double) (this.offset - that.offset));
}
return f;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof CompressedRecordKey) {
return this.compareTo(obj) == 0;
}
return false;
}
@Override
public int hashCode() {
final int hashPrime = 47;
int hash = 13;
hash = hashPrime * hash + (this.fileName != null ? this.fileName.hashCode() : 0);
hash = hashPrime * hash + (int) (this.offset ^ (this.offset >>> 16));
return hash;
}
@Override
public String toString() {
return this.fileName + "-" + this.offset;
}
}
 

JOB 配置

private void buildMapper(Job job) {
job.setInputFormatClass(CompressedCombineFileInputFormat.class);
job.setMapperClass(LogMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(UserTrace.class);
job.setCombinerClass(LogCombiner.class);
}

验证结果 

DSC0001.png
 
DSC0002.png


  • 如图所示所有的输入被合并为三个map task处理
  • 共处理了3.7G的数据(File:Number of bytes read)/解压后37G(HDFS:Number of bytes read)

新的问题


  • CombineFileInputFormat没有split,导致只有三个map taks
  • 每个map task的输出文件过大,shuffle 消耗了1个多小时

问题定位
  通过查看 hadoop 源码发现,在使用CombineFileInputFormat时,如果没有显示设定CombineFileInputFormat.SPLIT_MAXSIZE,那么在一个 hadoop node 上只会有一个 split

DSC0003.png

问题解决
  将CombineFileInputFormat.SPLIT_MAXSIZE设置为和 hadoop 的 block size 一样大小
 
DSC0004.png
 

结果检验


  • 处理4.5g 的日志(解压后45g) 共耗时20分钟
  • 其中 map 处理1.4亿条记录耗时5分51秒
  • map 的 output 进行 lz4压缩,shuffle 的时间缩短到11分钟

DSC0005.png
 

DSC0006.png
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-313033-1-1.html 上篇帖子: ubutun下eclipse调试hadoop的WordCount示例 下篇帖子: Hadoop源代码分析(包mapreduce.lib.map)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表