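Background: Flume is an Apache log collection system built from three components, source + channel + sink. The source is where the logs come from; this example uses an exec source. The channel is the path the data travels along in between, and can be backed by a file or by memory. The sink is the output; common choices are the Hive sink, HBase sink, HDFS sink and Avro sink. A single machine can of course run several source + channel + sink combinations.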
Resources: 172.16.6.152 node1 runs Flume + DataNode
172.16.6.151 master runs Flume + Hive + NameNode
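Approach: 1. On node1, an exec source runs tail -F /*/*.log to pick up the log, passes it through a memory channel, and an Avro sink sends it to master.
2. On master, the Flume source is avro; it receives the data sent by node1, passes it through a memory channel, and an HDFS sink writes it to HDFS.
3. My skills are limited: I originally wanted to use the Hive sink directly on master, but kept getting a Hive class-not-found error. So I fell back on a workaround. Hive can pick up data from files that are simply copied into the location backing a table, and that gives us a solution.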
Detailed settings:
1. Install Flume: unpack it, then edit conf/flume-env.sh to set the environment variables.
2. On node1, create example4.conf in the conf directory with the following content:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 100000
agent1.channels.ch1.keep-alive = 30
# Template Avro source (disabled): it would bind to 0.0.0.0:41414
# and connect to channel ch1.
#agent1.sources.avro-source1.channels = ch1
#agent1.sources.avro-source1.type = avro
#agent1.sources.avro-source1.bind = 0.0.0.0
#agent1.sources.avro-source1.port = 41414
#agent1.sources.avro-source1.threads = 5
# Define an exec source (still named avro-source1) that tails a log file
agent1.sources.avro-source1.type = exec
agent1.sources.avro-source1.shell = /bin/bash -c
agent1.sources.avro-source1.command = tail -F /opt/cdh5.3.0/hadoop/logs/hadoop-hadoop-namenode-node1.log
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.threads = 5
# Define an Avro sink (still named log-sink1) that forwards all events
# it receives to master:41415, and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = avro
agent1.sinks.log-sink1.hostname=master
agent1.sinks.log-sink1.port=41415
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
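A file like this is easy to get subtly wrong, because a source or sink that points at a channel name that was never declared will not move any data. The following is a minimal sketch of a wiring check, not part of Flume itself: `parse_properties` and `check_wiring` are hypothetical helper names, and the parser only handles plain `key = value` lines and `#` comments.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of channel wiring problems found for one agent."""
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for source in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not used:
            problems.append(f"source {source} has no channels")
        for ch in used:
            if ch not in declared:
                problems.append(f"source {source} uses undeclared channel {ch!r}")
    for sink in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel", "")
        if ch not in declared:
            problems.append(f"sink {sink} uses undeclared channel {ch!r}")
    return problems


# A trimmed-down copy of the wiring lines from example4.conf.
sample = """
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
agent1.sources.avro-source1.type = exec
agent1.sources.avro-source1.channels = ch1
agent1.sinks.log-sink1.type = avro
agent1.sinks.log-sink1.channel = ch1
"""
props = parse_properties(sample)
print(check_wiring(props, "agent1"))  # an empty list means the wiring is consistent
```

Note the asymmetry the check relies on: a source uses the plural key `channels`, while a sink uses the singular `channel`.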
3. Flume settings on the master node: create example6.conf in conf with the following content:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 100000
agent1.channels.ch1.keep-alive = 30
# Template Avro source (disabled): it would bind to 0.0.0.0:41414.
#agent1.sources.avro-source1.channels = ch1
#agent1.sources.avro-source1.type = avro
#agent1.sources.avro-source1.bind = 0.0.0.0
#agent1.sources.avro-source1.port = 41414
#agent1.sources.avro-source1.threads = 5
# Define an Avro source called avro-source1 on agent1 that listens on
# master:41415 (the port node1 sends to). Connect it to channel ch1.
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = master
agent1.sources.avro-source1.port = 41415
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.threads = 5
# Define an HDFS sink (still named log-sink1) that writes all events it receives
# to HDFS, and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = hdfs
agent1.sinks.log-sink1.hdfs.path = hdfs://master:8020/zhonghui/flume/logs/type=hadoop/host=172.16.6.152/
agent1.sinks.log-sink1.hdfs.useLocalTimeStamp=true
agent1.sinks.log-sink1.hdfs.writeFormat = Text
agent1.sinks.log-sink1.hdfs.fileType = DataStream
agent1.sinks.log-sink1.hdfs.rollInterval = 0
agent1.sinks.log-sink1.hdfs.rollSize = 1000000
agent1.sinks.log-sink1.hdfs.rollCount = 0
agent1.sinks.log-sink1.hdfs.txnEventMax = 1000
agent1.sinks.log-sink1.hdfs.callTimeout = 60000
agent1.sinks.log-sink1.hdfs.appendTimeout = 60000
agent1.sinks.log-sink1.hdfs.filePrefix = log
agent1.sinks.log-sink1.hdfs.batchSize=2
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
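Long property files make it easy to set the same key twice without noticing. Flume reads the file as a Java properties file, where the last occurrence of a repeated key is the one that takes effect. The sketch below lists repeated keys; `find_duplicate_keys` is a hypothetical helper and the sample lines are only an illustration.

```python
from collections import defaultdict


def find_duplicate_keys(text):
    """Map each repeated key to the list of values it was given, in file order."""
    seen = defaultdict(list)
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        seen[key.strip()].append(value.strip())
    return {key: values for key, values in seen.items() if len(values) > 1}


# Illustrative fragment with one key set twice.
sample = """
agent1.sinks.log-sink1.hdfs.batchSize = 1000
agent1.sinks.log-sink1.hdfs.filePrefix = log
agent1.sinks.log-sink1.hdfs.batchSize=2
"""
for key, values in find_duplicate_keys(sample).items():
    # The last value is the one a properties loader keeps.
    print(f"{key}: {values} -> effective value {values[-1]}")
```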
4. Start Flume on master and on node1:
master: bin/flume-ng agent --conf conf --conf-file conf/example6.conf --name agent1 -Dflume.root.logger=INFO,console
node1: bin/flume-ng agent --conf conf --conf-file conf/example4.conf --name agent1 -Dflume.root.logger=INFO,console
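6. After a few minutes (10 at most; this can presumably be changed in the config file, but I have not found the setting yet), files will be generated. If files show up, you are halfway there; if not, go back and carefully check your config files. Then stop the Flume agents on node1 and master, and delete the directory that was just generated (including its files).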
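How long a file stays open before it is closed is most likely governed by the HDFS sink's roll settings in example6.conf. According to the Flume user guide, hdfs.rollInterval (seconds), hdfs.rollSize (bytes) and hdfs.rollCount (events) each trigger a roll, and a value of 0 disables that trigger. The sketch below is a simplified model of that rule, not Flume's actual code; `should_roll` is a hypothetical name and its defaults mirror example6.conf.

```python
def should_roll(elapsed_s, bytes_written, events_written,
                roll_interval=0, roll_size=1_000_000, roll_count=0):
    """Return True if any enabled trigger (value > 0) has been reached."""
    by_time = roll_interval > 0 and elapsed_s >= roll_interval
    by_size = roll_size > 0 and bytes_written >= roll_size
    by_count = roll_count > 0 and events_written >= roll_count
    return by_time or by_size or by_count


# With the settings above only the size trigger is active, so ten minutes
# of light logging does not roll the file, but reaching 1,000,000 bytes does.
print(should_roll(elapsed_s=600, bytes_written=200_000, events_written=5_000))
print(should_roll(elapsed_s=600, bytes_written=1_000_000, events_written=5_000))
```

Under this model, setting hdfs.rollInterval to a non-zero number of seconds would put an upper bound on how long a file stays open on a quiet log.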
7. I won't go over setting up a Hive environment here; look it up yourself. Create the table and its partition:
create table logs(
  `create` string,
  content string)
PARTITIONED BY (type string, host string)
row format delimited fields terminated by ','
location '/zhonghui/flume/logs/';
alter table logs add partition (type='hadoop', host='172.16.6.152');
This creates a Hive table with one partition.
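The workaround hinges on one detail: a partition added without an explicit location lives in a subdirectory of the table location named key=value for each partition column, and that directory is exactly where the HDFS sink writes. The sketch below rebuilds the directory from the table definition and compares it with hdfs.path from example6.conf; `partition_dir` is a hypothetical helper and does not handle partition values that need escaping.

```python
import posixpath
from urllib.parse import urlparse


def partition_dir(table_location, partition_spec):
    """Build the default partition directory: <location>/<k1>=<v1>/<k2>=<v2>."""
    parts = [f"{key}={value}" for key, value in partition_spec]
    return posixpath.join(table_location, *parts)


table_location = "/zhonghui/flume/logs/"
spec = [("type", "hadoop"), ("host", "172.16.6.152")]
sink_path = "hdfs://master:8020/zhonghui/flume/logs/type=hadoop/host=172.16.6.152/"

expected = partition_dir(table_location, spec)
print(expected)
# Compare only the path part of the sink URL, ignoring the trailing slash.
print(urlparse(sink_path).path.rstrip("/") == expected)
```

If the two paths differ, Flume still writes its files, but queries on that partition will not see them.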
8. Start Flume on node1 and master again, then sit back and wait for the data:
hive> select * from logs;
OK
2015-09-15 17:49:15 800 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Rescanning after 30000 milliseconds hadoop 172.16.6.152
2015-09-15 17:49:15 801 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Scanned 0 directive(s) and 0 block(s) in 1 millisecond(s). hadoop 172.16.6.152
2015-09-15 17:49:37 825 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Number of transactions: 2 Total time for transactions(ms): 1 Number of transactions batched in Syncs: 0 Number of syncs: 1 SyncTimes(ms): 1050 hadoop 172.16.6.152
2015-09-15 17:49:37 853 INFO BlockStateChange: BLOCK* addToInvalidates: blk_1073860942_120256 172.16.6.153:50010 172.16.6.152:50010 hadoop 172.16.6.152
2015-09-15 17:49:37 870 INFO BlockStateChange: BLOCK* addToInvalidates: blk_1073860943_120257 172.16.6.152:50010 172.16.6.151:50010 hadoop 172.16.6.152
2015-09-15 17:49:38 801 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* BlockManager: ask 172.16.6.151:50010 to delete [blk_1073860943_120257] hadoop 172.16.6.152
2015-09-15 17:49:41 801 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* BlockManager: ask 172.16.6.153:50010 to delete [blk_1073860942_120256] hadoop 172.16.6.152
2015-09-15 17:49:44 802 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* BlockManager: ask 172.16.6.152:50010 to delete [blk_1073860942_120256 hadoop 172.16.6.152
2015-09-15 17:49:45 800 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Rescanning after 30000 milliseconds hadoop 172.16.6.152
2015-09-15 17:49:45 801 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Scanned 0 directive(s) and 0 block(s) in 1 millisecond(s). hadoop 172.16.6.152
2015-09-15 17:50:15 800 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Rescanning after 30000 milliseconds hadoop 172.16.6.152
2015-09-15 17:50:15 800 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Scanned 0 directive(s) and 0 block(s) in 0 millisecond(s). hadoop 172.16.6.152
2015-09-15 17:50:22 104 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Roll Edit Log from 172.16.6.150 hadoop 172.16.6.152
2015-09-15 17:50:22 104 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Rolling edit logs hadoop 172.16.6.152
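The shape of these rows follows from the table definition: each line is split on ',' and only the first two fields are kept, one per declared column. That would explain why the milliseconds end up at the start of content (assuming the raw Hadoop log writes the timestamp as seconds,milliseconds) and why one row stops in the middle of a bracketed block list. The sketch below imitates that splitting; `split_like_hive` is a hypothetical name, the raw line is a reconstruction, and the second block ID in it is made up for illustration.

```python
def split_like_hive(line, num_columns=2, delimiter=","):
    """Split a text row into columns; extra fields are dropped, missing ones become None."""
    fields = line.split(delimiter)[:num_columns]
    fields += [None] * (num_columns - len(fields))
    return fields


# Reconstructed raw log line (assumption); the second block ID is invented.
raw = ("2015-09-15 17:49:44,802 INFO org.apache.hadoop.hdfs.StateChange: "
       "BLOCK* BlockManager: ask 172.16.6.152:50010 to delete "
       "[blk_1073860942_120256, blk_1073860943_120257]")

created, content = split_like_hive(raw)
print(created)   # the timestamp up to the seconds
print(content)   # starts with the milliseconds, ends at the next comma
```

A delimiter that never occurs in the log text would keep each line intact, at the cost of leaving the second column empty.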
Reference: https://flume.apache.org/FlumeUserGuide.html#hdfs-sink (the official user guide has fairly detailed documentation)