websyhg 发表于 2017-1-14 07:53:51

改造apache的开源日志项目来实现 分布式日志收集系统

  概述:
  在分布式系统中,经常需要采集各个节点的日志,然后统一分析。
  本文提供一种简单的方案,本文采用开源日志项目 + 统一数据库结构的方式,在各个开发环境中,提供统一的配置及调用方法,所有的日志均记录在日志服务器中,可以追踪查询任意一个系统节点上任意应用的任意线程的运行状况。
  考究现在比较流行的apache的开源日志项目log4j以及它在其他平台的衍生产物(log4net log4py等)。其由appender模块向不同目标输出日志。
  比如log4j 中使用jdbcAppender可以基本实现插入数据库的功能。log4j可以提供如下数据:
  日志信息,
  日志级别,
  时间,
  线程名,
  文件路径,
  类路径,
  行号,
  方法名
  这些数据在单个客户端模块已经可以很好的定位日志发生的各种需要信息。但是对于我们的分布式日志收集,还缺少 机器的定位。以及应用、线程的定位。
  比如在同一个机器上跑着两个相同的应用,在同处同时往日志系统中记录,我们在现有的lo4j基础上是区分不出来的。
  可能有人说,用log4j的NDC——NDC也不行,其的标识是字符串,而且用NDC的话会加大应用使用的复杂度。
  那么我们如何定位 机器、应用、线程 呢?对应三者我们可以调用系统函数获得 hostname、processId(进程号)、threadId(线程号)
  而在log4j中这三者是无法通过配置获取到的,如何解决呢?
  1。 然后很简单的想法,我们可以封装其logger的 debug、info、warn等接口来实现。——这有个很操蛋的地方,在你封装了之后,log4j记录日志时拿到的location information(日志发起的时间、线程等)就变成咱们自己封装的位置了,所以行不通。
  2。接下来我们自然的想到改写appender,比如我们改写jdbcAppender的写拼SQL语句的方法,将我们需要的三个变量给加进去。但是会发现这个问题:log4j写日志是由一个任务队列线程来控制的,在这个队列的位置获取hostname和processId可能是正确的,但是获取线程ID肯定是错误的。。所以也行不通。
  3。所以很无奈,我们只能修改log4j的源代码。我们会发现logger类所有的记录日志方法,在通过许可校验后,都会调用一个forceLog的函数(貌似叫这个名),里头new了一个LogingEvent的对象,我们修改该构造函数,在此处将PID及线程ID传进去,然后继承jdbcAppender,重写拼SQL语句的函数,搞定。
  同理,在log4net中也按照如此修改即可(架构基本相似)。
  这样我们就获得了自己的日志jar包和dll,引用之后,可以按照原来的配置方法,以及完全兼容log4j 和log4net的方法记录日志。同时提供我们自己私有的appender,可以作为我们分布式日志收集系统的调用入口。
  扩充话题:
  1。 关于日志库规模太大了检索效率十分低下怎么处理?
  方案1, 可以使用lucene,从日志库中提取信息,专用于检索。
  方案2, 定期清理、备份数据库。
  2。 此方案适用的情景
  此方案适用于中等规模(如1000个并发应用以下节点),并且节点间具有稳定、快速的通信信道(如局域网),对实时性要求、记录完整型要求不是十分苛刻的系统。
  3。 为何不用socket相关的appender
  经了解,log4j及其各个衍生品居然在tcp应用层协议上不统一,遂不用。
  4。 版权许可?
  apache许可,适合商用,如果发布你修改的代码,需要做如下事情:
  1。发布时需要给用户一份apache许可。
  2。需要在原代码文件里注明你修改了哪。
  3。发布需要带有原工程的协议、商标、专利声明以及其中一切要求衍生软件/类库声明的内容。
  4。发布需要包含一个Notice文件,里面包含Apache许可证及你自己要求声明的内容。(该内容不可与apache协议本身冲突)
  相关代码及演示:
  log4j 配置
  log4j.rootLogger=DEBUG, DATABASE
  log4j.appender.DATABASE=com.netvideo.log.NvJdbcAppender
  log4j.appender.DATABASE.layout=org.apache.log4j.PatternLayout
  log4j.appender.DATABASE.URL=jdbc:mysql://192.168.25.156:3306/test?useUnicode=true&characterEncoding=utf-8
  log4j.appender.DATABASE.driver=com.mysql.jdbc.Driver
  log4j.appender.DATABASE.user=XXX
  log4j.appender.DATABASE.password=XXXX
  log4j.appender.DATABASE.tableName=log
  log4j.appender.DATABASE.consolePrint=true
  使用方法(和log4j完全一致):
  Logger logger = Logger.getLogger(Test.class);
  logger.info("info called");
  NvJdbcAppender源码
  (有个小插曲,jdbcAppender是有漏洞的。。message里头无法带单引号,不然会造成SQL拼写错误,我在此处修正了这个问题)
  package com.netvideo.log;import org.apache.log4j.jdbc.JDBCAppender;import java.net.InetAddress;import java.sql.SQLException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Iterator;import org.apache.log4j.spi.ErrorCode;import org.apache.log4j.spi.LoggingEvent;/*** NetVideo通用日志模块* @author chenggong* @version 0.1*/public class NvJdbcAppender extends JDBCAppender {private String hostName = null; //主机名private String tableName; //日志表名private boolean consolePrint = false;//是否在控制台打印public NvJdbcAppender(){super();this.setLocationInfo(true);try{InetAddress ia = InetAddress.getLocalHost();hostName = ia.getHostName();}catch(Exception e){hostName = "获取异常";}}protected String sqlSafe(String s){return s.replace("'", "''");}protected String getCurrentTime(){return new String(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}protected String getLogStatement(LoggingEvent logEvent){String rst = String.format("INSERT INTO %s VALUES(null,'%s','%s','%s','%s',%s,'%s','%s',%d,%d,'%s','%s')",this.tableName,logEvent.getLevel(),logEvent.getLocationInformation().getClassName(),logEvent.getLocationInformation().getFileName(),logEvent.getLocationInformation().getMethodName(),logEvent.getLocationInformation().getLineNumber(),this.sqlSafe(this.hostName),this.sqlSafe(logEvent.getMessage().toString()),logEvent.getProcessId(),logEvent.getThreadId(),this.sqlSafe(logEvent.getThreadName()),this.getCurrentTime()//待讨论);return rst;}protected String getConsoleOut(LoggingEvent logEvent){String rst = String.format("[%s][%s]:%s",this.getCurrentTime(),logEvent.getLocationInformation().fullInfo,logEvent.getMessage());return rst;}@SuppressWarnings("unchecked")public void flushBuffer() {removes.ensureCapacity(buffer.size());for (Iterator i = buffer.iterator(); i.hasNext();) {try {LoggingEvent logEvent = (LoggingEvent) i.next();String sql = getLogStatement(logEvent);if (!sql.equals("")) {execute(sql);}if (consolePrint){System.out.println(this.getConsoleOut(logEvent));}removes.add(logEvent);} catch (SQLException e) {errorHandler.error("Failed to excute sql", e,ErrorCode.FLUSH_FAILURE);}}// remove from the buffer any events that were reportedbuffer.removeAll(removes);// clear the buffer of reported eventsremoves.clear();}public void setTableName(String tableName) {this.tableName = tableName;}public void setConsolePrint(String isPrint){consolePrint = (isPrint.toLowerCase().trim().equals("true"));}}
  log4net配置:
<log4net debug="false"><root><level value="DEBUG" /><appender-ref ref="DB" /></root><appender name="DB" type="netvideo.Log.NetVideoAppender"><bufferSize value="100"/><param name="ConnectionType" value="MySql.Data.MySqlClient.MySqlConnection, MySql.Data"/><param name="ConnectionString" value="database=test;server=192.168.25.156;uid=root;password=123456;old syntax=yes"/></appender></log4net>

使用方法:


log4net.ILog logger = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

logger.Debug("test");


NetVideoAppender代码:

using System;using System.Collections;using System.Data;using System.IO;using System.Reflection;using System.Text;using log4net.Appender;using log4net.Util;using log4net.Layout;using log4net.Core;namespace netvideo.Log{/// <summary>/// 网络视频 日志APPENDER/// </summary>public class NetVideoAppender : AdoNetAppender{public NetVideoAppender(){//Console.WriteLine("NetVideoAppender created");}private string SqlSafe(string s){return s.Replace("'", "''").Replace("//","////");}protected void FormatSql(IDbCommand cmd, LoggingEvent e){StringBuilder sb = new StringBuilder();sb.AppendFormat("INSERT INTO log VALUES(null,'{0}','{1}','{2}','{3}',{4},'{5}','{6}',{7},{8},'{9}','{10}')",e.Level,e.LocationInformation.ClassName,this.SqlSafe(e.LocationInformation.FileName),e.LocationInformation.MethodName,e.LocationInformation.LineNumber,this.SqlSafe(e.UserName),this.SqlSafe(e.MessageObject.ToString()),e.processId,e.threadId,this.SqlSafe(e.ThreadName),e.TimeStamp);cmd.CommandText = sb.ToString();}override protected void SendBuffer(IDbTransaction dbTran, LoggingEvent[] events){if (m_usePreparedCommand){// Send buffer using the prepared command objectif (m_dbCommand != null){if (dbTran != null){m_dbCommand.Transaction = dbTran;}// run for all eventsforeach (LoggingEvent e in events){FormatSql(m_dbCommand, e);// Execute the querym_dbCommand.ExecuteNonQuery();}}}else{// create a new commandusing (IDbCommand dbCmd = m_dbConnection.CreateCommand()){if (dbTran != null){dbCmd.Transaction = dbTran;}// run for all eventsforeach (LoggingEvent e in events){FormatSql(dbCmd, e);dbCmd.ExecuteNonQuery();}}}}}}
页: [1]
查看完整版本: 改造apache的开源日志项目来实现 分布式日志收集系统