设为首页 收藏本站
查看: 1417|回复: 0

[经验分享] Flume NG 学习笔记(四)Source配置

[复制链接]

尚未签到

发表于 2019-1-30 09:09:16 | 显示全部楼层 |阅读模式
首先、这节水的东西就比较少了,大部分是例子。
一、Avro Source与Thrift Source
Avro端口监听并接收来自外部的Avro客户流的事件。当内置Avro 去Sinks另一个配对Flume代理,它就可以创建分层采集的拓扑结构。官网说的比较绕,当然我的翻译也很弱,其实就是flume可以多级代理,然后代理与代理之间用Avro去连接
下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了。
Property Name

Default

Description

channels



type


The component type name, needs to be avro

bind


hostname or IP address to listen on

port


Port # to bind to

threads


Maximum number of worker threads to spawn

selector.type



selector.*



interceptors


Space-separated list of interceptors

interceptors.*



compression-type

none

This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource

ssl

FALSE

Set this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”.

keystore


This is the path to a Java keystore file. Required for SSL.

keystore-password


The password for the Java keystore. Required for SSL.

keystore-type

JKS

The type of the Java keystore. This can be “JKS” or “PKCS12”.

ipFilter

FALSE

Set this to true to enable ipFiltering for netty

ipFilter.rules


Define N netty ipFilter pattern rules with this config.


官网的例子就不放了,这边用实际例子显示。


  [html] view plain copy

  •   #配置文件avro_case2.conf 其实和第二节的pull.conf 一模一样  
  •   #Name the components on this agent  
  •   a1.sources= r1  
  •   a1.sinks= k1  
  •   a1.channels= c1  
  •      
  •   #Describe/configure the source  
  •   a1.sources.r1.type = avro  
  •   a1.sources.r1.channels = c1  
  •   a1.sources.r1.bind = 192.168.233.128  
  •   a1.sources.r1.port = 55555  
  •      
  •   #Describe the sink  
  •   a1.sinks.k1.type= logger  
  •   a1.sinks.k1.channel= c1  
  •      
  •   #Use a channel which buffers events in memory  
  •   a1.channels.c1.type= memory  
  •   a1.channels.c1.capacity= 1000  
  •   a1.channels.c1.transactionCapacity= 100  
  



#敲命令
flume-ng agent -cconf -f conf/avro_case2.conf -n a1 -Dflume.root.logger=INFO,console
成功与否就不说明,与第二节的pull.conf 同。。。

#然后在另一个终端进行测试
flume-ng avro-client -cconf -H 192.168.233.128 -p 44444 -F /tmp/logs/test.log
这个就是模拟第二节push代理费pull代理发数据,这里不写配置直接命令方式测试。




发送事件成功,这里和push代理不一样的是没有用spool,所以日志文件名不会被改名称。
看接受终端显示




ok数据发送成功。
ThriftSource 与Avro Source 基本一致。只要把source的类型改成thrift即可,例如a1.sources.r1.type = thrift
比较简单,不做赘述。
二、Exec Source
ExecSource的配置就是设定一个Unix(Linux)命令,然后通过这个命令不断输出数据。如果进程退出,Exec Source也一起退出,不会产生进一步的数据。
下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了。
Property Name

Default

Description

channels



type


The component type name, needs to be exec

command


The command to execute

shell


A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.

restartThrottle

10000

Amount of time (in millis) to wait before attempting a restart

restart

FALSE

Whether the executed cmd should be restarted if it dies

logStdErr

FALSE

Whether the command’s stderr should be logged

batchSize

20

The max number of lines to read and send to the channel at a time

selector.type

replicating

replicating or multiplexing

selector.*


Depends on the selector.type value

interceptors


Space-separated list of interceptors

interceptors.*




下面是实际例子显示。


  [html] view plain copy

  •   #配置文件exec_case3.conf  
  •   #Name the components on this agent  
  •   a1.sources= r1  
  •   a1.sinks= k1  
  •   a1.channels= c1  
  •      
  •   #Describe/configure the source  
  •   a1.sources.r1.type = exec  
  •   a1.sources.r1.command = tail -F /tmp/logs/test.log  
  •   a1.sources.r1.channels = c1  
  •      
  •   #Describe the sink  
  •   a1.sinks.k1.type= logger  
  •   a1.sinks.k1.channel= c1  
  •      
  •   #Use a channel which buffers events in memory  
  •   a1.channels.c1.type= memory  
  •   a1.channels.c1.capacity= 1000  
  •   a1.channels.c1.transactionCapacity= 100  
  



