muugua 发表于 2018-9-29 08:24:47

使用Logstash收集MySQL日志

input {  

  
file {
  
#slow log output to a file
  
    type => "mysql_slow_file"
  
    path => "/data/app_data/mysql/mysql-slow.log"
  
    sincedb_path => "/dev/null"
  
    codec => multiline{
  
      pattern => "^# User@Host:"
  
      negate => true
  
      what => previous
  
                      }
  
   }
  

  
file {
  
# slow log output to mysql.slow_log
  
    type => "mysql_slow_table"
  
    path => "/data/app_data/mysql/mysql-slow.csv*"
  
    sincedb_path => "/dev/null"
  
   }
  

  
file {
  
    type => "mysql_error"
  
    path => "/data/app_data/mysql/mysql-error.log"
  
    sincedb_path => "/dev/null"
  

  
   }
  

  
      }
  

  
filter {
  

  
if == "mysql_slow_file" {
  
   grok {
  
             match => [ "message", "^# User@Host: %{USER:mysql.user}(?:\[[^\]]+\])?\s+@\s+%{HOST:mysql.host}?\s+\[%{IP:mysql.ip}?\]" ]
  
          }
  
   grok {
  
            match => [ "message", "^# Query_time: %{NUMBER:mysql.query_time:float}\s+Lock_time: %{NUMBER:mysql.lock_time:float} Rows_sent: %{NUMBER:mysql.rows_sent:int} \s*Rows_examined: %{NUMBER:mysql.rows_examined:int}"]
  
          }
  
   grok {
  
            match => [ "message", "^SET timestamp=%{NUMBER:timestamp};" ]
  
         }
  
   date {
  
            match => [ "timestamp", "UNIX" ]
  
          }
  
   mutate {
  
            remove_field => "timestamp"
  
            }
  
                           }
  

  
if == "mysql_slow_table" {
  
      mutate { gsub => [ "message", '\\"', '""' ] }
  

  
      csv {
  
      columns => [ "start_time", "user_host", "query_time", "lock_time",
  
               "rows_sent", "rows_examined", "db", "last_insert_id",
  
               "insert_id", "server_id", "sql_text" ]
  
          }
  

  
      mutate { convert => [ "rows_sent", "integer" ] }
  
      mutate { convert => [ "rows_examined", "integer" ] }
  
      mutate { convert => [ "last_insert_id", "integer" ] }
  
      mutate { convert => [ "insert_id", "integer" ] }
  
      mutate { convert => [ "server_id", "integer" ] }
  

  
      date {
  
         match => [ "start_time", "YYYY-MM-DD HH:mm:ss" ]
  
         remove_field => [ "start_time" ]
  
         }
  

  
      ruby { code => "event['query_time'] = event['query_time'] ? event['query_time'].split(':').inject(0){|a, m| a = a * 60 + m.to_i} : 0" }
  

  
      ruby { code => "event['lock_time'] = event['lock_time'] ? event['lock_time'].split(':').inject(0){|a, m| a = a * 60 + m.to_i} : 0" }
  
      mutate { remove_field => [ "message" ] }
  

  
                              }
  

  
if == "mysql_error" {
  
    mutate {
  
            add_tag => ["zabbix-sender"]
  
            add_field => [
  
                "zabbix_host","%{host}",
  
                "zabbix_item","mysql.error"
  
#                "send_field","%{message}"
  
                           ]
  
                                     }
  

  

  
                           }
  
}
  

  
output {
  
   stdout {
  
   codec => "rubydebug"
  
          }
  

  
   zabbix {
  
    tags => "zabbix-sender"
  
    host => "xxxxxxx"
  
    port => "10051"
  
    zabbix_sender => "/usr/local/zabbix/bin/zabbix_sender"
  
          }
  

  
   redis {
  
   host => "xxxxxxx"
  
   data_type => "list"
  
   key => "logstash"
  
         }
  
       }


页: [1]
查看完整版本: 使用Logstash收集MySQL日志