tset123 发表于 2015-11-27 16:43:43

Flume-1.6.0修改



应项目需求,需要对多种日志收集
日志说明:

1、追加试,每天生成日志,定期删除。一条日志内容跨多行,日志间有无用信息

2、mysql主键自增追加,多个表,定期删除。

需求说明:

1、存入elasticsearch,按天建立index(由event的head生成)

2、存入hdfs,按天建文件(由event的body生成)





源代码
https://github.com/zjf92/modified-flume-1.6.0


以下为源码说明:
*对大文件处理存疑,想充分利用内存,但它不可控
DirRegexSource
功能设计
递归监控目录(筛选文件)
按字节数核对,重启续读
正则提取数据
内存使用上限totalMemory大于maxMemory的0.4
参数
monitorDir必填监控目录
monitorFileRegex[\\W\\w]+选择监控文件名的正则
checkFile必填读取的历史记录文件
contentRegex必填从change file中循环取content的正则
delayTime30s扫描目录间隔
charsetNameUTF-8文件编码
batchSize1024单个事务提交event最大数量

//------------------------------------------------------------------
SqlSource
功能设计
多表监控
index字段MAX记录,进行续读
内存使用上限totalMemory大于maxMemory的0.4
参数
checkFile必填index的MAX值
url必填mysql连接参数
usernamerootmysql连接参数
passwordrootmysql连接参数
tables必填表(以{split}分隔)
columns必填字段(一层以{split}分隔、二层以,分隔)
indexColumns必填顺序自增ID字段(以{split}分隔)
delayTime30轮询间隔
batchSize1024单个事务提交event最大数量

//------------------------------------------------------------------
ElasticSearchSink
新功能设计
提供索引时刻滚动(分、时、天)
value转义
旧参数
indexNameBuilder=org.apache.flume.sink.elasticsearch.HeadFieldIndexNameBuilder //按时刻滚动
新参数
indexNameBuilder.timeRollerFlag时刻滚动必填DAY/HOUR/MINUTE
indexNameBuilder.formerField时刻滚动必填event的head的时间字段
indexNameBuilder.sdfParsePattern时刻滚动必填解析时间值的Pattern
serializer.escapekeys要转义key,","分割

//------------------------------------------------------------------
HDFSEventSink
在文件头部生成BOM(防utf-8乱码)
新功能设计
改重命名逻辑,重启续写(去.tmp缀的文件续写)
支持提供时刻滚动(分、时、天)(必须关闭其他滚动)
支持历史数据与新生数据交叉存储,历史文件存储
新参数
timeRollerFlag时刻滚动必填DAY/HOUR/MINUTE
formerField时刻滚动必填event的head的时间字段
sdfParsePattern isNotBlank(formerField)时刻滚动必填解析时间值的Pattern

//------------------------------------------------------------------
拦截器链
BodyAppendByHeadInterceptor(全加、指定加)
bodyLoopAppendFormat二选一字符串({key}、{value}、{separator}) 例:<{key}>={value}
bodyCustomAppendFormat二选一字符串({@key}&#43;str、{separator})例:{separator}<hostName>={hostName}{separator}<filePath>={filePath}
BodyReplaceByBodyInterceptor(修改)
bodyRegex必填多取正则({separator})例:<Message>=(size:)
bodyStrs&quot;&quot;字符串({separator}、{split})例:
HeadPutByBodyInterceptor(全加、指定加)
headLoopAppendRegex二选一双取正则 例:<([^>]*)>=((?:(?!\r?\n)[\\W\\w])*)
headCustomAppendFields二选一字符串({split}) 例:
headCustomAppendRegexs&quot;&quot;正则({split}) 例:
HeadPutByHeadInterceptor(修改)
headKeys必填字符串({split}) 例:startTimeZ{split}closeTimeZ{split}params_length
headValues&quot;&quot;字符串({split}、{@key}&#43;str、{@key.length}) 例:{StartTime}&#43;08:00{split}{CloseTime}&#43;08:00{split}{Params.length}

//------------------------------------------------------------------
页: [1]
查看完整版本: Flume-1.6.0修改