flume运行问题及测试
启动flumeflume-ng agent -n agent -f ../conf/flume-conf.properties
报错一:ava.lang.IllegalStateException: Transaction Capacity of Memory Channel cannot be higher than the capacity.
1 14/07/05 14:12:21 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
2 14/07/05 14:12:21 ERROR node.AbstractConfigurationProvider: Channel c1 has been removed due to an error during configuration
3 java.lang.IllegalStateException: Transaction Capacity of Memory Channel cannot be higher than the capacity.
4 at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
5 at org.apache.flume.channel.MemoryChannel.configure(MemoryChannel.java:251)
6 at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
7 at org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:203)
8 at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
9 at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
10 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
11 at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
12 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
13 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
14 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
15 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
16 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
17 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
18 at java.lang.Thread.run(Thread.java:662)
View Code 修改配置文件:flume-conf.properties. 原来是10改为60直到100
# In this case, it specifies the capacity of the memory channel
agent.channels.c1.capacity =100
报错二: Cannot assign requested address
14/07/05 14:26:12 ERROR lifecycle.LifecycleSupervisor: Unable to start EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} } - Exception follows.
org.apache.flume.FlumeException: java.net.BindException: Cannot assign requested address
va.net.BindException: Cannot assign requested address
at sun.nio.ch.Net.bind(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:126)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at org.apache.flume.source.NetcatSource.start(NetcatSource.java:162)
at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
14/07/05 14:26:12 ERROR lifecycle.LifecycleSupervisor: Unable to start EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} } - Exception follows.
org.apache.flume.FlumeException: java.net.BindException: Cannot assign requested address
at org.apache.flume.source.NetcatSource.start(NetcatSource.java:168)
at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
View Code
agent的source指定的端口:是安装flume的主机要监听的端口。IP是安装flume的主机的IP。
启动成功;
程序模拟:发送日志端
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;
public class FlumeWebLog {
public static void main(String[] args){
Socket client=null;
try {
client=new Socket("your flume address",6789);
OutputStream outputStream = client.getOutputStream();
Scanner scanner = new Scanner(System.in);
PrintWriter pWriter = new PrintWriter(outputStream, true);
pWriter.println("hello,enter bye to exit");
String line = null;
while (scanner.hasNextLine()) {
if (!(line = scanner.nextLine()).equals("bye")) {
pWriter.println(line);
System.out.println(line);
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
把从控制台输入的内容发给flume。
运行,看到hdfs上面出现数据
页:
[1]