设为首页 收藏本站
查看: 1670|回复: 0

[经验分享] flume 简介安装使用案例(将log4j数据写到hdfs中)

[复制链接]

尚未签到

发表于 2017-5-22 06:10:21 | 显示全部楼层 |阅读模式
 
 
0 flume地址:
0) 官网地址: http://flume.apache.org/
1)官网学习:
http://flume.apache.org/documentation.html  点击 User Guide ---> 进入如下链接http://flume.apache.org/FlumeUserGuide.html
2) 下载地址:
http://flume.apache.org/download.html

 
 
1 flume简介:
 
cloudera公司开发的实时日志收集系统,原名是flume og (original generation),
但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation,改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。
这就是flume og 和  flume ng别名的取名和两者区别
 
简要提下 Flume NG (1.x.x)的主要变化:


  • sources和sinks 使用channels 进行链接
  • 两个主要channel 。1,  in-memory channel  非持久性支持,速度快。 缺点:any events still left in the memory channel when an agent process dies can’t be recovered
  • 2 , JDBC-based channel 持久性支持。
  • 不再区分逻辑和物理node,所有物理节点统称为 “agents”,每个agents 都能运行0个或多个sources 和sinks
  • 不再需要master节点和对zookeeper的依赖,以及去掉了collector,配置文件简单化。
  • 插件化,一部分面对用户,工具或系统开发人员。
  • 使用Thrift、Avro Flume sources 可以从flume0.9.4 发送 events  到flume 1.x

 
 
如下是flume og架构图:
 

DSC0000.png
 

具体 flume og到 flume ng到底转变了些什么,见如下链接(ibm developerworks)
http://www.ibm.com/developerworks/cn/data/library/bd-1404flumerevolution/index.html 写的不错
 
 
flume作用:
a) flume是一个分布式的数据收集系统,具有高可靠、高可用、事务管理、失败重启等功能。数据处理速度快,完全可以用于生产环境。
b) flume是分布式的日志收集系统(这里的日志是一个泛泛统称,可以是日志 可以是命令行输出 可以是数据文件),把收集来的数据传送到目的地去。

 
2 flume架构和组成 :
 
a) client: 生产数据的地方
b) event: 生产的数据
c) agent: flume核心组件,接收生产的数据,暂时存储并在发送数据到目的地后删除存储数据的处理单位,
               一个agent就是一个jvm, agent又是由 source, channel,sink,Interceptor等构建而成。
 
对上述名词具体解释如下:
Client:生产数据,运行在一个独立的线程。
Events:可以是日志记录、 avro 对象等,如果是文本文件通常是一行记录,这也是事务的基本单位
Agent:使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
            
Source:从Client收集数据,传递给Channel。不同的 source,可以接受不同的数据格式,

              比如监视外部源--目录池(spooling directory)数据源,可以监控指定文件夹中的新文件变化,
              如果目录中有文件产生,就会立刻读取其内容。
             source组件可以处理各种格式的日志数据,eg:avro Sources、thrift Sources、exec、jms、spooling                 directory、netcat、sequence generator、syslog、http、legacy、自定义。
            支持的这些格式都可以通过http://flume.apache.org/documentation.html  点击 User Guide ---> http://flume.apache.org/FlumeUserGuide.html查询到


Channel:连接 sources 和 sinks ,这个有点像一个队列,是一个存储池,接收source的输出,直到有sink

                 消费掉channel中的数据,channel中的数据直到进入下一个channel或者进入sink才会被删除,
                 当sink写入失败后,可以自动重启,不会造成数据丢失。
                 临时存放的数据可以存放在memory Channel、jdbc Channel、file Channel、自定义。
Sink:从Channel收集数据,运行在一个独立线程。用于把数据发送到目的地的组件
          目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。
          
在整个数据传输过程中,流动的是event。事务保证是在event级别
flume可以支持多级flume的agent,(即多个flume可以连成串,上一个flume可以把数据写到下一个flume上)
  支持扇入(fan-in)、扇出(fan-out)
  扇入(fan-in): source可以接受多个输入,
  扇出(fan-out): sink可以输出到多个目的地

 
 
flume ng节点组成图:

DSC0001.png
 

 
多Agent并联下的架构图(flume的web架构):

DSC0002.png
 

3 flume特性:
 
a) 可靠性
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。
Flume提供了三种级别的可靠性保障,从强到弱依次分别为:
end-to- end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。)
Store on failure(这也是scribe采用的策略,当数据接收方crash崩溃时,将数据写到本地,待恢复后,继续发送)
Best effort(数据发送到接收方后,不会进行确认)
 
b) 可扩展性
 