这里我们用tail –F命令去一直都日志的尾部。
#敲命令
flume-ng agent -cconf -f conf/exec_case3.conf -n a1 -Dflume.root.logger=INFO,console
这边会显示读取日志的所有数据








上图是日志,这边我们继续往日志里添加数据
echo"looklook5" >>  test.log ,会发现终端也在输出数据。

三、JMS Source
官网说:JMS Sourcereads messages from a JMS destination such as a queue or topic. Being a JMSapplication it should work with any JMS provider but has only been tested withActiveMQ.
简单说的,官网JMSsource 就测试了ActiveMQ,其他的还没有。下面是官网的例子:
a1.sources=r1

a1.channels=c1

a1.sources.r1.type=jms

a1.sources.r1.channels=c1

a1.sources.r1.initialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory

a1.sources.r1.connectionFactory=GenericConnectionFactory

a1.sources.r1.providerURL=tcp://mqserver:61616

a1.sources.r1.destinationName=BUSINESS_DATA

a1.sources.r1.destinationType=QUEUE

下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了
Property Name

Default

Description

channels



type


The component type name, needs to be jms

initialContextFactory


Inital Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory

connectionFactory


The JNDI name the connection factory shoulld appear as

providerURL


The JMS provider URL

destinationName


Destination name

destinationType


Destination type (queue or topic)

messageSelector


Message selector to use when creating the consumer

userName


Username for the destination/provider

passwordFile


File containing the password for the destination/provider

batchSize

100

Number of messages to consume in one batch

converter.type

DEFAULT

Class to use to convert messages to flume events. See below.

converter.*


Converter properties.

converter.charset

UTF-8

Default converter only. Charset to use when converting JMS TextMessages to byte arrays.


介于这个源目前还不成熟,那我们等他成熟了再来研究吧,这里偷点懒。

四、Spooling Directory Source
Spooling Directory Source在第二节的时候已经讲过,这里复述一下:监测配置的目录下新增的文件,并将文件中的数据读取出来。其中,Spool Source有2个注意地方,第一个是拷贝到spool目录下的文件不可以再打开编辑,第二个是spool目录下不可包含相应的子目录。这个主要用途作为对日志的准实时监控。
下面是官网给出的source的配置,加粗的参数是必选。可选项太多,这边就加一个fileSuffix,即文件读取后添加的后缀名,这个是可以更改。
Property Name

Default

Description

channels



type


The component type name, needs to be spooldir.

spoolDir


The directory from which to read files from.

fileSuffix

.COMPLETED

Suffix to append to completely ingested files

下面给出例子,这个与第二节的push.conf 相似


  [html] view plain copy

  •   #配置文件:spool_case4.conf  
  •   # Name the components on this agent  
  •   a1.sources = r1  
  •   a1.sinks = k1  
  •   a1.channels = c1  
  •      
  •   # Describe/configure the source  
  •   a1.sources.r1.type =spooldir  
  •   a1.sources.r1.spoolDir =/tmp/logs  
  •   a1.sources.r1.fileHeader= true  
  •   a1.sources.r1.channels =c1  
  •      
  •   # Describe the sink  
  •   a1.sinks.k1.type = logger  
  •    a1.sinks.k1.channel = c1  
  •      
  •   # Use a channel which buffers events inmemory  
  •   a1.channels.c1.type = memory  
  •   a1.channels.c1.capacity = 1000  
  •   a1.channels.c1.transactionCapacity = 100  
  



这里我们监控日志目录/tmp/logs
#敲命令
flume-ng agent -cconf -f conf/spool_case4.conf -n a1 -Dflume.root.logger=INFO,console




终端将数据都显示出来了。我们查看监控日志目录/tmp/logs




被读取的文件已经被加上后缀名,表示已经完成读取。

五、NetCat Source
Netcat source 在某一端口上进行侦听,它将每一行文字变成一个事件源,也就是数据是基于换行符分隔。它的工作就像命令nc -k -l [host] [port] 换句话说,它打开一个指定端口,侦听数据将每一行文字变成Flume事件,并通过连接通道发送。
下面是官网给出的source的配置,加粗的参数是必选
Property Name

Default

Description

channels



type


The component type name, needs to be netcat

bind


Host name or IP address to bind to

port


Port # to bind to

max-line-length

512

Max line length per event body (in bytes)

