设为首页 收藏本站
查看: 1391|回复: 0

[经验分享] Flume客户端flume-ng-log4jappender 负载平衡(LoadBalancingLog4jAppender)

[复制链接]

尚未签到

发表于 2017-5-21 14:17:46 | 显示全部楼层 |阅读模式
    主机DNS配置:
  

192.168.177.167 machine-1
192.168.177.168 machine-2
192.168.177.158 machine-0
192.168.177.174 hadoop-master hbase-master


    hadoop-maser 和machine-2当主机,其它机器当做collector机,存储在HDFS中。
    hadoop-master和machine-2机上的flume配置:

agent.sources=s1
agent.channels=c1
agent.sinks=k1 k2
agent.sinkgroups = g1
agent.sinkgroups.g1.sinks = k1 k2  
agent.sinkgroups.g1.processor.type = load_balance  
agent.sinkgroups.g1.processor.selector = round_robin  
agent.sinkgroups.g1.processor.backoff = true  

agent.sources.s1.type=avro
agent.sources.s1.channels=c1
agent.sources.s1.bind=0.0.0.0
agent.sources.s1.port=51515
agent.sources.s1.interceptors=i1
agent.sources.s1.interceptors.i1.type=timestamp

agent.channels.c1.type=jdbc

agent.sinks.k1.channel = c1  
agent.sinks.k1.type = avro  
agent.sinks.k1.hostname = machine-0
agent.sinks.k1.port = 51515
agent.sinks.k2.channel = c1  
agent.sinks.k2.type = avro
agent.sinks.k2.hostname = machine-1
agent.sinks.k2.port = 51515



     machine-1 和machine-0的flume配置:
  

agent.sources=s1
agent.channels=c1
agent.sinks=k1

agent.sources.s1.type=avro
agent.sources.s1.channels=c1
agent.sources.s1.bind=0.0.0.0
agent.sources.s1.port=51515

agent.channels.c1.type=jdbc

agent.sinks.k1.type=hdfs
agent.sinks.k1.channel=c1
agent.sinks.k1.hdfs.path=/flume/%Y/%m
agent.sinks.k1.hdfs.filePrefix=flume
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.rollSize=0
agent.sinks.k1.hdfs.fileType=DataStream
agent.sinks.k1.hdfs.writeFormat=Text
agent.sinks.k1.hdfs.useLocalTimeStamp=false


  

     log4j的配置:

# File Appender rootLog
log4j.rootLogger=DEBUG,stdout,rootLog

#console configure for DEV environment
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n

log4j.appender.rootLog=org.apache.log4j.RollingFileAppender
log4j.appender.rootLog.File= rootLog.log
log4j.appender.rootLog.MaxFileSize=5000KB
log4j.appender.rootLog.MaxBackupIndex=20
log4j.appender.rootLog.layout=org.apache.log4j.PatternLayout
log4j.appender.rootLog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n

# File Appender boentel
#log4j.logger.com.boentel=DEBUG,boentel
#log4j.additivity.com.boentel=true
#log4j.appender.boentel=org.apache.log4j.RollingFileAppender
#log4j.appender.boentel.File= boentel.log
#log4j.appender.boentel.MaxFileSize=2000KB
#log4j.appender.boentel.MaxBackupIndex=20
#log4j.appender.boentel.layout=org.apache.log4j.PatternLayout
#log4j.appender.boentel.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n

log4j.logger.com.loadbalance= DEBUG,loadbalance
log4j.additivity.com.loadbalance= true

log4j.appender.loadbalance = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.loadbalance.Hosts =machine-2:51515 hadoop-master:51515
#log4j.appender.loadbalance.UnsafeMode = true
log4j.appender.out2.MaxBackoff = 30000
#FQDN RANDOM ,default is ROUND_ROBIN
log4j.appender.loadbalance.Selector = RANDOM
log4j.appender.loadbalance.layout=org.apache.log4j.PatternLayout
log4j.appender.loadbalance.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n



     测试代码:
  

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;



public class Worker implements Runnable{

private static final Logger LOG = Logger.getLogger(Worker.class);  
private String command;
/**
* @param args
*/
public static void main(String[] args) {
new Worker("0").init();
}

public void init(){
int numWorkers = 1;
int threadPoolSize = 3 ;

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(threadPoolSize);
//schedule to run after sometime
System.out.println("Current Time = "+new Date());
Worker worker = null;
for(int i=0; i< numWorkers; i++){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
worker = new Worker("do heavy processing");
//            scheduledThreadPool.schedule(worker, 10, TimeUnit.SECONDS);
//scheduleAtFixedRate
//            scheduledThreadPool.scheduleAtFixedRate(worker, 0, 1, TimeUnit.SECONDS);
scheduledThreadPool.scheduleWithFixedDelay(worker, 5, 10,
TimeUnit.SECONDS);
}
//add some delay to let some threads spawn by scheduler
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledThreadPool.shutdown();
while(!scheduledThreadPool.isTerminated()){
//wait for all tasks to finish
}
LOG.info("Finished all threads");
}
public Worker(String command){
this.command = command;
}
@Override
public void run() {
LOG.info(Thread.currentThread().getName()+" Start. Command = "+command);
processCommand();
LOG.info(Thread.currentThread().getName()+" End.");
}
private void processCommand() {
try {
for(int i = 1000; i < 1200; i++){
LOG.info("sequence:" + i);
}
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString(){
return this.command;
}
}

     小结:
     最终能实现负载均衡的作用,但是,性能上还有些欠缺。
     当一台机死掉时,客户端将尝试不断链接,影响到数据传送到其它机子上。当死掉的机器恢复后,客户端备份的数据会重新发送到flume agent。数据正确性是达到了,但是,万一这个app当掉了,对应的日志信息不就丢了吗?这是一个问题,有待进一步的改进。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379647-1-1.html 上篇帖子: flume提交配置分析 下篇帖子: Flume源代码解读四
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表