设为首页 收藏本站
查看: 1374|回复: 0

[经验分享] 日志系统之扩展Flume-LineDeserializer

[复制链接]

尚未签到

发表于 2015-11-27 17:53:55 | 显示全部楼层 |阅读模式
  本人博客文章如未特别注明皆为原创!如有转载请注明出处:http://blog.csdn.net/yanghua_kobe/article/details/46595401

  继续闲聊日志系统,在之前的博文里已提到我们在日志收集上的选择是flume-ng。应用程序将日志打到各自的日志文件或指定的文件夹(日志文件按天滚动),然后利用flume的agent去日志文件中收集。
Deserializer简介
  flume将一条日志抽象成一个event。这里我们从日志文件中收集日志采用的是定制版的SpoolDirectorySource(我们对当日日志文件追加写入收集提供了支持)。从日志源中将每条日志转换成event需要Deserializer(反序列化器)。flume的每一个source对应的deserializer必须实现接口EventDeserializer,该接口定义了readEvent/readEvents方法从各种日志源读取Event。

  flume主要支持两种反序列化器:

  (1)AvroEventDeserializer:解析Avro容器文件的反序列化器。对Avro文件的每条记录生成一个flume Event,并将基于avro编码的二进制记录存入event body中。
  (2)LineDeserializer:它是基于日志文件的反序列化器,以“\n”行结束符将每行区分为一条日志记录。
LineDeserializer的缺陷
  大部分情况下SpoolDictionarySource配合LineDeserializer工作起来都没问题。但当日志记录本身被分割成多行时,比如异常日志的堆栈或日志中包含“\n”换行符时,问题就来了:原先的按行界定日志记录的方式不能满足这种要求。形如这样的格式:

  
[2015-06-22 13:14:28,780] [ERROR] [sysName] [subSys or component] [Thread-9] [com.messagebus.client.handler.common.CommonLoopHandler] -*- stacktrace -*- : com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:203)
at com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:220)
at com.messagebus.client.handler.common.CommonLoopHandler.handle(CommonLoopHandler.java:34)
at com.messagebus.client.handler.consume.ConsumerDispatchHandler.handle(ConsumerDispatchHandler.java:17)
at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
at com.messagebus.client.handler.consume.RealConsumer.handle(RealConsumer.java:44)
at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
at com.messagebus.client.handler.consume.ConsumerTagGenerator.handle(ConsumerTagGenerator.java:22)
at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
at com.messagebus.client.handler.consume.ConsumePermission.handle(ConsumePermission.java:37)
at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
at com.messagebus.client.handler.consume.ConsumeParamValidator.handle(ConsumeParamValidator.java:17)
at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
at com.messagebus.client.carry.GenericConsumer.run(GenericConsumer.java:50)
at java.lang.Thread.run(Thread.java:744)
Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
当然你也可以对日志内容进行特殊处理,让一条日志的所有内容以一行输出,但这样需要对日志框架进行定制,有时这并不受你控制。因此这里最好的选择是定制日志收集器。
  
源码问题定位
  我们先来了解一下Flume源码中LineDeserializer的核心实现:

  
  private String readLine() throws IOException {
StringBuilder sb = new StringBuilder();
int c;
int readChars = 0;
while ((c = in.readChar()) != -1) {
readChars++;
// FIXME: support \r\n
if (c == '\n') {
break;
}
sb.append((char)c);
if (readChars >= maxLineLength) {
logger.warn(&quot;Line length exceeds max ({}), truncating line!&quot;,
maxLineLength);
break;
}
}
if (readChars > 0) {
return sb.toString();
} else {
return null;
}
}
首先,构建一个StringBuilder,然后以字符为单位挨个读取,如果读取到换行符“\n”,则表示读取本条日志结束,跳出循环;否则将该字符串追加到StringBuilder中。与此同时会给读取的字符个数计数:如果读取的字符个数大于预先配置的一行日志的最大字符串长度,也会跳出循环。
  
  这里的主要问题出在以换行符“\n”作为日志结尾的分隔符逻辑上。当我们记录异常日志时,我们需要重新找到一种界定日志记录结尾的方式。

