初来乍到,第一篇博文,错误支持多多包含多多指点
以下内容以Flume 1.5.2版本为基础(反正1.x都差不多)
看完此文,就相当于看完了官方文档。
数据流模型
event:Flume数据流的基本数据单元,包含header(一组string属性)和body(二进制数组)两部分。
agent:一个Flumeagent是一个JVM进程,有source、channel、sink三大组件构成。
source:接收agent外部发送来的events,并存储到一个channel或多个channels。
channel:events的临时存储管道。
sink:将events发送到数据存储目的地或下一个agent的source。
复杂的流组合
多个agents可以连接在一起,形成复杂的数据流,支持fan-in和fan-out。
Reliability
是指events保存在channel中,只有被sink消费之后,才会从channel中删除。
Recoverability
是指Flume支持类似file的持久化channel。
备注:
官方文档对Flume可靠性和可恢复性的解释,只是表示针对channel中的数据而言。
安装Flume插件
Flume拥有一个完整的基于插件的架构设计。
(1)将自定义组件的jars添加到flume-env.sh文件的FLUME_CLASSPATH变量中。
(2)插件部署在$FLUME_HOME/plugins.d目录下,每一个插件在plugins.d目录中创建一个插件子目录,每一个插件子目录可以拥有以下三个子目录:
 lib – 插件jar(s)
 libext – 插件依赖jar(s)
 native - any required native libraries,such as .so files