物理可扩展性:
Flume采用了三层架构,分别为agent,collector(clint端吧)和storage(sink数据流向方吧),每一层均可以水平扩展。其中,所有agent和 collector由master统一管理,这使得系统容易监控和维护,且master允许有多个(使用ZooKeeper进行管理和负载均衡),这就避 免了单点故障问题。(这是针对flume og在结合zk情况下)
 
 
逻辑可扩展性:
 用户可以根据需要添加自己的agent,collector或者storage。此外,Flume自带了很多组件,包括各种agent(file,syslog等),collector和storage(file,HDFS等),就是flume可以采集多种类型的数据,数据最后可以流向保存到多种类型的目的中。
 
 
c) 可管理性   这是针对flume og在结合zk情况下
 所有agent和Collector由master统一管理,这使得系统便于维护。多master情况,Flume利用 ZooKeeper和gossip,保证动态配置数据的一致性。用户可以在master上查看各个数据源或者数据流执行情况,且可以对各个数据源配置和动 态加载。Flume提供了web 和shell script command两种形式对数据流进行管理。
 
 
 
4 flume安装:
 

解压:
[iyunv@h2sliver112 conf]# tar -zxvf apache-flume-1.5.2-bin.tar.gz
重命名:
[iyunv@h2sliver112 local]# mv apache-flume-1.5.2-bin flume1.5.2-bin
增加配置文件
[iyunv@h2sliver112 flume1.5.2-bin]# cd conf
[iyunv@h2sliver112 conf]# cp flume-env.sh.template flume-env.sh
[iyunv@h2sliver112 conf]# cp flume-conf.properties.template flume-conf.properties
[iyunv@h2sliver112 conf]# vi flume-env.sh 修改JAVA_HOME=/opt/jdk1.7
[iyunv@h2sliver112 conf]# 验证 flume-ng  version
Flume 1.5.2
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 229442aa6835ee0faa17e3034bcab42754c460f5
Compiled by hshreedharan on Wed Nov 12 12:51:22 PST 2014
From source with checksum 837f81bd1e304a65fcaf8e5f692b3f18
 
 
 
5 flume简单案例: 使用avro方式监听一个文件 并将文件通过log4j样式输出到console
 

0 目的:写一个简单案例(通过avro方式,从客户端上读取一个文件,然后提交到avro服务端的source获取,通过内存channel最后将数据输送到目的地logger 并在控制台输出)
testflume----> agent-source-avro----> agent-channel-memory--->agent-sink-flume-logger--->-Dflume.root.logger=DEBUG,console控制台打印 
1 对应agent配置文件写法为:
这里我将文件写在目录:
# pwd
/usr/local/flume1.5.2-bin/conf内
[iyunv@h2sliver112 conf]# vi agent1.conf
内容如下:
agent1.sources=source1
agent1.channels=channel1
agent1.sinks=sink1

agent1.sources.source1.type=avro   表示读取数据源格式为avro
agent1.sources.source1.bind=0.0.0.0
agent1.sources.source1.port=41414   端口表示这个client输送到这个agent要走的socket端口, 定义的agent不同端口下表示接受不同client发送来的数据。
agent1.sources.source1.channels=channel1
agent1.channels.channel1.type=memory
agent1.sinks.sink1.type=logger  表示agent导出数据打印到flume日志文件中
agent1.sinks.sink1.channel=channel1

2 根据上面配置好的agent.conf配置信息,启动flume agent。
[iyunv@h2sliver112 bin]# flume-ng agent  --conf ../conf/ -f ../conf/agent1.conf -n agent1 -Dflume.root.logger=DEBUG,console
--conf ../conf/ 指定flume配置文件目录位置
-f指定flume服务端启动监听的agent配置文件 
-n指定启动的flume agent别名
-Dflume.root.logger=DEBUG,console 表示数据在控制台打印出
启动flume agent后,可以看到如下信息:
 Avro source source1 started,此时会不停扫描agent1.conf
上面配置sink为flume logflume logger,配置内容为:
more conf/log4j.properties:
#for test can use  -Dflume.root.logger=DEBUG,console when launching flume 注释表示如果是测试那么可以用这种方式
flume.root.logger=INFO,LOGFILE 默认是将数据写入到当前目录下logs文件夹内的文件名为flume.log的文件中
flume.log.dir=./logs
flume.log.file=flume.log

3 本地文件如下:
[iyunv@h2sliver112 local]# cat testflume
1
2
3
4
5
3 启动avro client,读取本地文件,在看上面启动的服务端是否有数据输出:
启动avro client命令如下:
[iyunv@h2sliver112 bin]# flume-ng avro-client --conf ../conf/ -H localhost -p 41414 -F /usr/local/testflume
-H localhost 指定运行机器
-p 41414 指定端口
-F /usr/local/testflume 指定外部数据源文件
上述命令启动后,flume avro client将文件testflume数据输送到上面定义的 flume agent source中,
avro-client           run an avro Flume client
avro-client
options:
  --host,-H <host>       hostname to which events will be sent
  --filename,