解决思路
  考虑到我们采用[]作为日志的tag界定符,每条日志几乎都是以“[”打头。因此,我们采取的做法是:判断读取到换行符“\n”后再预读下一位,如果下一位是“[”,则认为这是一条普通不换行的日志,此时再回退一个字符(因为刚刚预读了一个字符,需要让指针后退回原来的位置),然后跳出循环;而如果下一位不是“[”,则认为它是一个异常日志或者多行日志。则继续往后读取字符,当遇到换行符时,再次重复以上判断。当然如果你的日志格式是以某个固定的格式打头,首字母固定的话,才可以用这种方式,否则你很可能要配置日志的apender,使其以某个特定的符号作为日志的结尾来判断了。另外,有时也可以基于正则来匹配。

定制实现
  为了提升扩展性,我们提供对预读的下一个字符进行配置,并将其命名为:newLineStartPrefix。我们新建一个反序列化类:MultiLineDeserializer。该类的大部分逻辑都跟LineDeserializer相同,主要需要重新实现上面的readLine方法,实现如下:

  
    private String readLine() throws IOException {
StringBuilder sb = new StringBuilder();
int c;
int readChars = 0;
while ((c = in.readChar()) != -1) {
readChars++;
// FIXME: support \r\n
if (c == '\n') {
//walk more one step
c = in.readChar();
if (c == -1)
break;
else if (c == this.newLineStartPrefix) {    //retreat one step
long currentPosition = in.tell();
in.seek(currentPosition - 1);
break;
}
}
sb.append((char)c);
if (readChars >= maxLineLength) {
logger.warn(&quot;Line length exceeds max ({}), truncating line!&quot;,
maxLineLength);
break;
}
}
if (readChars > 0) {
return sb.toString();
} else {
return null;
}
}
这里有个小插曲,由于之前已定制了source/sink的缘故。原以为deserializer也可以用同样的方式进行定制。并在agent的deserializer配置中指定定制过的deserializer的完全限定名。但经过验证后发现,这条路走不通,会报错(貌似从flume官网上也找不到对deserializer定制的介绍)。因此,只能在源码上进行扩展,然后编译源码,重新生成jar。
  
  从源码里你会发现为什么在第三方包内扩展deserializer是行不通的。从github上clone下源码,进入flume-ng-core module的如下类:org.apache.flume.serialization.EventDeserializerType,你就会一目了然:

  
public enum EventDeserializerType {
LINE(LineDeserializer.Builder.class),
MULTILINE(MultiLineDeserializer.Builder.class),
AVRO(AvroEventDeserializer.Builder.class),
OTHER(null);
private final Class<? extends EventDeserializer.Builder> builderClass;
EventDeserializerType(Class<? extends EventDeserializer.Builder> builderClass) {
this.builderClass = builderClass;
}
public Class<? extends EventDeserializer.Builder> getBuilderClass() {
return builderClass;
}
}
你必须显式在这里定义deserializer的枚举,然后指定其builder的Class实例,并在agent里的deserializer配置项中填写你这里的枚举名称才行。我们只需在子package:serialization中新建MultiLineDeserializer类,然后重新实现逻辑、编译、打包flume-ng-core Module生成新的jar即可。flume将其源码中的每个Module生成的jar都放在其二进制包的lib文件夹下。你只需将重新打包好的flume-ng-core jar替换原来的,重启agent即可看到效果。
  
  这里还有个需要注意的地方:LineDeserializer有一个参数(maxLineLength)用于定义一个日志行的最长字符数。如果某条日志超过这个长度,将不再读取。而一条日志占据多行情况下,该值需要适当增大,因为像异常日志的堆栈长度明显比普通日志长不少,这里你可以设置为8192。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144297-1-1.html 上篇帖子: 修改Flume Log4j Appender 下篇帖子: 《Flume 1.6.0 User Guide》基础入门
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表