设为首页 收藏本站
查看: 1742|回复: 0

[经验分享] 【Spark七十五】Spark Streaming整合Flume-NG三之接入log4j

[复制链接]

尚未签到

发表于 2017-5-22 06:18:19 | 显示全部楼层 |阅读模式
  先来一段废话:
  实际工作中,业务系统的日志基本上是使用Log4j写入到日志文件中的,问题的关键之处在于业务日志的格式混乱,这给对日志文件中的日志进行统计分析带来了极大的困难,或者说,基本上无法进行分析,每个人写日志的习惯不同,导致日志行的格式五花八门,最后只能通过grep来查找特定的关键词缩小范围,但是在集群环境下,每个机器去grep一遍,分析一遍,这个效率如何可想之二,大好光阴都浪费在这上面了。
  更合理的做法时,对重要日志进行统计分析,写入关系型数据库或者NoSQL数据库,一方面将重要的日志整合到一起,同时使用这些数据库的查询能力快速的找到相关的日志。
  这就涉及到一个日志格式的问题,对于需要进行统计分析的日志,应该使用专门的logger以及appender,这里就是使用FlumeAppender,将日志发送到Flume的输入源,然后经过Channel和Sink进入处理和分析的环节中。另一方面,针对这种的日志,需要根据业务的分析目标,严格定义其结构。
  本文分析使用log4j将业务产生的日志通过FlumeAppender写到Flume的日志输入源(source),最后流出到Spark Streaming,交由Spark Streaming

1. log4j配置

###日志名称和级别
log4j.rootLogger=INFO,Flume
####未log4j定义的Flume专用Appender类
log4j.appender.Flume=org.apache.flume.clients.log4jappender.Log4jAppender
###将数据发往localhost的19999端口,此端口由Flume的一个Agent监听,该Agent接收Flume发送过来的数据
log4j.appender.Flume.Hostname= localhost
log4j.appender.Flume.Port=19999
log4j.appender.Flume.UnsafeMode=false
###输出格式
log4j.appender.Flume.layout=org.apache.log4j.PatternLayout
log4j.appender.Flume.layout.ConversionPattern=%d{ABSOLUTE} %-5p [%c] %m%
 
2. 应用程序依赖的jar
  这里的应用程序不是指Spark提交的程序,而是指的是使用log4j输出日志的业务系统,因为业务系统使用了Flume的专用Appender,因此需要把这些依赖的jar加到classpath上

avro-1.7.3.jar                 jackson-mapper-asl-1.9.3.jar      slf4j-api-1.6.1.jar
avro-ipc-1.7.3.jar             flume-ng-core-1.5.2.jar           slf4j-log4j12-1.6.1.jar
commons-collections-3.2.1.jar  flume-ng-log4jappender-1.5.2.jar  log4j-1.2.17.jar
commons-lang-2.5.jar           flume-ng-sdk-1.5.2.jar            
commons-logging-1.1.1.jar      jackson-core-asl-1.9.3.jar        netty-3.5.12.Final.jar
3. Flume配置
  Flume的配置与Spark Streaming与之前的处理一样,Flume Agent的source监听于19999端口,Spark Streaming的Worker Thread监听于9999端口,Flume Agent的sink往9999端口写入数据(或者直接写到KafkaSink,Spark Streaming从Kafka读取数据),
  通过上面的配置可以看出来,Flume的各个组件时独立的,可以任意的搭配,使用Flume的Log4j Appender仅仅改变了Flume获取数据源的方式,获取到数据后,之前的操作都是一样的

问题:
  Flume的source使用avro的方式从19999获取数据,而数据是通过Log4j Appender写入到19999端口的,之前是使用avro client的方式将数据写入到19999端口的,Log4jAppender输入的数据格式和avro client输入的数据一样吗?即两种方式写入到19999端口,能否被Flume source所识别。从上面可以看到Flume的Log4jAppender依赖于avro和avro ipc库,因此有理由相信,Flume的Log4jAppender也是采用类似avro-client的方式,以avro方式将数据进行包装后写到19999中的。实验验证也确实如此

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 19999

a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 9999

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 4. 验证
  写一个java程序, 定时的写日志,然后Flume的Log4j Appender将数据发送到19999端口,作为Flume的输入源,Flume通过sink将数据写到9999端口,这正是Spark Streaming监听的端口,Spark Streaming读取到数据后,即可进行分析
  4.1 Java代码

package com.tom.flume.log4j.Example;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class FlumeLog4j {
private static Log LOG = LogFactory.getLog(FlumeLog4j.class);
public static void main(String[] args) {
int loop = 60;
int interval = 1000;
if (args != null && args.length > 0) {
interval = Integer.parseInt(args[0]);
}
if (args != null && args.length > 1) {
loop = Integer.parseInt(args[1]);
}
try {
int i = 0;
while (i++ < loop) {
System.out.println(i);
LOG.info("This is the log " + i); //Spark Streaming收到这个日志
Thread.sleep(interval); //暂停interval毫秒
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
  4.2 log4j.properties

log4j.rootLogger=INFO,Flume
log4j.appender.Flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.Flume.Hostname= localhost
log4j.appender.Flume.Port=19999
log4j.appender.Flume.UnsafeMode=false
log4j.appender.Flume.layout=org.apache.log4j.PatternLayout
log4j.appender.Flume.layout.ConversionPattern=%d{ABSOLUTE} %-5p [%c] %m%
  4.3 程序启动脚本launch.sh

java -classpath ".:./*" com.tom.flume.log4j.Example.FlumeLog4j
  将log4j.properties以及前面提到的14个jar以及FlumeLog4j这个类所打成的jar包放到launch.sh的同一个目录下

5.运行
  5.1 启动Spark Streaming,监听于9999
  5.2 启动Flume Agent a1,监听于19999等待数据输入作为数据源
  5.3 通过launch.sh启动java程序,想19999端口写入数据
  5.4 Flume接收到来自19999端口的写入数据后,通过sink写向9999,Spark Streaming接收到数据,注意Spark Streaming接收的数据格式为

02:01:49,255 INFO  [com.tom.flume.log4j.Example.FlumeLog4j] This is the log 34
  可见Log4j根据appender的PatternLayout加了一些前缀,需要根据需要决定是否需要这个,需要的话就需要额外的解析工作。



 

 

 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379677-1-1.html 上篇帖子: flume之退避算法backoff algorithm 下篇帖子: 【Spark七十四】Spark Streaming整合Flume-NG二
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表