设为首页 收藏本站
查看: 235|回复: 0

[经验分享] RegexExtractorInterceptor实现源码分析

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-3-12 08:07:28 | 显示全部楼层 |阅读模式
RegexExtractorInterceptor作为一个Interceptor实现类可以根据一个正则表达式匹配event body来提取字符串,并使用serializers把字符串作为header的值

实例:
以如下的命令使用execsource收集日志的时候,可以根据文件的名称设置不同的header,进行不同的操作

1
2
3
4
#!/bin/sh
filename=$1
hostname=`hostname -s`
tail -F $1 | awk -v filename=$filename -v hostname=$hostname '{print filename":"hostname":"$0}'



source的配置:

1
2
3
4
5
6
7
8
xxxx.sources.kafka1.interceptors = i1
xxxx.sources.kafka1.interceptors.i1.type = regex_extractor
xxxx.sources.kafka1.interceptors.i1.regex = /apps/logs/(.*?)/
xxxx.sources.kafka1.interceptors.i1.serializers = s1
xxxx.sources.kafka1.interceptors.i1.serializers.s1.name = logtypename
xxxx.sources.kafka1.selector.type = multiplexing
xxxx.sources.kafka1.selector.header = logtypename
xxxx.sources.kafka1.selector.mapping.nginx = nginx-channel




几个参数项:
regex 正则表达式

1
2
3
4
5
6
serializers  定义匹配组(正则匹配之后的值作为header的值,比如如果
Event body为1:2:3.4foobar5,regex为(\\d):(\\d):(\\d),serializers
设置为a b c,serializers.a.name 为one,serializers.b.name为two,serializers.c.name
为three,那么one->1,two->2,three->3.4foobar5,注意可以不必匹配所有的组)

serializers.x.name 作为event的header




首先看内部类Builder:
1)configureSerializers方法用来生成配置项,主要是操作List<NameAndSerializer>,静态内部类NameAndSerializer是一个包含了headerName和RegexExtractorInterceptorSerializer属性的容器,这里每一个serializers.x.name的配置对应一个RegexExtractorInterceptorSerializer对象,RegexExtractorInterceptorSerializer默认是org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer,即对参数不做任何处理直接返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private List<NameAndSerializer> serializerList;
private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
....
     private void configureSerializers(Context context) {
      String serializerListStr = context.getString( SERIALIZERS ); //解析serializers的配置
      Preconditions. checkArgument(!StringUtils. isEmpty(serializerListStr),
          "Must supply at least one name and serializer" );
      String[] serializerNames = serializerListStr.split( "\\s+" ); //按空格分隔
      Context serializerContexts =
          new Context(context.getSubProperties( SERIALIZERS + "."));
      serializerList = Lists. newArrayListWithCapacity(serializerNames.length);
      for(String serializerName : serializerNames) { //对每一个serializers里面的设置进行操作
        Context serializerContext = new Context(
            serializerContexts.getSubProperties(serializerName + "." ));
        String type = serializerContext.getString( "type" , "DEFAULT" ); //获取serializers.x.type的设置,默认值是DEFAULT,即org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
        String name = serializerContext.getString( "name" ); ////获取serializers.x.name的设置
        Preconditions. checkArgument(!StringUtils. isEmpty(name),
            "Supplied name cannot be empty." );
        if ("DEFAULT" .equals(type)) {
          serializerList .add(new NameAndSerializer(name, defaultSerializer)); //生成NameAndSerializer对象,并加入到List<NameAndSerializer>中,静态内部类NameAndSerializer是一个包含了headerName和RegexExtractorInterceptorSerializer属性的容器,这里每一个serializers.x.name的配置对应一个RegexExtractorInterceptorSerializer对象
        } else {
          serializerList .add(new NameAndSerializer(name, getCustomSerializer(
              type, serializerContext))); //getCustomSerializer用于根据type的设置返回RegexExtractorInterceptorSerializer对象
        }
      }
    }



这里org.apache.flume.interceptor.RegexExtractorInterceptorSerializer 接口类,定义了一个抽象方法serialize,实现类包括:

1
2
3
4
org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
//直接返回,不做另外的操作(默认的类)
org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
//使用指定的formatting pattern把传入的值转换为milliseconds



  
2)build方法用于返回一个RegexExtractorInterceptor对象

1
return new RegexExtractorInterceptor( regex , serializerList );



RegexExtractorInterceptor的主要方法intercept:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final String REGEX = "regex" ;
  static final String SERIALIZERS = "serializers" ;
...
  public Event intercept(Event event) {
    Matcher matcher = regex.matcher(
        new String(event.getBody(), Charsets.UTF_8)); //对Event的body进行matcher操作
    Map<String, String> headers = event.getHeaders(); // 获取Event的header键值对
    if (matcher.find()) { //检测字符串中的子字符串是否可以匹配到正则
      for ( int group = 0, count = matcher.groupCount(); group < count; group++) {
        int groupIndex = group + 1; // 匹配的index从1开始
        if (groupIndex > serializers .size()) { //判断index是否大于serializers列表(configure产生的List<NameAndSerializer>)的长度
....
          break;
        }
        NameAndSerializer serializer = serializers.get(group); //从serializers中获取对应的NameAndSerializer 对象
....
        headers.put(serializer. headerName,
            serializer. serializer.serialize(matcher.group(groupIndex))); // 向Event中插入headerName和对应的value,这里headerName即为serializers.x.name的设置,value会通过RegexExtractorInterceptorSerializer进行处理
      }
    }
    return event;
  }



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-45789-1-1.html 上篇帖子: 为GRUB引导菜单设置密码 下篇帖子: Centos7下进入单用户模式
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表