设为首页 收藏本站
查看: 1171|回复: 0

[经验分享] Flume NG源码分析(一)基于静态properties文件的配置模块

[复制链接]

尚未签到

发表于 2015-11-27 17:48:36 | 显示全部楼层 |阅读模式
  日志收集是互联网公司的一个重要服务,Flume NG是Apache的顶级项目,是分布式日志收集服务的一个开源实现,具有良好的扩展性,与其他很多开源组件可以无缝集成。搜了一圈发现介绍Flume NG的文章有不少,但是深入分析Flume NG源代码的却没有。准备写一个系列分析一下Flume NG的源码。先从基础的配置模块说起。
  


  Flume NG支持两种配置模式,一种是基于properties文件的静态配置,并且只加载一次。另一种是基于Guava EventBus发布订阅模式的动态配置,可运行时加载修改的配置。这篇先说说基于properties文件的静态配置。


  


  下面这个是flume-conf.properties的一个常见配置


  1. producers是agent的名字,一个agent表示一个Flume-NG的进程
  2. producer.sources指定了这个agent监控的几个日志源,可以配置多个source
  3. producer.channels, sinks指定了channel和sink,这些概念后面会说
  4. producer.sources.sX.XXX指定了日志源获取的方式,对于从本地日志文件收集的方式来说,实际使用的是tail -F的命令来监控日志文件的尾部
  

producer.sources = s1 s2 s3
producer.channels = c
producer.sinks = r

producer.sources.s1.type = exec
producer.sources.s1.channels = c
producer.sources.s1.command = tail -F /data/logs/s1.log
producer.sources.s2.type = exec
producer.sources.s2.channels = c
producer.sources.s2.command = tail -F /data/logs/s2.log
producer.sources.s3.type = exec
producer.sources.s3.channels = c
producer.sources.s3.command = tail -F /data/logs/s3.log
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=server1:9092,server2:9092,server3:9092
producer.sinks.r.zk.connect=server1:2181,server2:2181,server3:2181,server4:2181,server5:2181
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=topic.xxx
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000


再看看如何指定的producer这个agent名字以及指定采用哪个配置文件,下面是Flume NG的启动命令,-f指定了配置文件的路径,-n指定了agent的名字,也就是flume-conf.properties里面每项配置的前缀名  
  

/flume-ng  agent -c conf -f ../conf/flume-conf.properties -n producer -Dflume.root.logger=INFO,console > flume-ng.log 2>&1 &

来看看Flume-NG是如何来获取命令行参数,以及如何把flume-conf.properties的配置转化成它内部的数据结构的。  
  
org.apache.flume.node.Application类是Flume NG的启动类,看一下它的main方法
1. 使用了commons-cli.jar提供的解析命令行参数的能力来解析命令行参数,把-n, -f/--conf-file, --no-reload-conf这几个配置信息读到变量
2. 打开由-f参数指定的配置文件,如果指定了no-reload-conf = false,也就是要运行时加载配置,就创建一个EventBus来发布和注册配置文件修改的事件,创建一个
PollingPropertiesFileConfigurationProvider 来轮询properties配置文件是否修改,如果修改就重新加载
3. no-reload-conf默认是true,也就是说默认是静态配置,只在启动时加载一次,只需要创建一个PropertiesFileConfigurationProvider来读取properties配置文件即可


public static void main(String[] args) {
try {
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);
option = new Option("f", "conf-file", true, "specify a conf file");
option.setRequired(true);
options.addOption(option);
option = new Option(null, "no-reload-conf", false, "do not reload " +
"conf file if changed");
options.addOption(option);
option = new Option("h", "help", false, "display help text");
options.addOption(option);
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
File configurationFile = new File(commandLine.getOptionValue('f'));
String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");
if (commandLine.hasOption('h')) {
new HelpFormatter().printHelp("flume-ng agent", options, true);
return;
}
/*
* The following is to ensure that by default the agent
* will fail on startup if the file does not exist.
*/
if (!configurationFile.exists()) {
// If command line invocation, then need to fail fast
if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
String path = configurationFile.getPath();
try {
path = configurationFile.getCanonicalPath();
} catch (IOException ex) {
logger.error("Failed to read canonical path for file: " + path, ex);
}
throw new ParseException(
"The specified configuration file does not exist: " + path);
}
}
List<LifecycleAware> components = Lists.newArrayList();
Application application;
if(reload) {
EventBus eventBus = new EventBus(agentName + &quot;-event-bus&quot;);
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(agentName,
configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);
} else {
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName,
configurationFile);
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
application.start();
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread(&quot;agent-shutdown-hook&quot;) {
@Override
public void run() {
appReference.stop();
}
});
} catch (Exception e) {
logger.error(&quot;A fatal error occurred while running. Exception follows.&quot;,
e);
}
}

Flume NG配置相关的接口和类的结构如下  
1. ConfigurationProvider顶层接口定义了 MaterializedConfiguration getConfiguration() 方法
2. MaterializedConfiguration接口表示具体化的配置,也就是把flume-conf.properties配置文件里定义的配置实例化成具体的对象。SimpleMaterializedConfiguration提供了实现,维护了实际运行时的配置数据结构
3. AbstractConfigurationProvider实现了ConfigurationProvider接口,并定义了abstract FlumeConfiguration getFlumeConfiguration()抽象方法
4. FlumeConfiguration, AgentConfiguration, SourceConfiguration, ChannelConfiguration, SinkConfiguration这几个类用来辅助解析flume-conf.properties配置文件,保存配置定义的字段
5. PropertiesFileConfigurationProvider从-f/--conf指定的配置文件中读取配置信息,只在读取一次
6. PollingPropertiesFileConfigurationProvider 采用轮询的方式从配置文件中读取配置信息,并支持动态修改配置
DSC0000.jpg




  
PropertiesFileConfigurationProvider的实现很简单
1. 首先是getFlumeConfiguration方法读取properties文件,然后转化成FlumeConfiguration结构的对象
2. 在 父类AbstractConfigurationProvider的getConfiguration方法生成MaterializedConfiguration实例,也就是创建实际运行时的Channel, SourceRunner, SinkRunner对象,它会从FlumeConfiguration中去读取各个对象的字段


public FlumeConfiguration getFlumeConfiguration() {
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(file));
Properties properties = new Properties();
properties.load(reader);
return new FlumeConfiguration(toMap(properties));
} catch (IOException ex) {
LOGGER.error(&quot;Unable to load file:&quot; + file
+ &quot; (I/O failure) - Exception follows.&quot;, ex);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException ex) {
LOGGER.warn(
&quot;Unable to close file reader for file: &quot; + file, ex);
}
}
}
return new FlumeConfiguration(new HashMap<String, String>());
}
public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration();
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        Set<String> channelNames =
            new HashSet<String>(channelComponentMap.keySet());
        for(String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.
              get(channelName);
          if(channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format(&quot;Channel %s has no components connected&quot; +
                &quot; and has been removed.&quot;, channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap = channelCache.
                get(channelComponent.channel.getClass());
            if(nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format(&quot;Channel %s connected to %s&quot;,
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error(&quot;Failed to instantiate component&quot;, ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn(&quot;No configuration found for this host:{}&quot;, getAgentName());
    }
    return conf;
  }
</pre><pre name=&quot;code&quot; class=&quot;java&quot;>

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144294-1-1.html 上篇帖子: Flume分布式日志系统(三) 下篇帖子: Flume NG 学习笔记(九)Flune Client 开发
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表