不信网恋 发表于 2015-9-17 07:57:58

log4j到flume的过程(LoadBalancingLog4jAppender)

  没有运行,仅仅凭看代码推测的运行过程如下。
  
  log4j配置
  
  #log4j.logger.com.loadbalance= DEBUG,loadbalance
#log4j.additivity.com.loadbalance= true
log4j.appender.loadbalance = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.loadbalance.Hosts =machine-2:51515 hadoop-master:51515
#log4j.appender.loadbalance.UnsafeMode = true
log4j.appender.out2.MaxBackoff = 30000
#FQDN RANDOM ,default is ROUND_ROBIN
log4j.appender.loadbalance.Selector = RANDOM
log4j.appender.loadbalance.layout=org.apache.log4j.PatternLayout
log4j.appender.loadbalance.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
  
  LoadBalancingLog4jAppender是Log4jAppender的子类,会调用Log4jAppender的append方法
  该append方法中会组织Event事件,最后
  
  
  try {
        rpcClient.append(flumeEvent);
    } catch (EventDeliveryException e) {
        String msg = "Flume append() failed.";
        LogLog.error(msg);
        if (unsafeMode) {
        return;
      }

  
  该rpcClient根据log4j中配置的获得 LoadBalancingRpcClient 实例
  执行 LoadBalancingRpcClient 的append方法
  
  Iterator<HostInfo> it = selector.createHostIterator();
  while (it.hasNext()) {
        HostInfo host = it.next();
        try {
          RpcClient client = getClient(host);
          client.append(event);
          eventSent = true;
          break;
        } catch (Exception ex) {
          selector.informFailure(host);
          LOGGER.warn("Failed to send event to host " + host, ex);
        }
    }
  选择用户设置的地址,获得对应的Rpc通信,发送成功,则跳出。
  
  之后,会调用远端服务器的AvroSource的append方法
  
  @Override
public Status append(AvroFlumeEvent avroEvent) {
      logger.debug("Avro source {}: Received avro event: {}", getName(),
        avroEvent);
      sourceCounter.incrementAppendReceivedCount();
      sourceCounter.incrementEventReceivedCount();
  Event event = EventBuilder.withBody(avroEvent.getBody().array(),
        toStringMap(avroEvent.getHeaders()));
  try {
          getChannelProcessor().processEvent(event);
      } catch (ChannelException ex) {
          logger.warn("Avro source " + getName() + ": Unable to process event. " +
              "Exception follows.", ex);
          return Status.FAILED;
      }
  sourceCounter.incrementAppendAcceptedCount();
      sourceCounter.incrementEventAcceptedCount();
  return Status.OK;
}
  
  直接将Event,在事务内发送到channel中,返回rpc的结果,这样
  客户端就将一个Event成功发送到channel中,继续完成下面的业务操作。
  
页: [1]
查看完整版本: log4j到flume的过程(LoadBalancingLog4jAppender)