ack-every-event

TRUE

Respond with an “OK” for every event received

selector.type

replicating

replicating or multiplexing

selector.*


Depends on the selector.type value

interceptors


Space-separated list of interceptors

interceptors.*



实际例子话,第二节的第一个例子就是Netcat source,这里不演示了。

六、Sequence Generator Source
一个简单的序列发生器,不断产成与事件计数器0和1的增量开始。主要用于测试(官网说),这里也不做赘述。

七、Syslog Sources
读取syslog数据,并生成Flume 事件。 这个Source分成三类SyslogTCP Source、
Multiport Syslog TCP Source(多端口)与SyslogUDP Source。其中TCP Source为每一个用回车(\ n)来分隔的字符串创建一个新的事件。而UDP Source将整个消息作为一个单一的事件。

7.1、Syslog TCPSource
这个是最初的Syslog Sources
下面是官网给出的source的配置,加粗的参数是必选,这里可选我省略了。
Property Name

Default

Description

channels



type


The component type name, needs to be syslogtcp

host


Host name or IP address to bind to

port


Port # to bind to

官网案例
a1.sources=r1

a1.channels=c1

a1.sources.r1.type=syslogtcp

a1.sources.r1.port=5140

a1.sources.r1.host=localhost

a1.sources.r1.channels=c1

下面是实际的例子


  [html] view plain copy

  •   #配置文件:syslog_case5.conf  
  •   # Name the components on this agent  
  •   a1.sources = r1  
  •   a1.sinks = k1  
  •   a1.channels = c1  
  •      
  •   # Describe/configure the source  
  •   a1.sources.r1.type =syslogtcp  
  •   a1.sources.r1.port =50000  
  •   a1.sources.r1.host =192.168.233.128  
  •   a1.sources.r1.channels =c1  
  •      
  •   # Describe the sink  
  •   a1.sinks.k1.type = logger  
  •    a1.sinks.k1.channel = c1  
  •      
  •   # Use a channel which buffers events inmemory  
  •   a1.channels.c1.type = memory  
  •   a1.channels.c1.capacity = 1000  
  •   a1.channels.c1.transactionCapacity = 100  
  




这里我们设置的侦听端口为192.168.233.128 50000
#敲命令
flume-ng agent -cconf -f conf/syslog_case5.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo "hellolooklook5" | nc 192.168.233.128 50000
然后看之前的终端,将会有如下显示:




数据已经发送过来了。

7.2 Multiport Syslog TCP Source
这是一个更新,更快,支持多端口版本的SyslogTCP Source。他不仅仅监控一个端口,还可以监控多个端口。官网配置基本差不多,就是可选配置比较多
Property Name

Default

Description

channels



type


The component type name, needs to be multiport_syslogtcp

host


Host name or IP address to bind to.

ports


Space-separated list (one or more) of ports to bind to.

portHeader


If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port.

这里说明下需要注意的是这里ports设置已经取代tcp 的port,这个千万注意。还有portHeader这个可以与后面的interceptors 与 channel selectors自定义逻辑路由使用。
下面是官网例子:
a1.sources=r1

a1.channels=c1

a1.sources.r1.type=multiport_syslogtcp

a1.sources.r1.channels=c1

a1.sources.r1.host=0.0.0.0

a1.sources.r1.ports=10001 10002 10003

a1.sources.r1.portHeader=port

下面是实际例子


  [html] view plain copy

  •   #配置文件:syslog_case6.conf  
  •   # Name thecomponents on this agent  
  •   a1.sources = r1  
  •   a1.sinks = k1  
  •   a1.channels = c1  
  •      
  •   #Describe/configure the source  
  •   a1.sources.r1.type = multiport_syslogtcp  
  •   a1.sources.r1.ports = 50000 60000  
  •   a1.sources.r1.host = 192.168.233.128  
  •   a1.sources.r1.channels = c1  
  •      
  •   # Describe thesink  
  •   a1.sinks.k1.type= logger  
  •    a1.sinks.k1.channel = c1  
  •      
  •   # Use a channelwhich buffers events in memory  
  •   a1.channels.c1.type= memory  
  •   a1.channels.c1.capacity= 1000  
  •   a1.channels.c1.transactionCapacity= 100  
  




这里我们侦探192.168.233.128的2个端口50000与60000
#敲命令
flume-ng agent -cconf -f conf/syslog_case6.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo "hellolooklook5" | nc 192.168.233.128 50000
echo "hello looklook6"| nc 192.168.233.128 60000
  然后看之前的终端,将会有如下显示:





