This pattern works in one of two ways: replication (Replicating) or multiplexing (Multiplexing). With replication, the data from the front-end source is copied into multiple channels, so every channel receives identical data. The configuration format is as follows:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

Here the selector's type is set to replicating; with no further selector options configured, the replicating behavior applies: Source1 writes each event to both Channel1 and Channel2, the two channels hold identical copies of the data, and the data is then delivered on to Sink1 and Sink2 respectively.
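The replicating behavior can be sketched in a few lines of Python. This is an illustrative simulation of the routing logic only, not Flume's actual classes; the names `Channel` and `ReplicatingSelector` are hypothetical.

```python
from collections import deque

class Channel:
    """A stand-in for a Flume channel: a simple FIFO buffer of events."""
    def __init__(self, name):
        self.name = name
        self.queue = deque()

    def put(self, event):
        self.queue.append(event)

class ReplicatingSelector:
    """Replicating selection: every event goes to *all* configured channels."""
    def __init__(self, channels):
        self.channels = channels

    def route(self, event):
        return self.channels

ch1, ch2 = Channel("ch1"), Channel("ch2")
selector = ReplicatingSelector([ch1, ch2])

event = {"body": b"hello"}
for target in selector.route(event):
    target.put(event)

# Both channels now hold the same event.
print(len(ch1.queue), len(ch2.queue))
```

The key point is that `route` ignores the event entirely and always returns the full channel list, which is why every channel ends up with the same data.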
With multiplexing, the selector decides which channel an event is delivered to based on the value of a header. The configuration format is as follows:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
# ...

<Agent>.sources.<Source1>.selector.default = <Channel2>

Here the selector's type is multiplexing, the header to inspect is configured, and several mapping entries are defined, keyed by the header's value: if the header equals Value1, events from Source1 are routed to Channel1; if it equals Value2, they go to both Channel1 and Channel2; if it equals Value3, they go to Channel2; and any other value falls through to the default, Channel2.
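The mapping rules above can be sketched as a small lookup function. This is a sketch of the selection logic, not Flume's implementation; the header name `State` is a hypothetical stand-in for `<someHeader>`.

```python
def multiplex(event, header, mapping, default):
    """Return the list of channel names an event should be routed to,
    based on the value of one header; unmapped values use the default."""
    value = event.get("headers", {}).get(header)
    return mapping.get(value, default)

# Mirrors the selector.mapping.* entries in the config above.
mapping = {
    "Value1": ["Channel1"],
    "Value2": ["Channel1", "Channel2"],
    "Value3": ["Channel2"],
}
default = ["Channel2"]  # selector.default

print(multiplex({"headers": {"State": "Value1"}}, "State", mapping, default))
print(multiplex({"headers": {"State": "Value2"}}, "State", mapping, default))
print(multiplex({"headers": {"State": "unknown"}}, "State", mapping, default))
```

Note that a single mapping entry may name several channels (as `Value2` does), in which case the event is replicated to each of them.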
The following uses the example that ships with apache-flume-1.5.0.1: an Avro Source receives external data and a Logger serves as the sink. Data arrives via Avro RPC calls, is buffered in the channel, and the Logger then prints the events that were sent.
Configure the agent by editing conf/flume-conf.properties as follows:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.channels.ch1.capacity = 1000
agent1.sources = avro-source1
agent1.sinks = log-sink1

First, start the agent process:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1

Then start an Avro client and send some data:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
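Between source and sink sits the memory channel, which is essentially a bounded in-memory buffer: its capacity setting (1000 above) caps how many events can wait between the Avro source and the logger sink. A minimal sketch of that idea, purely illustrative and not Flume code:

```python
from collections import deque

class MemoryChannel:
    """A bounded FIFO buffer, illustrating the role of ch1.capacity."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = deque()

    def put(self, event):
        # Flume's memory channel rejects puts once the channel is full;
        # here we raise a plain RuntimeError to illustrate.
        if len(self.queue) >= self.capacity:
            raise RuntimeError("channel is full")
        self.queue.append(event)

    def take(self):
        # The sink drains events from the other end of the channel.
        return self.queue.popleft() if self.queue else None

ch1 = MemoryChannel(capacity=1000)  # matches agent1.channels.ch1.capacity
ch1.put({"body": b"a line from sync.log"})
print(ch1.take())  # this is the event the logger sink would print
```

If the sink cannot drain events as fast as the source produces them, the buffer fills up to its capacity and further puts fail, which is why capacity must be sized to the expected burst rate.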
Avro Source+Memory Channel+HDFS Sink
Configure the agent by editing conf/flume-conf-hdfs.properties as follows:
# Define a source, channel, sink
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define an HDFS sink that writes all events it receives to HDFS
# and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text

Note that rolling by event count and by time is disabled (hdfs.rollCount and hdfs.rollInterval are 0), so files roll purely by size (hdfs.rollSize = 1048576, i.e. about 1 MB). First, start the agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1

Then start an Avro client and send some data:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console

You can then check the data synced to HDFS:
hdfs dfs -ls /data/flume

Sample output:
-rw-r--r--   3 shirdrn supergroup    1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log
-rw-r--r--   3 shirdrn supergroup    1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log
-rw-r--r--   3 shirdrn supergroup     259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log
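The size-based rolling configured above (hdfs.rollSize = 1048576, with count- and time-based rolling disabled) explains why the listing shows a series of files each a bit over 1 MB plus a smaller final file. A rough sketch of that roll-by-size behavior, as a hypothetical simulation rather than the HDFS sink's actual implementation:

```python
ROLL_SIZE = 1048576  # bytes, from hdfs.rollSize

def assign_to_files(event_sizes, roll_size=ROLL_SIZE):
    """Simulate roll-by-size: once the current file reaches roll_size,
    close it and start a new one. Returns the resulting file sizes."""
    files, current = [], 0
    for size in event_sizes:
        if current >= roll_size:  # roll: close current file, open a new one
            files.append(current)
            current = 0
        current += size
    files.append(current)
    return files

# 30000 events of ~100 bytes each (~3 MB total) end up split into files
# of roughly 1 MB each plus a smaller remainder.
print(assign_to_files([100] * 30000))
```

Files can slightly exceed the threshold because the size check happens between writes, which matches the listing above, where each rolled file is somewhat larger than 1048576 bytes.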