zsy001 posted on 2019-1-30 09:34:21

Flume source code study 10

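  This post uses a configuration based on a custom header as its example (events are written to different top-level directories depending on the business type).
Configuration:
source: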

interceptors = i1
interceptors.i1.type = regex_extractor
interceptors.i1.regex = /apps/logs/(.*?)/
interceptors.i1.serializers = s1
interceptors.i1.serializers.s1.name = logtypename

sink:

hdfs.path = hdfs://xxxxxx/%{logtypename}/%Y%m%d/%H
hdfs.round = true
hdfs.roundValue = 30
hdfs.roundUnit = minute
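hdfs.filePrefix = xxxxx1-

  The source defines an interceptor of type regex_extractor, so the interceptor object is built from the class org.apache.flume.interceptor.RegexExtractorInterceptor. This interceptor extracts a string using a regular expression and, through its serializers, stores that string as the value of a header. The sink can then read the header value and act on it.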
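To make the extraction step concrete, here is a minimal sketch using only java.util.regex. It is a simplified stand-in, not Flume's RegexExtractorInterceptor code: the class name RegexHeaderSketch, the method extractHeaders and the sample body are made up for illustration, and it assumes the regex is matched against the event body as text.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified stand-in for what a regex_extractor interceptor does.
// Hypothetical class; this is NOT Flume's implementation.
class RegexHeaderSketch {
    // Same regex as interceptors.i1.regex in the configuration above.
    static final Pattern LOG_DIR = Pattern.compile("/apps/logs/(.*?)/");

    // Returns a header map with "logtypename" set to capture group 1
    // when the body matches; returns an empty map otherwise.
    static Map<String, String> extractHeaders(String body) {
        Map<String, String> headers = new HashMap<>();
        Matcher m = LOG_DIR.matcher(body);
        if (m.find()) {
            headers.put("logtypename", m.group(1));
        }
        return headers;
    }

    public static void main(String[] args) {
        // Hypothetical event body; the real body depends on the source.
        String body = "/apps/logs/nginx/access.log some log line";
        System.out.println(extractHeaders(body)); // prints {logtypename=nginx}
    }
}
```

Because the group is non-greedy, (.*?) stops at the first "/" after /apps/logs/, so only the first directory name ends up in the header.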
For example, in the process method of HDFSEventSink, the sink that writes to HDFS:

      // reconstruct the path name by substituting place holders
      String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);
      String realName = BucketPath.escapeString(fileName, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);

The parameters:
useLocalTime is the hdfs.useLocalTimeStamp setting; it defaults to false
filePath is the hdfs.path setting; it must not be empty
fileName is the hdfs.filePrefix setting; it defaults to FlumeData
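As a rough illustration of how a %{logtypename} placeholder in hdfs.path can be filled from the event headers, here is a sketch. It is not BucketPath.escapeString: the class HeaderPathSketch and the method substituteHeaders are hypothetical, only the %{name} form is handled (date escapes such as %Y are left untouched), and replacing a missing header with an empty string is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; a simplified stand-in, NOT Flume's BucketPath.
class HeaderPathSketch {
    // Matches %{name} placeholders only; %Y, %m, %d, %H are not touched.
    static final Pattern HEADER_TAG = Pattern.compile("%\\{([^}]+)\\}");

    static String substituteHeaders(String template, Map<String, String> headers) {
        Matcher m = HEADER_TAG.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // Assumption of this sketch: a missing header becomes "".
            String value = headers.getOrDefault(m.group(1), "");
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("logtypename", "nginx");
        // prints hdfs://xxxxxx/nginx/%Y%m%d/%H
        System.out.println(
            substituteHeaders("hdfs://xxxxxx/%{logtypename}/%Y%m%d/%H", headers));
    }
}
```

Matcher.quoteReplacement is used so that a header value containing "$" or "\" is inserted literally instead of being read as a group reference.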
The settings related to rounding (approximating the timestamp):

    needRounding = context.getBoolean("hdfs.round", false);
    // the hdfs.round setting, default false
    if (needRounding) {
      String unit = context.getString("hdfs.roundUnit", "second");
      // hdfs.roundUnit, default "second"
      if (unit.equalsIgnoreCase("hour")) {
        this.roundUnit = Calendar.HOUR_OF_DAY;
      } else if (unit.equalsIgnoreCase("minute")) {
        this.roundUnit = Calendar.MINUTE;
      } else if (unit.equalsIgnoreCase("second")) {
        this.roundUnit = Calendar.SECOND;
      } else {
        LOG.warn("Rounding unit is not valid, please set one of" +
            "minute, hour, or second. Rounding will be disabled");
        needRounding = false;
      }
      this.roundValue = context.getInteger("hdfs.roundValue", 1);
      // the hdfs.roundValue setting, default 1
      if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
        // the following checks whether roundValue is set to a reasonable value
        Preconditions.checkArgument(roundValue > 0 && roundValue ...
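To show what these rounding settings mean for a timestamp, here is a small sketch that rounds a time down to a multiple of roundValue in the chosen unit. It is written from the behaviour described by the settings, not copied from Flume; the class RoundDownSketch and the method roundDown are hypothetical names, and the UTC time zone is chosen only to keep the example deterministic.

```java
import java.util.Calendar;
import java.util.TimeZone;

// Hypothetical helper illustrating "round down"; NOT Flume's code.
class RoundDownSketch {
    // Rounds the given Calendar field down to a multiple of roundValue
    // and zeroes every finer-grained field.
    static long roundDown(long timestampMillis, int roundUnit,
                          int roundValue, TimeZone tz) {
        Calendar cal = Calendar.getInstance(tz);
        cal.setTimeInMillis(timestampMillis);
        int field = cal.get(roundUnit);
        cal.set(roundUnit, field - (field % roundValue));
        if (roundUnit == Calendar.HOUR_OF_DAY) {
            cal.set(Calendar.MINUTE, 0);
        }
        if (roundUnit == Calendar.HOUR_OF_DAY || roundUnit == Calendar.MINUTE) {
            cal.set(Calendar.SECOND, 0);
        }
        cal.set(Calendar.MILLISECOND, 0);
        return cal.getTimeInMillis();
    }

    public static void main(String[] args) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        Calendar cal = Calendar.getInstance(utc);
        cal.clear();
        cal.set(2019, Calendar.JANUARY, 30, 9, 34, 21);
        // roundUnit = minute, roundValue = 30, as in the sink config above
        long rounded = roundDown(cal.getTimeInMillis(), Calendar.MINUTE, 30, utc);
        cal.setTimeInMillis(rounded);
        // prints 9:30:0
        System.out.println(cal.get(Calendar.HOUR_OF_DAY) + ":"
            + cal.get(Calendar.MINUTE) + ":" + cal.get(Calendar.SECOND));
    }
}
```

Note that rounding minutes down never changes the hour, so with a path that ends in %H the 30-minute rounding would only become visible once a minute escape is added to the path.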