设为首页 收藏本站
查看: 2211|回复: 0

[经验分享] Flume概述和简单实例

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-9-16 13:26:55 | 显示全部楼层 |阅读模式
Flume概述
  Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。
DSC0000.jpg
  Flume主要由3个重要的组件购成:


  • Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。  

  • Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。  

  • Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。
  对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。
  对于直接读取文件Source,有两种方式:


  • ExecSource:以运行Linux命令的方式,持续的输出最新的数据,如tail -F 文件名指令,在这种方式下,取的文件名必须是指定的。  

  • SpoolSource:是监测配置的目录下新增的文件,并将文件中的数据读取出来。  需要注意两点:


  •   拷贝到spool目录下的文件不可以再打开编辑。

  •   spool目录下不可包含相应的子目录。

  在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。log4j有一个TimeRolling的插件,可以把log4j分割的文件到spool目录。基本实现了实时的监控。
  Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)
  ExecSource,SpoolSource对比:
  ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法何证日志数据的完整性。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。
  Channel有多种方式:
  有MemoryChannel,JDBC Channel,MemoryRecoverChannel,FileChannel。MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。FileChannel保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
  Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

flume安装配置
  flume安装配置比较简单,下载flume1.5.0二进制包 http://www.apache.org/dyn/closer.cgi/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz
  解压即可 tar -zvxf apache-flume-1.5.0-bin.tar.gz

简单实例
  进入flume目录,新建example.conf
  

# example.conf: A single-node Flume configuration  
# Name the components on this agent
  
a1.sources = r1
  
a1.sinks = k1
  
a1.channels = c1
  
# Describe/configure the source
  
a1.sources.r1.type = exec
  
a1.sources.r1.command = echo 'hello'
  
# Describe the sink
  
a1.sinks.k1.type = logger
  
# Use a channel which buffers events in memory
  
a1.channels.c1.type = memory
  
a1.channels.c1.capacity = 1000
  
a1.channels.c1.transactionCapacity = 100
  
# Bind the source and sink to the channel
  
a1.sources.r1.channels = c1
  
a1.sinks.k1.channel = c1
  

  启动flume: bin/flume-ng agent --f example.conf --name a1 -Dflume.root.logger=INFO,console
  输出日志:
  

14/06/19 18:16:29 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting  
14/06/19 18:16:29 INFO node.PollingPropertiesFileConfigurationProvider:>  
14/06/19 18:16:29 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
  
14/06/19 18:16:29 INFO conf.FlumeConfiguration: Processing:k1
  
14/06/19 18:16:29 INFO conf.FlumeConfiguration: Processing:k1
  
14/06/19 18:16:29 WARN conf.FlumeConfiguration: Invalid property specified: conf
  
14/06/19 18:16:29 WARN conf.FlumeConfiguration: Configuration property ignored: mple.conf = A single-node Flume configuration
  
14/06/19 18:16:29 WARN conf.FlumeConfiguration: Agent configuration for 'mple' does not contain any channels. Marking it as invalid.
  
14/06/19 18:16:29 WARN conf.FlumeConfiguration: Agent configuration invalid for agent 'mple'. It will be removed.
  
14/06/19 18:16:29 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
  
14/06/19 18:16:29 INFO node.AbstractConfigurationProvider: Creating channels
  
14/06/19 18:16:29 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
  
14/06/19 18:16:29 INFO node.AbstractConfigurationProvider: Created channel c1
  
14/06/19 18:16:29 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec
  
14/06/19 18:16:29 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
  
14/06/19 18:16:29 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
  
14/06/19 18:16:29 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1730d54 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
  
14/06/19 18:16:29 INFO node.Application: Starting Channel c1
  
14/06/19 18:16:29 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
  
14/06/19 18:16:29 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
  
14/06/19 18:16:29 INFO node.Application: Starting Sink k1
  
14/06/19 18:16:29 INFO node.Application: Starting Source r1
  
14/06/19 18:16:29 INFO source.ExecSource: Exec source starting with command:echo 'hello'
  
14/06/19 18:16:29 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
  
14/06/19 18:16:29 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
  
14/06/19 18:16:29 INFO source.ExecSource: Command [echo 'hello'] exited with 0
  
14/06/19 18:16:29 INFO sink.LoggerSink: Event: { headers:{} body: 27 68 65 6C 6C 6F 27                            'hello' }
  

  参考文档

Flume(ng) 自定义sink实现和属性注入
  问题导读:
  1.如何实现flume端自定一个sink,来按照我们的规则来保存日志?
  2.想从flume的配置文件中获取rootPath的值,该如何配置?
  最近需要利用flume来做收集远端日志,所以学习一些flume最基本的用法。这里仅作记录。
  远端日志收集的整体思路是远端自定义实现log4j的appender把消息发送到flume端,flume端自定义实现一个sink来按照我们的规则保存日志。
  自定义Sink代码:
  

public>private static final Logger logger = LoggerFactory
  . getLogger(LocalFileLogSink .class );
  private static final String PROP_KEY_ROOTPATH = "rootPath";
  private String rootPath;
  @Override
  public void configure(Context context) {
  String rootPath = context.getString(PROP_KEY_ROOTPATH );
  setRootPath(rootPath);
  }
  
  @Override
  public Status process() throws EventDeliveryException {
  logger .debug("Do process" );
  }
  
}
  

  实现Configurable接口,即可在初始化时,通过configure方法从context中获取配置的参数的值。这里,我们是想从flume的配置文件中获取rootPath的值,也就是日志保存的根路径。在flume-conf.properties中配置如下:
  agent.sinks = loggerSink
  agent.sinks.loggerSink.rootPath = ./logs
  复制代码
  loggerSink是自定义sink的名称,我们取值时的key,只需要loggerSink后面的部分即可,即这里的rootPath。
  实际业务逻辑的执行,是通过继承复写AbstractSink中的process方法实现。从基类的getChannel方法中获取信道,从中取出Event处理即可。
  

Channel ch = getChannel();  Transaction txn = ch.getTransaction();
  txn.begin();
  try {
  logger .debug("Get event." );
  Event event = ch.take();
  txn.commit();
  status = Status. READY ;
  return status;
  }finally {
  Log. info( "trx close.");
  txn.close();
  }

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114527-1-1.html 上篇帖子: Flume-NG内置计数器(监控)源码级分析 下篇帖子: flume监控
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表