ddsdjn 发表于 2015-9-17 07:43:47

flume-ng 自定义sink消费flume source

如何从一个已经存在的Flume source消费数据
  

1.下载flume
  wget http://www.apache.org/dist/flume/stable/apache-flume-1.5.2-bin.tar.gz
  

2.创建一个自己的ConsoleSink.java



import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
public class ConsoleSink extends AbstractSink implements Configurable {
@Override
public void configure(Context context) {
}
@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Transaction tx = null;
try {
Channel channel = getChannel();
tx = channel.getTransaction();
tx.begin();
for (int i = 0; i < 100; i++) {
Event event = channel.take();
if (event == null) {
status = Status.BACKOFF;
break;
} else {
String body = new String(event.getBody());
System.out.println(body);
}
}
tx.commit();
} catch (Exception e) {
if (tx != null) {
tx.commit();
}
e.printStackTrace();
} finally {
if (tx != null) {
tx.close();
}
}
return status;
}
}
  

3.编译



javac -classpath lib/flume-ng-core-1.5.2.jar:lib/flume-ng-sdk-1.5.2.jar:lib/flume-ng-configuration-1.5.2.jar ConsoleSink.java
jar -cvf console-sink.jar ConsoleSink.class
rm -rf ConsoleSink.class
mv console-sink.jar lib/ //这里编译完要放到flume的lib目录里

  

4.配置文件
  conf/example.conf



a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = ConsoleSink //这里是你自己Sink的包名和类名
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


5.启动



bin/flume-ngagent -c conf -f conf/example.conf -n a1
  

6.在需要被消费的Flume Source上配置



a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10 //这里是刚刚启动agent的机器地址
a1.sinks.k1.port = 44444
  
页: [1]
查看完整版本: flume-ng 自定义sink消费flume source