2个端口的数据已经发送过来了。

7.3 Syslog UDP Source
关于这个官网都懒的介绍了,其实就是与TCP不同的协议而已。
官网配置与TCP一致,就不说了。下面是官网例子
a1.sources=r1

a1.channels=c1

a1.sources.r1.type=syslogudp

a1.sources.r1.port=5140

a1.sources.r1.host=localhost

a1.sources.r1.channels=c1

下面是实际例子


  [html] view plain copy

  •   #配置文件:syslog_case7.conf  
  •   # Name thecomponents on this agent  
  •   a1.sources = r1  
  •   a1.sinks = k1  
  •   a1.channels = c1  
  •      
  •   #Describe/configure the source  
  •   a1.sources.r1.type = syslogudp  
  •   a1.sources.r1.port = 50000  
  •   a1.sources.r1.host = 192.168.233.128  
  •   a1.sources.r1.channels = c1  
  •      
  •   # Describe thesink  
  •   a1.sinks.k1.type= logger  
  •    a1.sinks.k1.channel = c1  
  •      
  •   # Use a channelwhich buffers events in memory  
  •   a1.channels.c1.type= memory  
  •   a1.channels.c1.capacity= 1000  
  •   a1.channels.c1.transactionCapacity= 100  
  



#敲命令
flume-ng agent -cconf -f conf/syslog_case7.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo "hellolooklook5" | nc –u 192.168.233.128 50000
#在启动的终端查看console输出




Ok,数据已经发送过来了


八、HTTP Source
HTTP Source是HTTP POST和GET来发送事件数据的,官网说GET应只用于实验。Flume 事件使用一个可插拔的“handler”程序来实现转换,它必须实现的HTTPSourceHandler接口。此处理程序需要一个HttpServletRequest和返回一个flume 事件列表。
所有在一个POST请求发送的事件被认为是在一个事务里,一个批量插入flume 通道的行为。
下面是官网给出的source的配置,加粗的参数是必选
Property Name

Default

Description

type


The component type name, needs to be http

port


The port the source should bind to.

bind

0.0.0.0

The hostname or IP address to listen on

handler

org.apache.flume.source.http.JSONHandler

The FQCN of the handler class.

官网例子
a1.sources=r1

a1.channels=c1

a1.sources.r1.type=http

a1.sources.r1.port=5140

a1.sources.r1.channels=c1

a1.sources.r1.handler=org.example.rest.RestHandler

a1.sources.r1.handler.nickname=random props

下面是实际用例:




  [html] view plain copy

  •   #配置文件:http_case8.conf  
  •   #Name the components on this agent  
  •   a1.sources= r1  
  •   a1.sinks= k1  
  •   a1.channels= c1  
  •      
  •   #Describe/configure the source  
  •   a1.sources.r1.type= http  
  •   a1.sources.r1.port= 50000  
  •   a1.sources.r1.channels= c1  
  •      
  •   #Describe the sink  
  •   a1.sinks.k1.type= logger  
  •    a1.sinks.k1.channel = c1  
  •      
  •   #Use a channel which buffers events in memory  
  •   a1.channels.c1.type= memory  
  •   a1.channels.c1.capacity= 1000  
  •   a1.channels.c1.transactionCapacity= 100  
  



#敲命令
flume-ng agent -cconf -f conf/http_case8.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
#我们用生成JSON 格式的POSTrequest发数据
curl -X POST -d '[{"headers" :{"looklook1" : "looklook1 isheader","looklook2" : "looklook2 isheader"},"body" : "hello looklook5"}]' http://192.168.233.128:50000
#在启动的终端查看console输出




这里headersbody都正常输出。

九、Twitter 1%firehose Source(实验的)
官网警告,慎用,说不定下个版本就木有了
这个实验source 是通过时搜索服务,从Twitter1%样本信息中获取事件数据。需要Twitter开发者账号。好吧,对于400网站,我们无可奈何用不到,就不多解释了。



十、自定义Source
一个自定义 Source其实是对Source接口的实现。当我们开始flume代理的时候必须将自定义 Source和相依赖的jar包放到代理的classpath下面。自定义 Sourcetype就是我们实现Source接口对应的类全路径。
这里后面的内容里会详细介绍,这里不做赘述。
  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669424-1-1.html 上篇帖子: flume的各个组件及整体工作流程 下篇帖子: flume
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表