小乔 发表于 2019-1-28 11:29:48

elk组件 基础语法

Shipper->Broker->Indexer->ES
1.input
input { stdin {} }
output {
   stdout { codec=> rubydebug }
}
file {
   codec => multiline {
pattern => "^\s"
what => "previous"
}
   path => ["xx","xx"]
   exclude => "1.log"
   add_field => [ "log_ip", "xx" ]
   tags => "tag1"
   #设置新事件的标志
   delimiter => "\n"
   #设置多长时间扫描目录,发现新文件
   discover_interval => 15
   #设置多长时间检测文件是否修改
   stat_interval => 1
   #监听文件的起始位置,默认是end
   start_position => beginning
   #监听文件读取信息记录的位置
   sincedb_path => "E:/software/logstash-1.5.4/logstash-1.5.4/test.txt"
   #设置多长时间会写入读取的位置信息
   sincedb_write_interval => 15
   }

2.filter
filter {
    multiline {
      # 指定合并规则——所有不是以数字开头的行需要被合并
      pattern => "^[^\d]"
      # 合并到哪里——上一行
      what => "previous"
    }
filter {
multiline {
    type => "type"   #类型,不多说
    pattern => "pattern, a regexp" #参数,也可以认为是字符,有点像grep ,如果符合什么字符就交给下面的 what 去处理
    negate => boolean
    what => "previous" or "next" #这个是符合上面 pattern 的要求后具体怎么处理,处理方法有两种,合并到上面一条日志或者下面的日志
}
}

filter {
grep {
    match => [ "@message", "PHP Fatal error" ]
    drop=> false
    add_tag =>
       }
       grep {
       tags =>
       match => [ "@message", ".*(xbox\.com|xbox\.mib\.com\.cn|supports\.game\.mib\.com\.cn)" ]
       drop=> false
       add_tag =>
            }
         }
#过滤掉内容包含5.3.3与down以外日志
filter {
    if !~"5.3.3|down" {
      ruby{
            code => "event.cancel"
    }
    }
}
#使用自带的过滤规则显示更多的字段
filter {
    grok {
      match => {"message" => "%{COMBINEDAPACHELOG}"}
}
}
#合并不是以[开头的日志
filter {
    multiline {
      pattern => "^[^[]"
      negate => true
      what => "previous"
    }
}

filter {
if =~ "error" {
    mutate { replace => { "type" => "apache_error" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
}
date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}

filter {
if =~ "access" {
    mutate { replace => { type => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
} else if =~ "error" {
    mutate { replace => { type => "apache_error" } }
} else {
    mutate { replace => { type => "random_logs" } }
}
}

3.output
发邮件
output {
email {
   match => [ "@message", "aaaaa" ]
   to => "storyskya@gmail.com"
   from => "monitor@mib.com.cn"
   options => [ "smtpIporHost", "smtp.mibnet.com",
                "port", "25",
                "userName", "monitor@mib.com.cn",
                "starttls", "true",
                "password", "opmonitor",
                "authenticationType", "login"
            ]
   subject => "123"
   body => '123'
   via => smtp
}
}
   
output {
    if == "syslog" {
      elasticsearch {
            hosts => "172.16.0.102:9200"
            index => "syslog-%{+YYYY.MM.dd}"
    }
}
    if == "nginx" {
      elasticsearch {
            hosts => "172.16.0.102:9200"
            index => "nglog-%{+YYYY.MM.dd}"
   }
}
#匹配内容包含paramiko与simplejson的日志通邮件发送
    if =~/paramiko|simplejson/ {
      email {
            to => "12222222@wo.cn"
            from => "good_zabbix@163.com"
            contenttype => "text/plain; charset=UTF-8"
            address => "smtp.163.com"
            username => "test@163.com"
            password => "12344"
            subject => "服务器%{host}日志异常"
            body => "%{@timestamp} %{type}: %{message}"
      }
    }
}
   
output {
    stdout { codec => rubydebug }
    redis {
      host => '192.168.1.104'
      data_type => 'list'
      key => 'logstash:redis'
    }
}

output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }
}
   
替换
mutate {
    type => "phplog"
    gsub => [ "@message","'", "\"" ]
}   

调试
# /usr/local/logstash-1.5.2/bin/logstash -e 'input { stdin { } } output { stdout {} }'
curl '
logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'  # logstash agent -f logstash-simple.conf --verbose //开启debug模式
  

  

  




页: [1]
查看完整版本: elk组件 基础语法