设置多agent流
在多个agent之间传输数据,将前一个agent的sink连接到下一个agent的source。
数据合并(fan-in)
多路传输(fan-out)
This fan out can be replicating(default) ormultiplexing。
replicating:每一个event都被发送到source对应的所有channels。
multiplexing:根据每一个event的header,将其发送到预先配置的一个channel或多个channels。
a1.sources=r1
a1.channels=c1 c2 c3 c4
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=state
a1.sources.r1.selector.mapping.CZ=c1
a1.sources.r1.selector.mapping.US=c2 c3
a1.sources.r1.selector.default=c4
Flume sources
- Avro Source
- Thrift Source
- Exec Source
执行cat[namedpipe]ortail-F[file]之类的命令,将命令输出到stdout的数据发送到channel。
和其他异步发送一样,如果不能把数据发送到channel,client(应用程序)无法获得通知,数据丢失。
如果exec执行的commandexists,该source也会exists,不再生产数据到channel。
不可靠。
- JMS Source
从JMS消息队列中获取数据。
- Spooling Directory Source
该source监控指定目录,如果有新的文件出现,就会把新文件解析为events。
events解析逻辑pluggable。
当一个文件全部读取到channel中,该文件会被重命名来标记已经读取完成或被删除。
该source是可靠的,不会丢失数据,即使Flume进程restarted或killed。
可靠地代价是:放到spoolingdirectory中的文件的文件名唯一(不能再被使用)且文件不能被修改。
Despitethe reliability guarantees of this source, there are still casesinwhich events may be duplicated if certain downstream failures occur.This is consistent with the guarantees offered by other Flumecomponents.
- Twitter 1% firehose Source (highly experimental)
- NetCat Source
Anetcat-like source,监听指定的端口,把每一行文本转换为一个event。
- Sequence Generator Source
Usefulmainly for testing.
Reads syslog data and generate Flume events.
TheUDP source treats an entire message asa single event.
TheTCP sources create a new event foreach string of characters separated by a newline (‘n’).
原始的、可靠地syslogTCP source
- Multiport Syslog TCP Source
更新的、更快的、能够配置多端口的syslogTCP source。
使用ApacheMinalibrary实现高效地同时监听多个端口。
Providessupport for RFC-3164 and many common RFC-5424 formatted messages。
Providesthe capability to configure the character set used on a per-portbasis。
Asource which accepts Flume Events by HTTPPOSTand GET.GETshould be used for experimentation only.HTTP requests are converted into flume events by a pluggable“handler”which must implement the HTTPSourceHandler interface.This handler takes a HttpServletRequest and returns a list of flumeevents. All events handled from one Http request are committed to thechannel in one transaction, thus allowing for increased efficiencyonchannels like the file channel. If the handler throws an exception,this source will return a HTTP status of400.If the channel is full, or the source is unable to append events tothe channel, the sourcewill return a HTTP 503- Temporarily unavailable status.
默认handler,处理JSON格式events。
支持UTF-8(默认)、UTF-16、UTF-32字符集。
接收一个events数组(即使只有一个event),然后把他们转化成一个Flumeevent。
处理上传BinaryLargeObject (BLOB)的请求,例如PDF、jpg。
- Legacy Sources(遗留的sources)
允许1.x版本的Flumeagent从0.9.4版本的Flumeagent接收events,将0.9.4版本的events格式转化成1.x版本的格式。
- AvroLegacy Source
- ThriftLegacy Source
- Custom Source
自定义实现source,source的type属性必须是自定义实现的全限定类名(FQCN)。
从Scribe(facebook开源的日志收集系统)获取数据,source的type属性org.apache.flume.source.scribe.ScribeSource。
Flume Sinks
目前支持创建textandsequence files,支持对这两种格式进行压缩。
CurrentlyFlume supports HDFS 0.20.2 and 0.23.
Typicallyuseful for testing/debugging purpose。
主要应用场景
主要应用场景
将channel中的数据转发到配置的IRCdestinations。
保存events到本地文件系统。
丢弃从channel接收的所有events。
- HBaseSinks
- MorphlineSolrSink
Thissink extracts data from Flume events, transforms it, and loads it innear-real-timeinto Apache Solr servers。
Thissink writes data to an elasticsearch cluster。
- Kite Dataset Sink (experimental)
- Custom Sink
自定义实现sink,sink的type属性必须是自定义实现的全限定类名(FQCN)。
Flume Channels
高吞吐量,agentfailure时会丢失channel中的数据。
目前支持内嵌的Derby。Thisisa durable channel that’s ideal for flows where recoverability isimportant。
Channel数据支持化到本地磁盘。
支持数据加密。
Channel中的数据首先保存在内存,分配的内存空间满了就保存在本地磁盘。
高吞吐量,agentfailure时会丢失channel中的数据。
Thischannel is currently experimentaland not recommended for use inproduction.
- Pseudo Transaction Channel
用于单元测试,不可用于生产环境。
自定义实现channel,channel的type属性必须是自定义实现的全限定类名(FQCN)。
Flume Channel Selectors
- Replicating Channel Selector (default)
复制events到与source连接的所有channel。
a1.sources=r1
a1.channels=c1 c2 c3
a1.source.r1.selector.type=replicating
a1.source.r1.channels=c1 c2 c3
a1.source.r1.selector.optional=c3
以上配置,c3是optionalchannel。数据写入c3失败会被忽略。数据写入c1和c2失败会导致整个transaction失败。
- Multiplexing Channel Selector
根据配置的header属性决定source中的events发送到哪一个channel或者哪些channels。
自定义实现selector,selector的type属性必须是自定义实现的全限定类名(FQCN)。
Flume Sink Processors
Flume支持对sinks进行分组,将多个sinks合并到一个sinkgroup。
Sinkprocessors的功能是:对一个sinkgroup内的所有sinks实现(1)loadbalance(2)failover
只接受一个sink,所以没有用,没有必要使用。
对processor维护的每一个sink分配一个优先级,优先级不能重复,必须唯一。只要sinkgroup内有一个sink可用,events就会被发送出去。
Thefailover mechanism works byrelegatingfailed sinks to a pool where they are assigned a cool down period,increasing with sequentialfailures before they are retried. Once asink successfully sends an event, it is restored to the live pool.
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.g1.processor.priority.k2=10
a1.sinkgroups.g1.processor.maxpenalty=10000
- Load balancing Sink Processor
该processor的selector属性可以选择配置(1)round_robin(2)random(3)集成AbstractSinkSelector自定义实现的FQCN。
通过配置processor的backoff属性,决定是否拉黑fail的sink。
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1k2
a1.sinkgroups.g1.processor.type=load_balance
a1.sinkgroups.g1.processor.backoff=true
a1.sinkgroups.g1.processor.selector=random
- 目前不支持CustomSink Processor。
Event Serializers
file_roll和hdfs这两种sink支持EventSerializer接口。
不做任何转换和修改将event的body写到输出流,header被忽略。
该serializer只有一个属性appendNewline,表示是否添加换行,默认true。
将events序列化到Avrocontainerfile,支持Hadoop的数据压缩格式。
Flume Interceptors
Flume可以通过interceptors对flow中的events进行修改甚至丢弃。
在event的header中添加timestamp属性。
如果header中已经存在timestamp属性,可以通过配置选择保留或替换。
在event的header中添加host属性(该属性名可配置),属性值是运行agent的hostname或IP。
如果header中已经存在host属性,可以通过配置选择保留或替换。
在event的header中添加固定的key-valuepair。
在event的header中添加指定属性名,值是UUID。
处理模型如下图
- RegexFiltering Interceptor
将event的body转换成text,然后使用配置的正则表达式对text进行匹配。
可以选择对匹配正则表达式的event进行include或exclude操作。
- RegexExtractor Interceptor
提取正则表达式匹配的regexmatchgroups,并添加到event的header中。
If the Flume event body contained1:2:3.4foobar5and the following configuration was used.
a1.sources.r1.interceptors.i1.regex=(\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers=s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name=one
a1.sources.r1.interceptors.i1.serializers.s2.name=two
a1.sources.r1.interceptors.i1.serializers.s3.name=three
The extracted event will contain the same bodybut the following headers will have been added one=>1, two=>2,three=>3
Flume Properties
flume.called.from.service属性:如果指定了该属性,Flumeagent会在找不到配置文件时持续检测配置文件,否则,Flumeagent会终止。
Flume定期(30s)检测指定配置文件的变化。
Flumeagent在以下两种情况会加载新的配置:
- 第一次检测到配置文件
- 上一次检测后,配置文件的内容被修改
rename和move配置文件不会改变文件的modificationtime,不会被从新加载。
Log4J Appender
AppendsLog4j events to a flume agent’s avro source.
Load Balancing Log4J Appender
AppendsLog4j events to a list of flume agent’s avro source.
|