[Experience Share] (3) Flume single-node write-to-HDFS practice, plus a custom interceptor to format logs (formatLog)

  (1) Reference: http://my.oschina.net/leejun2005/blog/288136#OSC_h2_10
  (2) The Flume HDFS sink needs the relevant Hadoop jars on its classpath; the CDH build of Flume ships with them (a sketch for vanilla Flume follows below).
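  If you run a vanilla (non-CDH) Flume, one common way to supply those jars is conf/flume-env.sh. A minimal sketch, assuming a Hadoop client is installed on the same host (verify the variable and command against your install):

# conf/flume-env.sh
# put the Hadoop client jars on Flume's classpath so the HDFS sink can load them
export FLUME_CLASSPATH="$(hadoop classpath)"

  Alternatively, copy hadoop-common, hadoop-hdfs and their dependency jars into Flume's lib/ directory.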
  (3) flume_directHDFS2.conf:
  

# First, name the components we want to activate on agent1 (each is defined below).
agent1.sources = exec-source1
agent1.channels = ch1
agent1.sinks = log-sink1

##define -- Exec Source
#type       The component type name, needs to be exec  (required)
#shell      A shell invocation used to run the command
#command    The command to execute  (required)
#channels   (required)
agent1.sources.exec-source1.type = exec
agent1.sources.exec-source1.shell = /bin/bash -c
agent1.sources.exec-source1.command = tail -n +0 -F /usr/local/nginx/logs/vdnlog_access.log
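# (above: "tail -n +0" replays the whole file from its first line, and -F then
# keeps following it across log rotations)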
agent1.sources.exec-source1.channels = ch1

##define -- Memory Channel called ch1 on agent1
#type                  The component type name, needs to be memory (required)
#capacity              The maximum number of events stored in the channel
#transactionCapacity   The maximum number of events the channel will take from a source or give to a sink per transaction
#keep-alive            Timeout in seconds for adding or removing an event
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 100000
agent1.channels.ch1.keep-alive = 30
# Define -- Hdfs Sink
#type              The component type name, needs to be hdfs (required)
#channel           (required)
#hdfs.path         HDFS directory path (eg hdfs://namenode/flume/webdata/) (required)
#hdfs.writeFormat  Format for sequence file records. One of "Text" or "Writable" (the default).
#hdfs.fileType     File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set codeC. (2) CompressedStream requires hdfs.codeC set to an available codeC.
#hdfs.filePrefix   Name prefixed to files created by Flume in the hdfs directory
#hdfs.fileSuffix   Suffix to append to file (eg .avro - NOTE: period is not automatically added)
#hdfs.round        Should the timestamp be rounded down
#hdfs.roundValue   Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time
#
# To roll a new file every 10 minutes, the three roll* parameters below must be
# set to 0, otherwise the time-based rounding does not take effect:
#agent1.sinks.log-sink1.hdfs.rollInterval = 0
#agent1.sinks.log-sink1.hdfs.rollSize = 0
#agent1.sinks.log-sink1.hdfs.rollCount = 0
#
# A file that is still being written carries a .tmp suffix.
#idleTimeout=5  Timeout (seconds) after which inactive files get closed
###################
agent1.sinks.log-sink1.type = hdfs
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.hdfs.path = hdfs://101.240.151.41:9000/test/pjm/%y-%m-%d
agent1.sinks.log-sink1.hdfs.writeFormat = Text
agent1.sinks.log-sink1.hdfs.fileType = DataStream
agent1.sinks.log-sink1.hdfs.filePrefix = flume_%y-%m-%d_%H%M%S
agent1.sinks.log-sink1.hdfs.fileSuffix = .log
agent1.sinks.log-sink1.hdfs.round = true
agent1.sinks.log-sink1.hdfs.roundValue = 10
agent1.sinks.log-sink1.hdfs.roundUnit = minute
agent1.sinks.log-sink1.hdfs.rollInterval = 0
agent1.sinks.log-sink1.hdfs.rollSize = 0
agent1.sinks.log-sink1.hdfs.rollCount = 0
agent1.sinks.log-sink1.hdfs.useLocalTimeStamp = true
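# (above: useLocalTimeStamp resolves the %y-%m-%d / %H%M%S escapes from the
# agent's clock, since no interceptor adds a timestamp header in this config)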
agent1.sinks.log-sink1.hdfs.callTimeout = 20000
agent1.sinks.log-sink1.hdfs.idleTimeout = 5
  
  (Files here roll by time: one new file every 10 minutes.)
  bin/flume-ng agent --conf conf --conf-file ./conf/flume_directHDFS2.conf --name agent1 -Dflume.root.logger=INFO,console
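  Once the agent is up, you can confirm the rolling by listing the target directory. A sketch (the path follows hdfs.path above; the file names are illustrative, one closed file per 10-minute window, with the file still being written keeping its .tmp suffix):

hdfs dfs -ls /test/pjm/15-11-28
# /test/pjm/15-11-28/flume_15-11-28_101000.log
# /test/pjm/15-11-28/flume_15-11-28_102000.log.tmp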
  ----------------------------------------------------------------------
  // Custom interceptor (flume_directHDFS3.properties; a minimal sketch of the interceptor class follows the launch command below)
  

# First, name the components we want to activate on this agent (each is defined below).
agent.sources = exec-source1
agent.channels = memchannellv memchannelerr memchannelbf memchannelfs memchannelother
agent.sinks = hdfssinklv hdfssinkerr hdfssinkbf hdfssinkfs hdfssinkother

##define -- Exec Source
#type       The component type name, needs to be exec  (required)
#shell      A shell invocation used to run the command
#command    The command to execute  (required)
#channels   (required)
agent.sources.exec-source1.type = exec
agent.sources.exec-source1.shell = /bin/bash -c
agent.sources.exec-source1.command = tail -F /usr/local/nginx/logs/vdnlog_access.log
agent.sources.exec-source1.interceptors = timestamp nginxlogformat
agent.sources.exec-source1.interceptors.nginxlogformat.type = com.cntv.bigdata.flume.interceptor.NginxInterceptor$Builder
agent.sources.exec-source1.interceptors.timestamp.type = timestamp
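# Note: the timestamp interceptor stamps each event with a "timestamp" header,
# which is what lets the sinks below resolve the %y-%m-%d / %H%M%S escapes
# without setting hdfs.useLocalTimeStamp.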

##source selector -- route each event by the "type" header set by the custom interceptor
agent.sources.exec-source1.selector.type = multiplexing
agent.sources.exec-source1.selector.header = type
agent.sources.exec-source1.selector.mapping.lv = memchannellv
agent.sources.exec-source1.selector.mapping.err = memchannelerr
agent.sources.exec-source1.selector.mapping.bf = memchannelbf
agent.sources.exec-source1.selector.mapping.fs = memchannelfs
agent.sources.exec-source1.selector.default = memchannelother
agent.sources.exec-source1.channels = memchannellv memchannelerr memchannelbf memchannelfs memchannelother

##define -- Memory Channels, one per event type
#type                  The component type name, needs to be memory (required)
#capacity              The maximum number of events stored in the channel
#transactionCapacity   The maximum number of events the channel will take from a source or give to a sink per transaction
#keep-alive            Timeout in seconds for adding or removing an event

agent.channels.memchannellv.type = memory
agent.channels.memchannellv.capacity = 10000
agent.channels.memchannellv.transactionCapacity = 10000
agent.channels.memchannellv.keep-alive = 3
agent.channels.memchannelerr.type = memory
agent.channels.memchannelerr.capacity = 10000
agent.channels.memchannelerr.transactionCapacity = 10000
agent.channels.memchannelerr.keep-alive = 3

agent.channels.memchannelbf.type = memory
agent.channels.memchannelbf.capacity = 10000
agent.channels.memchannelbf.transactionCapacity = 10000
agent.channels.memchannelbf.keep-alive = 3
agent.channels.memchannelfs.type = memory
agent.channels.memchannelfs.capacity = 10000
agent.channels.memchannelfs.transactionCapacity = 10000
agent.channels.memchannelfs.keep-alive = 3

agent.channels.memchannelother.type = memory
agent.channels.memchannelother.capacity = 10000
agent.channels.memchannelother.transactionCapacity = 10000
agent.channels.memchannelother.keep-alive = 3

# Define -- Hdfs Sink
#type              The component type name, needs to be hdfs (required)
#channel           (required)
#hdfs.path         HDFS directory path (eg hdfs://namenode/flume/webdata/) (required)
#hdfs.writeFormat  Format for sequence file records. One of "Text" or "Writable" (the default).
#hdfs.fileType     File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set codeC. (2) CompressedStream requires hdfs.codeC set to an available codeC.
#hdfs.filePrefix   Name prefixed to files created by Flume in the hdfs directory
#hdfs.fileSuffix   Suffix to append to file (eg .avro - NOTE: period is not automatically added)
#hdfs.round        Should the timestamp be rounded down
#hdfs.roundValue   Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time
#
# As in the first config: to roll purely by time (every 10 minutes), set the three roll* parameters to 0, otherwise the time-based rounding does not take effect:
#agent1.sinks.log-sink1.hdfs.rollInterval= 0
#agent1.sinks.log-sink1.hdfs.rollSize = 0
#agent1.sinks.log-sink1.hdfs.rollCount = 0
#
#
####################
#######lv
agent.sinks.hdfssinklv.type = hdfs
agent.sinks.hdfssinklv.hdfs.fileType = DataStream
agent.sinks.hdfssinklv.hdfs.idleTimeout = 60
agent.sinks.hdfssinklv.hdfs.round = true
agent.sinks.hdfssinklv.hdfs.roundValue = 10
agent.sinks.hdfssinklv.hdfs.roundUnit = minute
agent.sinks.hdfssinklv.hdfs.rollInterval = 0
agent.sinks.hdfssinklv.hdfs.rollSize = 0
agent.sinks.hdfssinklv.hdfs.rollCount = 0
agent.sinks.hdfssinklv.hdfs.path = hdfs://101.240.151.41:9000/test/pjm/xxoo/lv/%y-%m-%d
agent.sinks.hdfssinklv.hdfs.filePrefix = flume_bjxd02Lv_%y-%m-%d_%H%M%S
agent.sinks.hdfssinklv.hdfs.fileSuffix = .log
agent.sinks.hdfssinklv.channel = memchannellv
#######err
agent.sinks.hdfssinkerr.type = hdfs
agent.sinks.hdfssinkerr.hdfs.fileType = DataStream
agent.sinks.hdfssinkerr.hdfs.idleTimeout = 60
agent.sinks.hdfssinkerr.hdfs.round = true
agent.sinks.hdfssinkerr.hdfs.roundValue = 10
agent.sinks.hdfssinkerr.hdfs.roundUnit = minute
agent.sinks.hdfssinkerr.hdfs.rollInterval = 0
agent.sinks.hdfssinkerr.hdfs.rollSize = 0
agent.sinks.hdfssinkerr.hdfs.rollCount = 0
agent.sinks.hdfssinkerr.hdfs.path = hdfs://101.240.151.41:9000/test/pjm/xxoo/err/%y-%m-%d
agent.sinks.hdfssinkerr.hdfs.filePrefix = flume_bjxd02Err_%y-%m-%d_%H%M%S
agent.sinks.hdfssinkerr.hdfs.fileSuffix = .log
agent.sinks.hdfssinkerr.channel = memchannelerr
#######bf
agent.sinks.hdfssinkbf.type = hdfs
agent.sinks.hdfssinkbf.hdfs.fileType = DataStream
agent.sinks.hdfssinkbf.hdfs.idleTimeout = 60
agent.sinks.hdfssinkbf.hdfs.round = true
agent.sinks.hdfssinkbf.hdfs.roundValue = 10
agent.sinks.hdfssinkbf.hdfs.roundUnit = minute
agent.sinks.hdfssinkbf.hdfs.rollInterval = 0
agent.sinks.hdfssinkbf.hdfs.rollSize = 0
agent.sinks.hdfssinkbf.hdfs.rollCount = 0
agent.sinks.hdfssinkbf.hdfs.path = hdfs://101.240.151.41:9000/test/pjm/xxoo/bf/%y-%m-%d
agent.sinks.hdfssinkbf.hdfs.filePrefix = flume_bjxd02Bf_%y-%m-%d_%H%M%S
agent.sinks.hdfssinkbf.hdfs.fileSuffix = .log
agent.sinks.hdfssinkbf.channel = memchannelbf

#######fs
agent.sinks.hdfssinkfs.type = hdfs
agent.sinks.hdfssinkfs.hdfs.fileType = DataStream
agent.sinks.hdfssinkfs.hdfs.idleTimeout = 60
agent.sinks.hdfssinkfs.hdfs.round = true
agent.sinks.hdfssinkfs.hdfs.roundValue = 10
agent.sinks.hdfssinkfs.hdfs.roundUnit = minute
agent.sinks.hdfssinkfs.hdfs.rollInterval = 0
agent.sinks.hdfssinkfs.hdfs.rollSize = 0
agent.sinks.hdfssinkfs.hdfs.rollCount = 0
agent.sinks.hdfssinkfs.hdfs.path = hdfs://101.240.151.41:9000/test/pjm/xxoo/fs/%y-%m-%d
agent.sinks.hdfssinkfs.hdfs.filePrefix = flume_bjxd02Fs_%y-%m-%d_%H%M%S
agent.sinks.hdfssinkfs.hdfs.fileSuffix = .log
agent.sinks.hdfssinkfs.channel = memchannelfs
#######other
agent.sinks.hdfssinkother.type = hdfs
agent.sinks.hdfssinkother.hdfs.fileType = DataStream
agent.sinks.hdfssinkother.hdfs.idleTimeout = 60
agent.sinks.hdfssinkother.hdfs.round = true
agent.sinks.hdfssinkother.hdfs.roundValue = 10
agent.sinks.hdfssinkother.hdfs.roundUnit = minute
agent.sinks.hdfssinkother.hdfs.rollInterval = 0
agent.sinks.hdfssinkother.hdfs.rollSize = 0
agent.sinks.hdfssinkother.hdfs.rollCount = 0
agent.sinks.hdfssinkother.hdfs.path = hdfs://101.240.151.41:9000/test/pjm/xxoo/other/%y-%m-%d
agent.sinks.hdfssinkother.hdfs.filePrefix = flume_bjxd02Other_%y-%m-%d_%H%M%S
agent.sinks.hdfssinkother.hdfs.fileSuffix = .log
agent.sinks.hdfssinkother.channel = memchannelother


[xxxx@localhost flume]$ ./bin/flume-ng agent --conf conf --conf-file ./conf/flume_directHDFS3.properties --name agent -Dflume.root.logger=DEBUG,console,LOGFILE
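  The config above references com.cntv.bigdata.flume.interceptor.NginxInterceptor$Builder, but the post does not include its source. Below is a minimal sketch of what such an interceptor could look like: it stamps each event with the "type" header (lv / err / bf / fs) that the multiplexing selector routes on. The parsing rule here (type taken from the first whitespace-delimited field of the nginx log line) is an assumption for illustration, not the original implementation.

// NginxInterceptor.java (sketch, not the original class)
package com.cntv.bigdata.flume.interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class NginxInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // no state to set up in this sketch
    }

    @Override
    public Event intercept(Event event) {
        String line = new String(event.getBody(), StandardCharsets.UTF_8);
        // Assumption: the record type (lv/err/bf/fs/...) is the first
        // whitespace-delimited field; adapt this to the real log format.
        String[] fields = line.split("\\s+", 2);
        // The multiplexing selector in the config routes on this header.
        event.getHeaders().put("type", fields.length > 0 ? fields[0] : "other");
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // nothing to release
    }

    // Flume instantiates interceptors through a nested Builder, hence the
    // "$Builder" suffix in the interceptor type set in the config.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new NginxInterceptor();
        }

        @Override
        public void configure(Context context) {
            // no configurable parameters in this sketch
        }
    }
}

  Package the compiled class into a jar and place it on the agent's classpath (for example under Flume's lib/ directory, or as a plugins.d entry) before starting the agent.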
  
  
