设为首页 收藏本站
查看: 1144|回复: 0

[经验分享] flume源码分析1

[复制链接]

尚未签到

发表于 2019-1-30 09:54:17 | 显示全部楼层 |阅读模式
  1.启动命令

nohup bin/flume-ng agent -n agent-server  -f  agent-server1.conf &  flume-ng是一个shell脚本:

  agent                 run a Flume agent  ---> org.apache.flume.node.Application 类
  avro-client           run an avro Flume client ---> org.apache.flume.client.avro.AvroCLIClient 类run_flume() { #shell脚本实现
  local FLUME_APPLICATION_CLASS
  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi
  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS -cp "$FLUME_CLASSPATH" \
      -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}
......
# finally, invoke the appropriate command
if [ -n "$opt_agent" ] ; then  #如果第一个参数为agent时,opt_agent取值为1
  run_flume $FLUME_AGENT_CLASS $args #FLUME_AGENT_CLASS="org.apache.flume.node.Application"
elif [ -n "$opt_avro_client" ] ; then
  run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
  run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
  run_flume $FLUME_TOOLS_CLASS $args
else
  error "This message should never appear" 1
fi  最终启动的时候调用org.apache.flume.node.Application类的main方法
2.org.apache.flume.node.Application类
1)调用main方法,首先会解析参数,主要是n和f以及no-reload-conf,n为节点名称,f为配置文件,no-reload-conf代表是否支持自动reload(1.5.0才有的功能)
n/f 都有设置的值,no-reload-conf没有设置的项,如果设置了no-reload-conf代表不能自动reload

      Options options = new Options();
      Option option = new Option("n" , "name" , true, "the name of this agent");
      option.setRequired( true);
      options.addOption(option);
      option = new Option("f" , "conf-file" , true, "specify a conf file");
      option.setRequired( true);
      options.addOption(option);
      option = new Option(null , "no-reload-conf" , false, "do not reload " +
        "conf file if changed");
      options.addOption(option);
....
      CommandLineParser parser = new GnuParser();
      CommandLine commandLine = parser.parse(options, args);
      File configurationFile = new File(commandLine.getOptionValue('f' ));
      String agentName = commandLine.getOptionValue( 'n');
      boolean reload = !commandLine.hasOption( "no-reload-conf");  //获取是否含有no-reload-conf的设置,如果没有设置no-reload-conf则reload为true  2)   

List components = Lists.newArrayList(); //初始化一个List对象,用来存放需要启动的组件,这个只有在支持reload的情况才会使用
      Application application;
      if(reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus" );
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      } else { //不知道reload的情况
        PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName,
                configurationFile); //实例化一个PropertiesFileConfigurationProvider 对象,参数是agent的名称和配置文件(即n和f的设置)
        application = new Application(); //实例化一个Application对象
        application.handleConfigurationEvent(configurationProvider.getConfiguration()); //调用handleConfigurationEvent方法
      }
      application.start(); // 调用start方法  不支持reload的启动方法调用:
main--->handleConfigurationEvent-->stopAllComponents+startAllComponents-->start  3)handleConfigurationEvent方法调用stopAllComponents和startAllComponents方法

  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents(); //用于
    startAllComponents(conf);
  }  这里handleConfigurationEvent方法的参数为MaterializedConfiguration对象(这里为SimpleMaterializedConfiguration实例)
MaterializedConfiguration对象由AbstractConfigurationProvider.getConfiguration方法返回,在AbstractConfigurationProvider.getConfiguration方法中通过
调用loadChannels/loadSources/loadSinks方法来解析flume的配置文件,同时把对应的Channel,SourceRunner,SinkRunner放到对应的hashmap中,并最终通过SimpleMaterializedConfiguration的addChannel/addSourceRunner/addSinkRunner加载到SimpleMaterializedConfiguration对象中,然后供stopAllComponents/startAllComponents使用
stopAllComponents方法用于关闭所有的组件,
其通过调用MaterializedConfiguration对象(这里具体实现类为SimpleMaterializedConfiguration)的getSourceRunners和getChannels来获取需要关闭的SourceRunner和Channel组件对象,然后对各个组件对象调用LifecycleSupervisor.unsupervise来关闭组件,而startAllComponents正好相反,其对各个组件对象调用LifecycleSupervisor.supervise方法用于启动各个组件服务,另外
startAllComponents方法会调用this.loadMonitoring()方法启动监控flume的metrics的服务(而支持reload的方式不会调用这个方法)
4)start方法会对每一个组件调用LifecycleSupervisor.supervise方法,来进行服务的状态管理(在服务异常时可以自动拉起),这个主要是对支持reload的设置有用,
用来启动检测文件更新的计划任务线程池

  public synchronized void start() {
    for(LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }  }
supervise的实现参见(http://caiguangguang.blog.运维网.com/1652935/1619527)
支持reload的启动方法调用:main--->EventBus.register-->start方法
reload的实现参见(http://caiguangguang.blog.运维网.com/1652935/1619523)





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669462-1-1.html 上篇帖子: 第86课:SparkStreaming数据源Flume实际案例分享 下篇帖子: flume 日志搬家下半场
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表