设为首页 收藏本站
查看: 1469|回复: 0

[经验分享] 修改Flume-NG的hdfs sink解析时间戳源码大幅提高写入性能(转)

[复制链接]

尚未签到

发表于 2015-9-17 08:28:55 | 显示全部楼层 |阅读模式
  原文地址:http://www.cnblogs.com/lxf20061900/p/4014281.html
  


  Flume-NG中的hdfs sink的路径名(对应参数"hdfs.path",不允许为空)以及文件前缀(对应参数"hdfs.filePrefix")支持正则解析时间戳自动按时间创建目录及文件前缀。
  在实际使用中发现Flume内置的基于正则的解析方式非常耗时,有非常大的提升空间。如果你不需要配置按时间戳解析时间,那这篇文章对你用处不大,hdfs sink对应的解析时间戳的代码位于org.apache.flume.sink.hdfs.HDFSEventSink的process()方法中,涉及两句代码:  



1 // reconstruct the path name by substituting place holders
2         String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
3             timeZone, needRounding, roundUnit, roundValue, useLocalTime);
4         String realName = BucketPath.escapeString(fileName, event.getHeaders(),
5           timeZone, needRounding, roundUnit, roundValue, useLocalTime);
  其中,realPath是正则解析时间戳之后的完整路径名,filePath参数就是配置文件中的"hdfs.path";realName就是正则解析时间戳之后的文件名前缀,fileName参数就是配置文件中的"hdfs.filePrefix"。其他参数都相同,event.getHeaders()是一个Map里面有时间戳(可以通过interceptor、自定义、使用hdfs sink的useLocalTimeStamp参数三种方式来设置),其他参数是时区、是否四舍五入以及时间单位等。
  BucketPath.escapeString这个方法就是正则解析时间戳所在,具体代码我们不再分析,现在我们编写一个程序测试一下BucketPath.escapeString这个方法的性能,运行这个测试类要么在源码中:




DSC0000.gif
public class Test {public static void main(String[] args) {
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("timestamp", Long.toString(System.currentTimeMillis()));
String filePath = "hdfs://xxxx.com:8020/data/flume/%Y-%m-%d";
String fileName = "%H-%M";
long start = System.currentTimeMillis();
System.out.println("start time is:" + start);
for (int i = 0; i < 2400000; i++) {
        String realPath = BucketPath.escapeString(filePath, headers, null, false, Calendar.SECOND, 1, false);
        String realName = BucketPath.escapeString(fileName, headers, null, false, Calendar.SECOND, 1, false);
        }
     long end = System.currentTimeMillis();
     System.out.println("end time is:"+ end + ".\nTotal time is:" + (end - start) + " ms.");
   }
}

  这个方法后面5个参数我们一般不需要用到,因此这里其实都设置成在实际中没有影响的数值了。headers参数要有“timestamp”参数,我们这里循环处理240W个event,看看运行结果:



start time is:1412853253889
end time is:1412853278210.
Total time is:24321 ms.
  我靠,居然花了24s还多,尼玛要知道哥目前白天的数据量也就是每秒4W个event,这还不是峰值呢。。。加上解析时间戳全量就扛不住了,怎么办??
  能怎么办啊?只能想办法替换这个解析办法了,于是,我就想到这样了,看测试程序:





public class Test {
private static SimpleDateFormat sdfYMD = null;
private static SimpleDateFormat sdfHM = null;
public static void main(String[] args) {
sdfYMD = new SimpleDateFormat("yyyy-MM-dd");
sdfHM = new SimpleDateFormat("HH-mm");
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("timestamp", Long.toString(System.currentTimeMillis()));
String filePath = "hdfs://dm056.tj.momo.com:8020/data/flume/%Y-%m-%d";
String fileName = "%H-%M";
long start = System.currentTimeMillis();
System.out.println("start time is:" + start);
for (int i = 0; i < 2400000; i++) {
//String realPath = BucketPath.escapeString(filePath, headers, null, false, Calendar.SECOND, 1, false);
//String realName = BucketPath.escapeString(fileName, headers, null, false, Calendar.SECOND, 1, false);
String realPath = getTime("yyyy-MM-dd",Long.parseLong(headers.get("timestamp")));
String realName = getTime("HH-mm",Long.parseLong(headers.get("timestamp")));
}
long end = System.currentTimeMillis();
System.out.println("end time is:"+ end + ".\nTotal time is:" + (end - start) + " ms.");
}
public static String getTime(String format,long timestamp) {
String time="";
if(format.equals("HH-mm"))
time=sdfHM.format(timestamp);
else if(format.equals("yyyy-MM-dd"))
time=sdfYMD.format(timestamp);        
return time;
}
}

  我们使用java自己的SimpleDateFormat来完成按指定格式的解析,这样就不能将整个path或者name传进去了,看看运行结果:



start time is:1412853670246
end time is:1412853672204.
Total time is:1958 ms.
  尼玛!!!不是吧,不到2s。。。我这是在我的MBP上测试的,i5+8G+128G SSD,骚年你还犹豫什么呢?
  来开始改动源码吧。。。
  我们最好把解析格式做成可配置的,并且最好还保留原来的可以加前缀名的方式,因为有可能需要加入主机名啊什么的,但是可以把这个前缀作为中缀,解析时间戳的结果作为前缀。。。
  1、我们需要两个SimpleDateFormat来分别实现对path和name的格式化,并在配置时完成实例化,这样可以创建一次对象就Ok,还需要path和name的格式化串,这个可以做成全局的或者局部的,我们这是全局的(其实没有必要,是不是?哈哈),变量声明阶段代码:



   private SimpleDateFormat sdfPath = null;        //for file in hdfs path
private SimpleDateFormat sdfName = null;        //for file name prefix
private String filePathFormat;
private String fileNameFormat;
  2、configure(Context context)方法中需要对上述对象进行配置了,很简单,很明显,相关代码如下:





1      filePath = Preconditions.checkNotNull(
2                 context.getString("hdfs.path"), "hdfs.path is required");
3         filePathFormat =  context.getString("hdfs.path.format", "yyyy/MM/dd");        //time's format ps:"yyyy-MM-dd"
4         sdfPath = new SimpleDateFormat(filePathFormat);
5         fileName = context.getString("hdfs.filePrefix", defaultFileName);
6         fileNameFormat = context.getString("hdfs.filePrefix.format", "HHmm");
7         sdfName = new SimpleDateFormat(fileNameFormat);

  增加的是上面的3、4、6、7四行代码,解析格式串是在"hdfs.path.format"和"hdfs.filePrefix.format"中进行配置,其它的地方不要存在时间戳格式串了,也不要出现原来内置的那些%H、%mm等等格式了。上面两个format配置有默认格式串,自己做决定就好。
  3、增加解析时间戳方法:





1     public String getTime(String type,long timestamp) {
2         String time="";
3         if(type.equals("name"))
4             time=sdfName.format(timestamp);
5         else if(type.equals("path"))
6             time=sdfPath.format(timestamp);
7         return time;
8     }

  参数type用来指定是文件名的还是路径名的,用来调用相应地格式化对象。
  4、下面是重点了,上面几步即使配置了,不在这修改也不会起任何作用,修改process()方法,用以下代码替换最上面提到的两行代码:





1                 String realPath = filePath;
2                 String realName = fileName;
3                 if(realName.equals("%host") && event.getHeaders().get("host") != null)
4                     realName = event.getHeaders().get("host").toString();
5                 if(event.getHeaders().get("timestamp") != null){
6                     long time = Long.parseLong(event.getHeaders().get("timestamp"));
7                     realPath += DIRECTORY_DELIMITER + getTime("path",time);
8                     realName = getTime("name",time) + "." + realName;
9                 }        

  这几行的逻辑其实有:A、可以自定义中缀("hdfs.filePrefix",可以是常量或者是"%host",后者用来获取主机名,前提是要设置hostinterceptor);B、默认中缀就是默认的"FlumeData";C、如果headers中存在时间戳,调用getTime方法解析时间戳。
  5、编译&打包&替换&运行。。。
  哥打包比较原始,因为只修改了一个类,就把编译后的class文件以HDFSEventSink开头的几个class文件替换了原来flume-hdfs-sink的jar包中的对应的class文件。。。尼玛,原始吧。。。会maven,直接上maven吧。。。
  
  我这边的测试结果是如果没有配置压缩功能,性能提升超过70%,如果配置上压缩功能(gzip)性能提升超过50%,这数值仅供参考,不同环境不同主机不同人品可能不尽相同。。
  期待大伙的测试结果。。。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114642-1-1.html 上篇帖子: 手动编译Flume 下篇帖子: 【Java】【Flume】Flume-NG阅读源代码AvroSink
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表