设为首页 收藏本站
查看: 2347|回复: 0

[经验分享] Flume NG 学习笔记(八)Interceptors(拦截器)测试

[复制链接]

尚未签到

发表于 2015-11-27 17:32:44 | 显示全部楼层 |阅读模式
  拦截器主要是对事件的header信息信息操作,要么直接忽略他,要么修改他的数据



一、Event Serializers
  file_roll sink 和hdfs sink 都支持EventSerializer接口

1.1、Body Text Serializer
  Body TextSerializer,别名:text。这个拦截器将把事件的body部分写入到输出流中而不需要任何转换或者修改。事件的header将直接被忽略。
  下面是官网配置:


Property Name


Default


Description



appendNewline


true


Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.

  下面是官网例子:appendNewline是选择是否加入到新行去。默认是true,而false 就是换行,一般我们都选择换行。

a1.sinks=k1

a1.sinks.k1.type=file_roll

a1.sinks.k1.channel=c1

a1.sinks.k1.sink.directory=/var/log/flume

a1.sinks.k1.sink.serializer=text

a1.sinks.k1.sink.serializer.appendNewline=false

  下面是实际例子
  因为是考虑Body TextSerializer的特性,他会忽略header的信息,因此我们这边要采用http source来接收定义的header 与body 的内容
  #配置文件:body_case15.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = http
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /tmp/logs
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline =false
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


  #敲命令
  flume-ng agent -c conf -fconf/body_case15.conf -n a1 -Dflume.root.logger=INFO,console
  启动成功后
  
  打开另一个终端输入,往侦听端口送数据
  curl -X POST -d '[{"headers":{"looklook1" : "looklook1 isheader","looklook2": "looklook2 isheader"},"body" : "hellolooklook5"}]'http://192.168.233.128:50000
  #在启动源发送的代理终端查看console输出
   DSC0000.jpg


  
  
  数据已经输出,但只输出了hello looklook5,即BODY这块。

1.2、Avro Event Serializer
  Avro Event Serializer别名:avro_event。这个拦截器将把事件序列化到一个Avro容器文件中。使用的模式和RPC Avro机制使用到的处理flume事件的机制一样。这个序列化器继承自AbstractAvroEventSerializer类。
  官网例子


Property Name


Default


Description



syncIntervalBytes


2048000


Avro sync interval, in approximate bytes.



compressionCodec


null


Avro compression codec. For supported codecs, see Avro’s CodecFactory docs.

  下面是官网例子

a1.sinks.k1.type=hdfs

a1.sinks.k1.channel=c1

a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%d/%H%M/%S

a1.sinks.k1.serializer=avro_event

a1.sinks.k1.serializer.compressionCodec=snappy

  例子这边就不演示了,因为和BodyText Serializer 差距不大。


二、Timestamp Interceptor
  官网说Flume 可以在事件传输过程中对它进行修改与删除,而这个都是通过Interceptor进行实现的,实际都是往事件的header里插数据。而Timestamp Interceptor拦截器就是可以eventheader中插入关键词为timestamp的时间戳
  下面是官网配置


Property Name


Default


Description



type




The component type name, has to be timestamp or the FQCN



preserveExisting


false


If the timestamp already exists, should it be preserved - true or false

  以及官网例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.channels= c1

a1.sources.r1.type=seq

a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=timestamp


下面是测试例子
  #配置文件:timestamp_case16.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.preserveExisting= false
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path =hdfs://carl:9000/flume/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = looklook5.
a1.sinks.k1.hdfs.fileType=DataStream
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
这里拿header作为文件夹目录名称。
#敲命令
flume-ng agent -c conf -f conf/timestamp_case16.conf-n a1 -Dflume.root.logger=INFO,console
启动成功后

打开另一个终端输入,往侦听端口送数据
echo "TimestampInterceptor" | nc 192.168.233.128 50000
#在启动源发送的代理终端查看console输出
  查看hdfs生成的文件,可以看到timestamp已经生成在header里面,可以根据自定义的格式生成文件夹,数据也都传输过来了。