-F <file>   text file to stream to avro source
可以看到监听到的服务端打印结果如下:见截图,可见将文件数据监听的每个Event都打印出来了。


 
DSC0003.png

 
 
上述案例中,如果启动 flume agent为:
flume-ng agent  --conf ../conf/ -f ../conf/agent1.conf -n agent1 ,那么最后产生数据会在:
[iyunv@hadoop3 logs]# pwd
/opt/flume1.5.2/bin/logs
[iyunv@hadoop3 logs]# ll
total 12
-rw-r--r-- 1 root root 11215 Feb 21 00:56 flume.log

 
如果启动flume agent想启动后在后台执行,那么写成:
flume-ng agent  --conf ../conf/ -f ../conf/agent1.conf -n agent1&  加上&即可。

5 flume使用案例2,使用 avro将log4j不断产生的日志数据写到hdfs中:

 
 
需要准备的:
a) hadoop2环境,并事先创建好hdfs目录: /flume/events
b) 包flume-ng-log4jappender-1.5.0-cdh5.1.3-jar-with-dependencies.jar 用于将log4j的信息和flume关联,附件可下载此工程和对应包
 
开工:
 
0  如果使用maven 需要下载 flume slf4j的依赖:

<!-- flume -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.5.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.5.2</version>
</dependency>
 
1 Java端:

1 java
不断产生日志,模拟web系统不停运行产生日志效果
public class GenerateLog4j {
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
while(true){
Logger logger = org.apache.log4j.Logger.getLogger(GenerateLog4j.class);
logger.error("日期时间" + System.currentTimeMillis());
Thread.sleep(1000);
}
}

2 Java端的log4j.properties
log4j.rootLogger=INFO,flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.1.112 // 对应flume 安装的位置机器地址
log4j.appender.flume.Port = 44444  // 对应flume启动的监听端口 这个端口在flume 定义的conf中会写好
log4j.appender.flume.UnsafeMode = true

3 flume:
flume conf/创建 agent2.conf 内容如下
agent2.sources=source1
agent2.channels=channel1
agent2.sinks=sink1
agent2.sources.source1.type=avro
agent2.sources.source1.bind=0.0.0.0
agent2.sources.source1.port=44444    // 指定监听端口
agent2.sources.source1.channels=channel1
agent2.sources.source1.interceptors = i1 i2   指定使用flume的两个拦截器,一个是时间的 一个是IP的
agent2.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent2.sources.source1.interceptors.i1.preserveExisting = true
agent2.sources.source1.interceptors.i1.useIP = true
agent2.sources.source1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
agent2.channels.channel1.type=memory
agent2.channels.channel1.capacity=10000
agent2.channels.channel1.transactionCapacity=1000
agent2.channels.channel1.keep-alive=30
agent2.sinks.sink1.type=hdfs
agent2.sinks.sink1.channel=channel1
agent2.sinks.sink1.hdfs.path=hdfs://h2single:9000/flume/events/%{host}/%y-%m-%d   指定写到目标hadoop2集群的hdfs某个目录下 可以用hive创建分区表加载并继续做MR操作
agent2.sinks.sink1.hdfs.fileType=DataStream
agent2.sinks.sink1.hdfs.writeFormat=Text
agent2.sinks.sink1.hdfs.rollInterval=0
agent2.sinks.sink1.hdfs.rollSize=10000
agent2.sinks.sink1.hdfs.rollCount=0
agent2.sinks.sink1.hdfs.idleTimeout=5

4 flume目录下启动如下命令:
bin/flume-ng agent --conf ./conf/  -Dflume.monitoring.type=http -Dflume.monitoring.port=34343 -n agent2 -f conf/agent2.conf
// flume应用参数监控 -Dflume.monitoring.port=34343 可以通过http://ip:34343/metrics访问,从而看到监控信息
5 运行Java端GenerateLog4j.java
6 查看h2single hdfs被写入的数据  数据截图如下:

 

DSC0004.png
 

 
 
 

6 flume和其他组件的区别:
 
6.1) flume和sqoop的区别:
sqoop 定向从关系库导入数据到hadoop生态(hdfs/hive/hbase)
flume 从不同日志类型系统中实时采集数据到hadoop生态       两者使用定位不一样

 
6.2) flume和kafka的区别:
kafka强调的是吞吐量。数据来源单一,
flume强调的是多种适配器。source sink有很多种。
 
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379675-1-1.html 上篇帖子: Flume HDFS Sink使用及源码分析 下篇帖子: flume之退避算法backoff algorithm
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表