liuming794 发表于 2015-9-17 08:52:49

flume实例一、监听目录日志上传到其他服务器

  一、flume-ng简介
  请参考官方文档:http://flume.apache.org/FlumeUserGuide.html
  二、实例
  需求说明:需要监控一个目录,并自动上传到服务器,且需要在传输过程中进行加密。
  整体方案:n个client-agent -->server-agent
  client-agent:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.basenameHeader = true
#a1.sources.r1.ignorePattern = .+\.log$
a1.sources.r1.bufferMaxLineLength = 1048576
a1.sources.r1.batchSize = 5000
#拦截器
a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = id
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = key
a1.sources.r1.interceptors.i3.type = com.landray.behavior.interceptor.BehaviorClientSerurityHDFSInterceptor$Builder
#file channel
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = ./checkpoint
a1.channels.c1.dataDirs = ./data
#sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.compression-type = deflate
a1.sinks.k1.compression-leve = 9
a1.sinks.k1.batch-size = 5000
a1.sinks.k1.hostname = http://test.com.cn
a1.sinks.k1.port = 5281
a1.sinks.k1.request-timeout = 20000
#user define
#需要上传的日志目录
a1.sources.r1.spoolDir =D:/flume_tes/source
#客户唯一ID
a1.sources.r1.interceptors.i1.value = 123456
#秘钥
a1.sources.r1.interceptors.i2.value = 密钥   server-agent:


a2.sources = r2
a2.channels = c2
a2.sinks = k2
#source
a2.sources.r2.type = avro
a2.sources.r2.channels = c2
a2.sources.r2.compression-type = deflate
a2.sources.r2.bind = localhost
a2.sources.r2.port = 5281
a2.sources.r2.interceptors = i1
a2.sources.r2.interceptors.i1.type = com.landray.behavior.interceptor.BehaviorServerSerurityInterceptor$Builder
a2.channels = c2
a2.channels.c2.type = file
a2.channels.c2.checkpointDir = ./checkpoint
a2.channels.c2.dataDirs = ./data
a2.channels.c2.transactionCapacity = 20000
#a2.channels.c2.type = memory
#a2.channels.c2.capacity = 1000000
#a2.channels.c2.transactionCapacity = 20000
#a2.channels.c2.byteCapacityBufferPercentage = 20
#default 80%
#a2.channels.c2.byteCapacity = 800000
a2.sinks.k2.type = com.landray.behavior.sink.BehaviorRollingFileSink
a2.sinks.k2.channel = c2
#no check
a2.sinks.k2.sink.rollInterval = 0
a2.sinks.k2.sink.batchSize = 20000
#user define
#windows
a2.sinks.k2.sink.directory = D:/behavior/logs
#linux
#a2.sinks.k2.sink.directory = /home/nemo/behavior
  三、启动脚本  目录截图:


  目录说明:
  checkpoint:设置的是flume中的检查点
  conf配置的是:log4j.properties
  data:是flume的memory
  Libs:flume启动的jar包
  source.conf:配置文件
  
  官网给出的是命令启动模式,比较不灵活,这里给出linux和window版本两种启动方式
  linux:


#!/bin/sh
JAVA_OPTS=-Xmx1024m
$JAVA_HOME/bin/java $JAVA_OPTS -Dlog4j.configuration=file:./conf/log4j.properties -cp "./lib/*" org.apache.flume.node.Application --conf-file ./target.conf --name a2
  windows:


@echo on
set FLUME_HOME=%~dp0
echo %~dp0
rem 配置JAVA环境变量
set JAVA_HOME=%FLUME_HOME%..\jdk1.6
set PATH=%JAVA_HOME%\bin
echo %JAVA_HOME%
java -version
java %JAVA_OPTS% -Dlog4j.configuration=file:./conf/log4j.properties -cp .;./lib/* org.apache.flume.node.Application --conf-file ./target_hdfs.conf --name a2
PAUSE
页: [1]
查看完整版本: flume实例一、监听目录日志上传到其他服务器