DSC0001.jpg
  

三、Host Interceptor
  
  该拦截器可以往event的header中插入关键词默认为host主机名或者ip地址(注意是agent运行的机器的主机名或者ip地址)
  下面是官网配置


Property Name


Default


Description



type




The component type name, has to be host



preserveExisting


false


If the host header already exists, should it be preserved - true or false



useIP


true


Use the IP Address if true, else use hostname.



hostHeader


host


The header key to be used.

  以及官网例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=host

a1.sources.r1.interceptors.i1.hostHeader=hostname


下面是测试例子
  #配置文件:time_host_case17.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.preserveExisting= false
a1.sources.r1.interceptors.i1.type =timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader =hostname
a1.sources.r1.interceptors.i2.useIP = false
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path =hdfs://carl:9000/flume/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = %{hostname}
a1.sinks.k1.hdfs.fileType=DataStream
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
增加一个拦截器,类型是host,h将hostname作为文件前缀。
#敲命令
flume-ng agent -c conf -f conf/time_host_case17.conf-n a1 -Dflume.root.logger=INFO,console
启动成功后

打开另一个终端输入,往侦听端口送数据
echo "Time&hostInterceptor1" | nc 192.168.233.128 50000
echo "Time&hostInterceptor2" | nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出
  查看hdfs生成的文件,可以看到host已经生成在header里面,可以根据自定义的格式生成文件夹,数据也都传输过来了。

DSC0002.jpg
  

四、Static Interceptor
  
  Static Interceptor拦截器允许用户增加一个static的header并为所有的事件赋值。范围是所有事件。
  
官网配置


Property Name


Default


Description



type




The component type name, has to be static



preserveExisting


true


If configured header already exists, should it be preserved - true or false



key


key


Name of header that should be created



value


value


Static value that should be created

  其中参数key与value等于类似json格式里的"headers":{" key":" value"}
  下面是官网例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.channels= c1

a1.sources.r1.type=seq

a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=static

a1.sources.r1.interceptors.i1.key=datacenter

a1.sources.r1.interceptors.i1.value=NEW_YORK

  以及实际的列子
  #配置文件:static_case18.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = looklook5
a1.sources.r1.interceptors.i1.value =looklook10
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#敲命令
flume-ng agent -c conf -f conf/static_case18.conf-n a1 -Dflume.root.logger=INFO,console
启动成功后

打开另一个终端输入,往侦听端口送数据
echo "statInterceptor1" | nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出
  可以看到输出的header信息里自定义部分正确输出,body部分也输出正确。

DSC0003.jpg
  

五、Regex FilteringInterceptor
  
  Regex Filtering Interceptor拦截器用于过滤事件,筛选出与配置的正则表达式相匹配的事件。可以用于包含事件和排除事件。常用于数据清洗,通过正则表达式把数据过滤出来。
  官网配置


Property Name


Default


Description



type




The component type name has to be regex_filter



regex


”.*”


Regular expression for matching against events



excludeEvents


false


If true, regex determines events to exclude, otherwise regex determines events to include.

  excludeEvents 为true的时候为排除所有匹配正则表达式的数据。
  下面是测试例子
  #配置文件:regex_filter_case19.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =regex_filter
a1.sources.r1.interceptors.i1.regex =^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents =true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  我们对开头字母是数字的数据,全部过滤。
#敲命令
flume-ng agent -c conf -f conf/regex_filter_case19.conf-n a1 -Dflume.root.logger=INFO,console
启动成功后

打开另一个终端输入,往侦听端口送数据
echo "a" | nc192.168.233.128 50000
echo "1222" |nc 192.168.233.128 50000
echo "a222" |nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出

DSC0004.jpg
  
  可以看出1222 被认为是无效的数据没有发出来。
  Regex Filtering Interceptor测试成功。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144282-1-1.html 上篇帖子: Flume学习04 — Sink 下篇帖子: 详细图解 Flume介绍、安装配置
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表