残缺极品 发表于 2017-5-22 06:34:47

Flume + HDFS Sink采集数据及如何添加第三方JAR

  Flume默认情况下是没有引入HDFS,Kafka,Elasticsearch,Hbase等sink的相关jar包,如果使用,需要自行添加相关jar包。

  下面我以使用HDFS Sink为例,在Flume中加入第三方JAR包。
  Flume支持一种特殊的目录结构:plugins.d,它有特殊的格式,可以很方面的管理第三方JAR。当然我们可以直接把第三方JAR丢掉$FLUME_HOME/lib目录,但是这样不利于调试和排除故障,特别是处理JAR包冲突的问题。
  plugins.d目录:

plugins.d目录位于$FLUME_HOME/plugins.d。在启动的时候,flume-ng启动脚本会检查 plugins.d 目录的插件确保符合下面的格式,并且包含了正确的路径。

  插件目录布局:

每个在 plugins.d 内的插件,最多包含三个子目录。

1,lib - 插件的JAR。

2,libext - 插件依赖JAR(S)

3,native - 任何所需的本地库,例如:.so文件

下面是两个插件在 plugins.d 目录中的位置(以下以使用HDFS Sink为例):

    plugins.d/
plugins.d/hdfs-sink/
#flume-hdfs-sink-1.5.1.jar是Flume自带的,所有lib目录为空
plugins.d/hdfs-sink/lib/   
#flume-hdfs-sink-1.5.1.jar依赖以下四个包
plugins.d/hdfs-sink/libext/commons-configuration-1.6.jar
plugins.d/hdfs-sink/libext/hadoop-annotations-2.4.1.jar
plugins.d/hdfs-sink/libext/hadoop-auth-2.4.1.jar
plugins.d/hdfs-sink/libext/hadoop-common-2.4.1.jar
plugins.d/hdfs-sink/libext/hadoop-hdfs-2.4.1.jar
#没有本地库
plugins.d/hdfs-sink/native/
#HDFS配置文件
plugins.d/hdfs-sink/conf/hdfs-site.xml
plugins.d/hdfs-sink/conf/core-site.xml









以上是HDFS所需的配置文件和依赖的JAR。  配置Flume环境变量:

JAVA_HOME=/usr/local/jdk1.8.0_45
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
#JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"
# Note that the Flume conf directory is always included in the classpath.
#将hdfs-site.xml和core-site.xml放入Flume的环境变量中
FLUME_CLASSPATH="/data/apache-flume-1.5.1-bin/plugins.d/hdfs-sink/conf/"
  OK,完事具备,最后采集数据到HDFS中

a1.sources = source1
a1.sinks = sink1
a1.channels = channel1
#resources
a1.sources.source1.type = spooldir
a1.sources.source1.channels = channel1
a1.sources.source1.spoolDir = /data/logs
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 500
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = channel1
a1.sinks.sink1.hdfs.path = /flume/events/%Y-%M-%d
a1.sinks.sink1.hdfs.fileType = DataStream
a1.sinks.sink1.hdfs.rollCount = 0
a1.sinks.sink1.hdfs.rollSize = 0
a1.sinks.sink1.hdfs.rollInterval = 0
a1.sinks.sink1.hdfs.rollSize = 1073741824
a1.sinks.sink1.hdfs.filePrefix = nginx-%H-%M
a1.sinks.sink1.hdfs.batchSize = 200
a1.sinks.sink1.hdfs.round = true
a1.sinks.sink1.hdfs.roundValue = 10
a1.sinks.sink1.hdfs.roundUnit = minute
a1.sinks.sink1.hdfs.useLocalTimeStamp = true该示例是通过SpoolingDirectorySource获取数据放入HDFS中
页: [1]
查看完整版本: Flume + HDFS Sink采集数据及如何添加第三方JAR