Flume + Kafka + HDFS Explained
Flume architecture diagram
Single-node Flume configuration (flume-1.4.0)
Start Flume:
bin/flume-ng agent --conf ./conf -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent
-n gives the name of the agent defined in the configuration file; it must match the property prefix used in the config (here "agent", as in agent.sources = r1 below).
agent.sources = r1
agent.sinks = s1
agent.channels = c1
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1
# Describe/configure the source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /home/flume/loginfo
# Use a channel which buffers events in memory
agent.channels.c1.type = memory
# Maximum number of events held in the channel (properties files do not
# support inline comments, so this note must sit on its own line)
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
# Log each event to the console (visible with the logger setting above)
agent.sinks.s1.type = logger
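With the agent running, a quick smoke test is to append lines to the tailed file and watch them come out of the logger sink. A minimal sketch, reusing the paths and the startup command from above (the sample text is arbitrary):

# Terminal 1: start the agent (same command as above)
bin/flume-ng agent --conf ./conf -f conf/flume-conf.properties \
  -Dflume.root.logger=DEBUG,console -n agent

# Terminal 2: generate some test input
echo "hello flume $(date)" >> /home/flume/loginfo

# Terminal 1 should then print each line as a Flume event, roughly:
# Event: { headers:{} body: 68 65 6C 6C 6F ...  hello flume ... }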
Flume configuration for flume-1.4.0 + kafka-0.7.2 + HDFS
agent.sources = r1
agent.sinks = s_kafka s_hdfs
agent.channels = c_kafka c_hdfs
# Fan out: the source replicates every event into both channels
agent.sources.r1.channels = c_kafka c_hdfs
agent.sources.r1.type = exec
# Tail a log file
agent.sources.r1.command = tail -F /home/flume/loginfo
agent.channels.c_kafka.type = memory
agent.channels.c_hdfs.type = memory
# Custom Kafka sink class (Flume 1.4 has no built-in Kafka sink);
# its jar must be on the Flume classpath
agent.sinks.s_kafka.type = com.sink.FirstkafkaSink
agent.sinks.s_kafka.channel = c_kafka
# Kafka connects through ZooKeeper and writes data to the brokers
agent.sinks.s_kafka.zkconnect = localhost:2181
agent.sinks.s_kafka.topic = test
agent.sinks.s_kafka.serializer.class = kafka.serializer.StringEncoder
# Broker address as configured in Kafka's server.properties
agent.sinks.s_kafka.metadata.broker.list = localhost:9092
agent.sinks.s_kafka.custom.encoding = UTF-8
agent.sinks.s_hdfs.type = hdfs
agent.sinks.s_hdfs.channel = c_hdfs
# NameNode address (the default port is 8020; this cluster uses 9000)
agent.sinks.s_hdfs.hdfs.path = hdfs://localhost:9000/root/source
agent.sinks.s_hdfs.hdfs.filePrefix = events-
agent.sinks.s_hdfs.hdfs.fileType = DataStream
agent.sinks.s_hdfs.hdfs.writeFormat = Text
# Roll to a new file once this many events have been written
# (rollSize/rollInterval = 0 disables size- and time-based rolling)
agent.sinks.s_hdfs.hdfs.rollCount = 30
agent.sinks.s_hdfs.hdfs.rollSize = 0
agent.sinks.s_hdfs.hdfs.rollInterval = 0
agent.sinks.s_hdfs.hdfs.useLocalTimeStamp = true
# Close files that receive no events for this many seconds
agent.sinks.s_hdfs.hdfs.idleTimeout = 51
agent.sinks.s_hdfs.hdfs.threadsPoolSize = 2
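Once this agent is running, the two delivery paths can be checked independently. A minimal sketch, assuming the topic, ZooKeeper address, and HDFS path from the config above, run from the Kafka and Hadoop installation directories respectively (the consumer flags are the kafka-0.7-era --zookeeper syntax):

# Verify the Kafka path: consume the "test" topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

# Verify the HDFS path: list the rolled files and inspect their contents
hadoop fs -ls /root/source
hadoop fs -cat /root/source/events-*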
Summary of Flume's built-in channels, sources, and sinks