
[Experience Sharing] Flume Load Balancing Configuration and Testing



Posted on 2017-5-21 14:09:39
2.1 Flume Configuration
  The cluster host names and addresses (DNS configuration) are as follows:
  

hadoop-master 192.168.177.162
machine-0     192.168.177.158
machine-1     192.168.177.167
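Because the sinks below refer to the downstream machines by host name, every agent host has to resolve these names to the addresses in the table (for example through /etc/hosts or DNS). The following is a minimal JDK-only sketch for checking that; the class name HostCheck and its argument format are hypothetical, and it compares only the first address a name resolves to.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: checks that each cluster host name resolves to the expected address.
public class HostCheck {

    // True if 'host' resolves to 'expectedIp'. Only the first address returned is compared;
    // an IP literal is parsed directly without a DNS lookup.
    public static boolean resolvesTo(String host, String expectedIp) {
        try {
            return InetAddress.getByName(host).getHostAddress().equals(expectedIp);
        } catch (UnknownHostException e) {
            return false; // the name could not be resolved at all
        }
    }

    public static void main(String[] args) {
        // Each argument has the form host=expectedIp, e.g. machine-0=192.168.177.158
        for (String arg : args) {
            String[] parts = arg.split("=", 2);
            if (parts.length != 2) {
                System.err.println("Skipping malformed argument: " + arg);
                continue;
            }
            boolean ok = resolvesTo(parts[0], parts[1]);
            System.out.println(parts[0] + " -> " + parts[1] + (ok ? " OK" : " MISMATCH"));
        }
    }
}
```

Run it on each of the three machines, e.g. `java HostCheck hadoop-master=192.168.177.162 machine-0=192.168.177.158`, adding one argument per host in the table.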
  Configure the master Flume agent on hadoop-master. Its configuration file is loadbalance.properties.

agent.sources=s1
agent.channels=c1
agent.sinks=k1 k2
agent.sinkgroups = g1
agent.sinkgroups.g1.sinks = k1 k2
agent.sinkgroups.g1.processor.type =load_balance
agent.sinkgroups.g1.processor.selector =round_robin
agent.sinkgroups.g1.processor.backoff =true
agent.sources.s1.type=avro
agent.sources.s1.channels=c1
agent.sources.s1.bind=0.0.0.0
agent.sources.s1.port=51515
agent.sources.s1.interceptors=i1
agent.sources.s1.interceptors.i1.type=timestamp
agent.channels.c1.type=memory
agent.sinks.k1.channel = c1
agent.sinks.k1.type = avro
agent.sinks.k1.hostname = machine-0
agent.sinks.k1.port = 51515
agent.sinks.k2.channel = c1
agent.sinks.k2.type = avro
agent.sinks.k2.hostname = machine-1
agent.sinks.k2.port = 51515
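With processor.type = load_balance, selector = round_robin and backoff = true, the sink group hands work to k1 and k2 in turn and temporarily skips a sink that has failed. The Java sketch below models that selection rule so its behaviour can be reasoned about; it is a simplified illustration, not Flume's LoadBalancingSinkProcessor code. The class name, the base backoff and the cap are assumptions (Flume's backoff is likewise exponential and capped, via processor.selector.maxTimeOut).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a round-robin selector with exponential backoff (illustrative only).
public class RoundRobinBackoff {

    static final class SinkState {
        final String name;
        int consecutiveFailures = 0;
        long restoreAtMillis = 0; // the sink is skipped until this time
        SinkState(String name) { this.name = name; }
    }

    private final List<SinkState> sinks = new ArrayList<>();
    private final long baseBackoffMillis;
    private final long maxBackoffMillis;
    private int next = 0; // index where the next round-robin scan starts

    public RoundRobinBackoff(long baseBackoffMillis, long maxBackoffMillis, String... names) {
        this.baseBackoffMillis = baseBackoffMillis;
        this.maxBackoffMillis = maxBackoffMillis;
        for (String n : names) sinks.add(new SinkState(n));
    }

    // Returns the next sink that is not backed off at time 'now', or null if all are backed off.
    public String select(long now) {
        for (int i = 0; i < sinks.size(); i++) {
            SinkState s = sinks.get((next + i) % sinks.size());
            if (s.restoreAtMillis <= now) {
                next = (next + i + 1) % sinks.size();
                return s.name;
            }
        }
        return null;
    }

    // Records a delivery failure: the backoff doubles with each consecutive failure, up to the cap.
    public void fail(String name, long now) {
        SinkState s = find(name);
        s.consecutiveFailures++;
        long backoff = Math.min(maxBackoffMillis,
                baseBackoffMillis << Math.min(s.consecutiveFailures - 1, 30));
        s.restoreAtMillis = now + backoff;
    }

    // Records a successful delivery: the sink is fully restored.
    public void succeed(String name) {
        SinkState s = find(name);
        s.consecutiveFailures = 0;
        s.restoreAtMillis = 0;
    }

    private SinkState find(String name) {
        for (SinkState s : sinks) if (s.name.equals(name)) return s;
        throw new IllegalArgumentException("unknown sink " + name);
    }

    public static void main(String[] args) {
        RoundRobinBackoff rr = new RoundRobinBackoff(1000, 30000, "k1", "k2");
        System.out.println(rr.select(0));    // k1
        System.out.println(rr.select(0));    // k2
        rr.fail("k1", 0);                    // machine-0 becomes unreachable
        System.out.println(rr.select(10));   // k2 (k1 is backed off for 1000 ms)
        System.out.println(rr.select(20));   // k2
        System.out.println(rr.select(1500)); // k1 is eligible again
    }
}
```

The point of backoff is that a dead downstream agent is not retried on every batch; without it, the round-robin order would keep handing every other batch to the failed sink.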



  Configure machine-0 by creating the file conf/loadbalance.properties:


#loadbalance.properties
agent.sources=s1
agent.channels=c1
agent.sinks=k1
agent.sources.s1.type=avro
agent.sources.s1.channels=c1
agent.sources.s1.bind=0.0.0.0
agent.sources.s1.port=51515
agent.channels.c1.type=memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity =10000
agent.channels.c1.byteCapacityBufferPercentage= 20
agent.channels.c1.byteCapacity = 800000
agent.sinks.k1.type=hdfs
agent.sinks.k1.channel=c1
#No Hadoop node is installed on this machine, so configure the full HDFS URI instead, e.g.
#hdfs://ip-address:8020/flume/%Y/%m
agent.sinks.k1.hdfs.path=/flume/%Y/%m
agent.sinks.k1.hdfs.filePrefix=flume
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.rollSize=0
agent.sinks.k1.hdfs.fileType=DataStream
agent.sinks.k1.hdfs.writeFormat=Text
agent.sinks.k1.hdfs.useLocalTimeStamp=false
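The escapes in hdfs.path (%Y is the four-digit year, %m the two-digit month) are filled in from each event's timestamp header. Because useLocalTimeStamp=false here, that header must already be on the event, which is why the master agent adds the timestamp interceptor. The sketch below reproduces the substitution for just these two escapes to show which directory an event lands in; the class and method names are hypothetical, and the explicit time zone parameter reflects the assumption that the HDFS sink uses the agent's local time zone unless hdfs.timeZone is set.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Illustrative only: resolves the %Y and %m escapes of an HDFS sink path from an event timestamp.
public class HdfsPathDemo {

    public static String resolve(String pattern, long timestampMillis, TimeZone zone) {
        SimpleDateFormat year = new SimpleDateFormat("yyyy");
        SimpleDateFormat month = new SimpleDateFormat("MM");
        year.setTimeZone(zone);
        month.setTimeZone(zone);
        Date d = new Date(timestampMillis);
        return pattern.replace("%Y", year.format(d)).replace("%m", month.format(d));
    }

    public static void main(String[] args) {
        // Stand-in for the timestamp header value (epoch milliseconds)
        long ts = System.currentTimeMillis();
        System.out.println(resolve("/flume/%Y/%m", ts, TimeZone.getDefault()));
    }
}
```

For example, an event stamped 1493596800000 (2017-05-01 00:00 UTC) resolves to /flume/2017/05 in UTC but to /flume/2017/04 in America/Los_Angeles, so agents in different time zones can bucket the same event differently.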


  

  Configure machine-1 in the same way as machine-0.
  

  
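  Start flume-ng. Change into the Flume installation directory (if Flume is already on the Linux PATH, you can skip this step and start it directly) and run the following command:
  bin/flume-ng agent -n agent -c conf -f \
  conf/loadbalance.properties -Dflume.root.logger=DEBUG,console
  Start Flume this way on all three machines.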
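Before starting the client it can help to confirm that each agent's Avro source is accepting connections on port 51515. A minimal JDK-only sketch, assuming the host names from the table are resolvable from the machine running it; the class and method names are hypothetical. A successful TCP connect only shows that something is listening, not that the Avro source is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port (such as an Avro source) accepts connections.
public class AvroPortCheck {

    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // connection refused, timed out, or host name not resolvable
        }
    }

    public static void main(String[] args) {
        String[] hosts = {"hadoop-master", "machine-0", "machine-1"};
        for (String host : hosts) {
            boolean up = isListening(host, 51515, 2000);
            System.out.println(host + ":51515 " + (up ? "listening" : "NOT reachable"));
        }
    }
}
```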

2.2 Client Application
  Here flume-ng-log4jappender is used as the client to demonstrate the data delivery. Prepare the following jar packages:

[Image: list of the required jar packages]

pom.xml configuration:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Flume-master</groupId>
<artifactId>Flume-master</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<name>Flume-master</name>
<properties>
<jackson.version>1.9.3</jackson.version>
<flume.version>1.4.0</flume.version>
<tomcat.version>7.0.42</tomcat.version>
<jetty.version>8.1.13.v20130916</jetty.version>
<!-- Hadoop properties -->
<distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
<distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
<distMgmtSnapshotsUrl>https://repository.apache.org/content/repositories/snapshots</distMgmtSnapshotsUrl>
<distMgmtStagingId>apache.staging.https</distMgmtStagingId>
<distMgmtStagingName>Apache Release Distribution Repository</distMgmtStagingName>
<distMgmtStagingUrl>https://repository.apache.org/service/local/staging/deploy/maven2</distMgmtStagingUrl>
</properties>
<repositories>
<repository>
<id>${distMgmtSnapshotsId}</id>
<name>${distMgmtSnapshotsName}</name>
<url>${distMgmtSnapshotsUrl}</url>
</repository>
<repository>
<id>repository.jboss.org</id>
<url>http://repository.jboss.org/nexus/content/groups/public/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>oracleReleases</id>
<name>Oracle Released Java Packages</name>
<url>http://download.oracle.com/maven</url>
<layout>default</layout>
</repository>
<repository>
<id>java.net2</id>
<name>Repository hosting the jee6 artifacts</name>
<url>http://download.java.net/maven/2</url>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-web-api</artifactId>
<version>6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>${flume.version}</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>FlumeWebApp</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
<includes>
<include>**/*</include>
</includes>
</resource>
</resources>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.4</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<configuration>
<rules>
<requireMavenVersion>
<version>[3.0.2,)</version>
</requireMavenVersion>
<requireJavaVersion>
<version>1.6</version>
</requireJavaVersion>
</rules>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.5</version>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.7</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.6</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<version>3.0</version>
<dependencies>
<dependency><!-- add support for ssh/scp -->
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ssh</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>com.atlassian.maven.plugins</groupId>
<artifactId>maven-clover2-plugin</artifactId>
<version>3.0.5</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<inherited>true</inherited>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<version>2.1.1</version>
<configuration>
<failOnMissingWebXml>false</failOnMissingWebXml>
</configuration>
</plugin>
<!--
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
</plugin>
-->
<plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<version>${jetty.version}</version>
<configuration>
<stopPort>9999</stopPort>
<stopKey>stop</stopKey>
<webApp>
<contextPath>/${project.build.finalName}</contextPath>
</webApp>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<inherited>false</inherited>
<executions>
<execution>
<id>clean</id>
<goals>
<goal>enforce</goal>
</goals>
<phase>pre-clean</phase>
</execution>
<execution>
<id>default</id>
<goals>
<goal>enforce</goal>
</goals>
<phase>validate</phase>
</execution>
<execution>
<id>site</id>
<goals>
<goal>enforce</goal>
</goals>
<phase>pre-site</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>.gitattributes</exclude>
<exclude>.gitignore</exclude>
<exclude>.git/**</exclude>
<exclude>.idea/**</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.0</version>
<executions>
<execution>
<id>attach-descriptor</id>
<goals>
<goal>attach-descriptor</goal>
</goals>
<configuration>
<generateReports>true</generateReports>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hdfs</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<flume.Port>51515</flume.Port>
</properties>
</profile>
<profile>
<id>hbase</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<flume.Port>61616</flume.Port>
</properties>
</profile>
</profiles>
</project>



  Java code:
  

// Assumed package: log4j.properties only routes loggers under com.loadbalance to the Flume appender
package com.loadbalance;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

public class Worker implements Runnable {
    private static final Logger LOG = Logger.getLogger(Worker.class);
    private final String command;

    public static void main(String[] args) {
        new Worker("0").init();
    }

    public void init() {
        int numWorkers = 1;
        int threadPoolSize = 3;
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(threadPoolSize);
        System.out.println("Current Time = " + new Date());
        for (int i = 0; i < numWorkers; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Worker worker = new Worker("do heavy processing");
            // Alternatives: run once after 10 s, or at a fixed rate of once per second
            // scheduledThreadPool.schedule(worker, 10, TimeUnit.SECONDS);
            // scheduledThreadPool.scheduleAtFixedRate(worker, 0, 1, TimeUnit.SECONDS);
            // First run after 5 s, then 10 s after each run completes
            scheduledThreadPool.scheduleWithFixedDelay(worker, 5, 10, TimeUnit.SECONDS);
        }
        // Give the scheduler time to run the task a few times
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // shutdown() cancels further periodic runs; a run already in progress completes
        scheduledThreadPool.shutdown();
        try {
            // Block (instead of busy-spinning) until the running task has finished
            while (!scheduledThreadPool.awaitTermination(1, TimeUnit.SECONDS)) {
                // still waiting
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        LOG.info("Finished all threads");
    }

    public Worker(String command) {
        this.command = command;
    }

    @Override
    public void run() {
        LOG.info(Thread.currentThread().getName() + " Start. Command = " + command);
        processCommand();
        LOG.info(Thread.currentThread().getName() + " End.");
    }

    // Logs sequence:1000 .. sequence:1999, then simulates 5 s of work
    private void processCommand() {
        try {
            for (int i = 1000; i < 2000; i++) {
                LOG.info("sequence:" + i);
            }
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}


  
  Log4j configuration file log4j.properties:

# File Appender rootLog
log4j.rootLogger=DEBUG,stdout,rootLog
#console configure for DEV environment
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
log4j.appender.rootLog=org.apache.log4j.RollingFileAppender
log4j.appender.rootLog.File=rootLog.log
log4j.appender.rootLog.MaxFileSize=5000KB
log4j.appender.rootLog.MaxBackupIndex=20
log4j.appender.rootLog.layout=org.apache.log4j.PatternLayout
log4j.appender.rootLog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
# File Appender boentel
#log4j.logger.com.boentel=DEBUG,boentel
#log4j.additivity.com.boentel=true
#log4j.appender.boentel=org.apache.log4j.RollingFileAppender
#log4j.appender.boentel.File= boentel.log
#log4j.appender.boentel.MaxFileSize=2000KB
#log4j.appender.boentel.MaxBackupIndex=20
#log4j.appender.boentel.layout=org.apache.log4j.PatternLayout
#log4j.appender.boentel.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
#log4j.logger.com.loadbalance= DEBUG,loadbalance
#log4j.additivity.com.loadbalance= true
#log4j.appender.loadbalance =org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
#log4j.appender.loadbalance.Hosts = machine-0:51515 machine-1:51515 hadoop-master:51515
#log4j.appender.loadbalance.UnsafeMode = true
#log4j.appender.loadbalance.MaxBackoff = 60000
# Selector: ROUND_ROBIN (default), RANDOM, or the FQDN of a custom selector class
#log4j.appender.loadbalance.Selector = ROUND_ROBIN
#log4j.appender.loadbalance.layout=org.apache.log4j.PatternLayout
#log4j.appender.loadbalance.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
log4j.logger.com.loadbalance=DEBUG,flume
log4j.additivity.com.loadbalance=true
#single node machine
log4j.appender.flume= org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname= hadoop-master
log4j.appender.flume.Port= 51515
log4j.appender.flume.UnsafeMode= true
log4j.appender.flume.layout= org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern= %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
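In log4j 1.x a logger inherits the appenders of every ancestor in its dot-separated name, as long as additivity is true. With the configuration above, only loggers named com.loadbalance or com.loadbalance.* reach the flume appender; a class in the default package, whose logger is simply named Worker, would write only to stdout and rootLog and nothing would arrive at hadoop-master. The JDK-only sketch below models that lookup (levels are ignored); it mirrors the inheritance rule but is not log4j's own code, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of log4j 1.x appender inheritance (illustrative; levels are ignored).
public class AppenderRouting {

    // Logger name -> appenders attached directly to it; "" stands for the root logger
    private final Map<String, List<String>> attached = new HashMap<>();
    // Loggers configured with additivity=false
    private final Set<String> nonAdditive = new HashSet<>();

    public void attach(String logger, String... appenders) {
        attached.computeIfAbsent(logger, k -> new ArrayList<>()).addAll(Arrays.asList(appenders));
    }

    public void setAdditivity(String logger, boolean additive) {
        if (additive) nonAdditive.remove(logger); else nonAdditive.add(logger);
    }

    // Walks from the logger towards the root collecting appenders; additivity=false stops the walk.
    public List<String> appendersFor(String loggerName) {
        List<String> result = new ArrayList<>();
        String name = loggerName;
        while (true) {
            List<String> own = attached.get(name);
            if (own != null) result.addAll(own);
            if (name.isEmpty() || nonAdditive.contains(name)) break;
            int dot = name.lastIndexOf('.');
            name = (dot < 0) ? "" : name.substring(0, dot);
        }
        return result;
    }

    public static void main(String[] args) {
        AppenderRouting r = new AppenderRouting();
        r.attach("", "stdout", "rootLog");    // log4j.rootLogger=DEBUG,stdout,rootLog
        r.attach("com.loadbalance", "flume"); // log4j.logger.com.loadbalance=DEBUG,flume
        System.out.println(r.appendersFor("com.loadbalance.Worker")); // [flume, stdout, rootLog]
        System.out.println(r.appendersFor("Worker"));                 // [stdout, rootLog]
    }
}
```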



  
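  Start the app and, after it has run for a while, check whether any data has been lost. I tested a few cases and did not find any problems.
  To be continued: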
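One way to make the data-loss check concrete: each run of processCommand logs sequence:1000 through sequence:1999, so once the HDFS output written by machine-0 and machine-1 is merged, every sequence number should appear exactly once per completed run. The sketch below counts occurrences and reports mismatches; it assumes the files have already been copied into one local file (for example with hdfs dfs -cat) and that the number of runs is known. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical checker: reports sequence numbers whose count differs from the expected number of runs.
public class SequenceGapCheck {

    private static final Pattern SEQ = Pattern.compile("sequence:(\\d+)");

    public static List<Integer> mismatches(Iterable<String> lines, int first, int last, int expectedRuns) {
        int[] counts = new int[last - first + 1];
        for (String line : lines) {
            Matcher m = SEQ.matcher(line);
            if (m.find()) {
                int n = Integer.parseInt(m.group(1));
                if (n >= first && n <= last) counts[n - first]++;
            }
        }
        List<Integer> bad = new ArrayList<>();
        for (int i = 0; i < counts.length; i++) {
            // Fewer than expected means lost events; more means duplicates (possible after retries)
            if (counts[i] != expectedRuns) bad.add(first + i);
        }
        return bad;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: local file with the merged output of both HDFS sinks; args[1]: number of runs
        List<String> lines = Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8);
        List<Integer> bad = mismatches(lines, 1000, 1999, Integer.parseInt(args[1]));
        System.out.println(bad.isEmpty() ? "No loss or duplication detected"
                                         : "Mismatched sequence numbers: " + bad);
    }
}
```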


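  • The channel here is a memory channel, which loses any buffered events if the agent process stops or crashes; it should be changed to a File Channel or a JDBC Channel.
  • This setup implements load balancing on the Flume side; failover can be implemented as well.
  • Load balancing in flume-ng-log4jappender. Attentive readers will notice that I also tried the client-side load-balancing feature of flume-ng-log4jappender (the commented-out LoadBalancingLog4jAppender section in log4j.properties), but unfortunately could not get the test to pass. This needs further study, since it would give the client interface its own load balancing.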
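The same two downstream agents could be used for failover instead of load balancing. In Flume this is the failover sink processor: each sink is given a priority (a higher value is tried first), and a failed sink is penalized for a while before it is retried. The Java sketch below models only that selection rule; the priorities, the fixed penalty and the class name are illustrative assumptions, not Flume's FailoverSinkProcessor code (which grows the penalty up to processor.maxpenalty).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of priority-based failover (illustrative only).
public class FailoverModel {

    private final Map<String, Integer> priority = new LinkedHashMap<>();
    private final Map<String, Long> penalizedUntil = new LinkedHashMap<>();
    private final long penaltyMillis;

    public FailoverModel(long penaltyMillis) {
        this.penaltyMillis = penaltyMillis;
    }

    public void addSink(String name, int prio) {
        priority.put(name, prio);
        penalizedUntil.put(name, 0L);
    }

    // Returns the highest-priority sink that is not currently penalized, or null if none is available.
    public String active(long now) {
        String best = null;
        for (Map.Entry<String, Integer> e : priority.entrySet()) {
            if (penalizedUntil.get(e.getKey()) > now) continue;
            if (best == null || e.getValue() > priority.get(best)) best = e.getKey();
        }
        return best;
    }

    // Records a failure: the sink is skipped for a fixed penalty period.
    public void fail(String name, long now) {
        penalizedUntil.put(name, now + penaltyMillis);
    }

    public static void main(String[] args) {
        FailoverModel f = new FailoverModel(30000);
        f.addSink("k1", 10); // e.g. machine-0, preferred
        f.addSink("k2", 5);  // e.g. machine-1, standby
        System.out.println(f.active(0));     // k1
        f.fail("k1", 0);
        System.out.println(f.active(1000));  // k2 while k1 is penalized
        System.out.println(f.active(31000)); // k1 again once the penalty expires
    }
}
```

Unlike load balancing, all traffic goes to one sink at a time here, so the standby agent stays idle until the preferred one fails.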
