xiaoyu28 发表于 2019-1-28 08:51:28

部署 elk 日志系统 elasticsearch、logstash、 kibana

  安装elk   
    安装Java 1.8环境
      解压源码安装包:

      tar xf jdk-8u121-linux-x64.tar.gz
      ll
      mkdir /work/opt -p
      mvjdk1.8.0_121/work/opt/jdk
      ll /work/opt/jdk/
      chown -R root.root/work/opt
      vim /etc/profile :    //添加
      export JAVA_HOME=/work/opt/jdk
      export JAVA_BIN=$JAVA_HOME/bin
      export JRE_HOME=${JAVA_HOME}/jre
      export CLASSPATH=$JAVA_HOME/jre/lib/ext:$JAVA_HOME/lib/tools.jar
      export PATH=$JAVA_BIN:$PATH  
    安装 elasticsearch-5.3.0

    tar xf elasticsearch-5.3.0.tar.gz
    mv elasticsearch-5.3.0 /work/opt/elasticsearch
   
    ubuntu@ip-172-31-1-79:/work/opt/elasticsearch/config$ egrep -v '#|^$' elasticsearch.yml
    cluster.name: lvnian-elk
    node.name: lvnian-elk-node1
    path.data: /data#由于是普通用户启动elasticsearch,所以这个目录的属主需要改为普通用户
    path.logs: /work/opt/elasticsearch/logs   #由于是普通用户启动elasticsearch,所以这个目录的属主需要改为普通用户
    bootstrap.memory_lock: false
    network.host: 0.0.0.0
    http.port: 9200
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    ####缓存
   index.cache.field.expire: 10m
      index.cache.field.max_size: 500000
      index.cache.field.type: soft
    ubuntu@ip-172-31-1-79:/work/opt/elasticsearch/config$
    nohup /work/opt/elasticsearch/bin/elasticsearch >> /tmp/elasticsearch.log&  ##ES 5.1.1 安装 head:
    (5.1.1版本的 elasticsearch 没有提供直接插件安装方法,但在该github上该插件作者给出了方法)

      下载二进制源码包:
      wget https://nodejs.org/dist/v4.2.2/node-v6.2.0-linux-x64.tar.gz
      解压:
      tar xf node-v6.2.0-linux-x64.tar.gz -C /work/opt/
      设置环境变量:
      vim /etc/profile:
            export NODE_HOME=/work/opt/node/
            export PATH=$PATH:$NODE_HOME/bin
      root@ip-172-31-1-79:/work/source# node --version
      v6.10.1
      npm config set registry https://registry.npm.taobao.org    //设置代理镜像源,加速下载
      cd /home/stt/node-v4.2.2-linux-x64/lib/node_modules
      npm install grunt    //显示的2条warn可以忽略  测试grunt是否生效:

      $ grunt -version
      grunt-cli v1.2.0
      grunt v1.0.1  安装head插件:

          下载:git clone git://github.com/mobz/elasticsearch-head.git
      cd /home/stt/elasticsearch-head
      npm install (提示:如果遇到网络的瓶颈,将预先下载的源码包放在对应的位置效果一样,目录为/tmp/phantomjs/,确定位置后可自行创建并上传包)
      修改 elasticsearch-head/ _site/app.js
      // 把localhost改为ip
      找到:
      this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://localhost:9200";
      改为:
      this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://192.168.8.116:9200";
      修改 elasticsearch-head/Gruntfile.js
      connect: {
            server: {
                options: {
                  hostname: "0.0.0.0",   //增加这个配置
                  port: 9100,
                  base: '.',
                  keepalive: true
                }
            }
      }  启动服务:(后台运行)
      grunt server &    //需要在 /home/stt/elasticsearch-head 下执行,因为我的 grunt 没有进行全局的安装


      
    ##安装Logstash(这个软件在你需要读取的日志服务上安装,用logstash读取你的日志,上传给elasticsearch的):
      也需要安装java1.8环境

      tar xf logstash-5.3.0.tar.gz
      mkdir /work/opt
      mv logstash-5.3.0 /work/opt/
      cd /work/opt/  vim /work/opt/logstash-5.3.0/conf/central.conf    #(处理基于 FILE 方式输入的日志信息,这里是简单的举个例子,日后继续学习补充)

      input {
            file {
                path => "/tmp/*.log"
            }
      }
      output {
            elasticsearch {
                hosts => "192.168.8.116:9200"
                index => "nginx-access"
            }
            stdout {
                codec => rubydebug
            }
      }  
   
   
    ##安装Kibana:
    解压源码包:

    tar zxf kibana-5.1.1-linux-x86_64.tar.gz -C /home/stt/server/
    vim config/kibana.yml    //修改
      server.port: 5601    //打开注释而已,不用可以去效果,请使用默认端口
      server.host: "0.0.0.0"    //打开监听地址,让别的机器也能访问这个 kibana
      elasticsearch.url: "http://127.0.0.1:9200"      //这个url要根据实质情况,添加访问 elasticsearch 的url
    启动服务: (后台运行)
    /home/stt/server/kibana-5.1.1-linux-x86_64/bin/kibana &  
    安装nginx 反向代理,
    apt-gei install nginx


    nginx放心代理配置文件如下:

##文件名kibana.conf
    upstream backend {
               server 172.31.6.155:5601;
             }
      server {
            listen       80;
            server_namekibana.lvnian.co;
      access_log /tmp/kibana-access.log;
      error_log /tmp/kibana-error.log;
            location / {
                #设置主机头和客户端真实地址,以便服务器获取客户端真实IP
               proxy_set_header Host $host;
               proxy_set_header X-Real-IP $remote_addr;
               proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
               #禁用缓存
               proxy_buffering off;
               #反向代理的地址
               proxy_pass http://backend;   
            }
      }  
      
      
      
    logstash 读取nginx的访问日志以及error日志,上传到logstash的配置文件文件。
  用下面命令运行即可

  nohup /work/opt/logstash-5.3.0/bin/logstash -f /work/opt/logstash-5.3.0/conf/elk-nginx-log.conf &


   文件名: elk-nginx-log.conf
    input {
    file {
      path => "/data/logs/nginx/*.log"
      start_position => beginning
    }
}

filter {
    if =~ "access"{
      mutate { replace => { type => "nginx_access" } }
      ruby {
            init => "@kname = ['http_x_forwarded_for','http_host','time_local','request','status','body_bytes_sent','request_body','content_length','http_referer','http_user_agent','http_cookie','remote_addr','remote_port','hostname','upstream_addr','upstream_response_time','request_time']"
            code => "
                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split(' || '))])
                new_event.remove('@timestamp')
                event.append(new_event)
            "
      }
      if {
            ruby {
                init => "@kname = ['method','uri','verb']"
                code => "
                  new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split(' '))])
                  new_event.remove('@timestamp')
                  event.append(new_event)
                "
            }
            if {
                ruby {
                  init => "@kname = ['url_path','url_args']"
                  code => "
                        new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])
                        new_event.remove('@timestamp')
                        event.append(new_event)
                  "
                }
            }
      }
      mutate {
            convert => [
                "body_bytes_sent" , "integer",
                "content_length", "integer",
                "upstream_response_time", "float",
                "request_time", "float",
                "http_x_forwarded_for", "string",
                "http_host", "string"
            ]
            remove_field => [ "message","uri","request","path","verb" ]
      }
                if == '-' or '.' notin {
                      mutate { replace => { http_x_forwarded_for => "%{remote_addr}" } } ##http_x_forwarded_for 字段为空,则把remote_addr字段的内容赋值给它
                }
      if !~ "^127\.|^192\.168\.|^172\.1\.|^172\.2\.|^172\.3\.|^10\." {   
            geoip {
                  source => "remote_addr"    #设置解析IP地址的字段
                  target => "geoip"    #将geoip数据保存到一个字段内
                  database => "/work/opt/logstash/conf/GeoLite2-City.mmdb"    #IP地址数据库
                           add_field => [ "", "%{}" ]
                            add_field => [ "", "%{}"]
                  }
                  mutate {
                            convert => [ "", "float"]
                  }   
      }
      date {
            match => [ "time_local", "dd/MMM/yyyy:hh:mm:ss Z" ]
            locale => "zh"
      }
    }
    else if =~ "error" {
      mutate { replace => { type => "nginx_error" } }
      grok {
            match => { "message" => "(?\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) \[(?\w+)\] \S+: \*\d+ (?[^,]+), (?.*)$" }
      }
      mutate {
            rename => [ "host", "fromhost" ]
            gsub => [ "errmsg", "too large body: \d+ bytes", "too large body" ]
      }
      if
      {
            ruby {
                code => "
                  new_event = LogStash::Event.new(Hash)
                  new_event.remove('@timestamp')
                  event.append(new_event)
                "
            }
      }
      grok {
    #       match => { "request" => '"%{WORD:verb} %{URIPATH:urlpath}(?:\?%{NGX_URIPARAM:urlparam})?(?: HTTP/%{NUMBER:httpversion})"' }
            match => { "request" => '"%{WORD:verb} %{URIPATH:urlpath}?(?: HTTP/%{NUMBER:httpversion})"' }
            patterns_dir => ["/etc/logstash/patterns"]
    #      remove_field => [ "message", "errinfo", "request" ]
      }
    }
    else {
            mutate { replace => { type => "random_logs" } }
    }
#################时区问题解决
   ruby {   
         code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"   
   }      
   ruby {
         code => "event.set('@timestamp',event.get('timestamp'))"
   }
   mutate {
         remove_field => ["timestamp"]
   }
###############
}
output {
    elasticsearch {
      hosts => "10.19.104.161:9200"
      #index => "logstash-nginx"
      index => "logstash-%{type}-%{+YYYY.MM.dd}"
      document_type => "%{type}"
      flush_size => 20000
      idle_flush_time => 10
      sniffing => true
      template_overwrite => true
    }
      stdout {
            codec => rubydebug
      }
}  使用上面的logstash配置文件,需要注意更改nginx日志格式,nginx日志格式应该改为如下:
       log_format elk"$http_x_forwarded_for | $time_local | $request | $status | $body_bytes_sent | "
                "$request_body | $content_length | $http_referer | $http_user_agent| "
                "$http_cookie | $remote_addr | $hostname | $upstream_addr | $upstream_response_time | $request_time|$gzip_ratio";

    access_log /data/log/nginx/access.log elk;
    error_log/data/log/nginx/error.log;  监控5分钟内xxx 状态码 出现 阀值n 次进行的微信报警
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author: gaogd

import weixin_alter asweixin
from elasticsearch import Elasticsearch
import json,time ,datetime,os

class Checkcode(object):
    def __init__(self,statuscode,num=30):
      self.host='172.31.1.79'
      self.port=9200
      self.statuscode=statuscode
      self.es = Elasticsearch([{'host': self.host, 'port': self.port}])
      #now_day = time.strftime('%Y.%m.%d', time.localtime(time.time()))
      now_day = (datetime.datetime.now() + datetime.timedelta(hours=-8)).strftime("%Y.%m.%d")
      ## 8 个小时差是为了解决logstash 导入日志到es中,索引回更加0时区来创建索引的,导致索引日期相差8个小时
      self.index = 'logstash-nginx_access-%s' % now_day
      self.file='./alter%s.log'%self.statuscode
      self.num=int(num)
    def Getdata(self):
      body = {
            "query": {
                "bool": {
                  "filter": [
                        {"term":
                           {"status": "%s"%self.statuscode
                              }
                         },
                        {
                            "range": {
                              "@timestamp": {
                                    "gt": "now-5m",
                                    "lt": "now"
                              },
                            },
                        },
                  ]
                }
            },
            "aggs": {
                "api_status_total": {"terms": {"field": "url_path.keyword"}}
            },
            "size": 0
      }
      res = self.es.search(index=self.index, body=body)
      num = res['hits']['total']
      api_list=res['aggregations']['api_status_total']['buckets']
      dict_api = {}
      for l in api_list:
            if type(l) is dict:
                dict_api]=l['doc_count']
      dictlist = sorted(dict_api.iteritems(), key=lambda d: d, reverse=True)
      content=''
      for n,v in enumerate(dictlist):
            if n > 9:
                break
            content = content + v.encode('utf-8') + " : " + str(v) + "\n"
      print 'code:', code, 'time', num
      print content
      return num,content
    def JudgeAlter(self,num,content):
      if not os.path.isfile(self.file):
            with open(self.file, 'w') as f:
                f.write('0')
      if int(num) > self.num:
            with open(self.file, 'r') as f:
                time = f.read()
            if len(time) == 0 or int(time) == 0 or int(time) >= 15:
                content = u"""
%s告警!!!
最近5分钟内前端日志出现%s错误\n 5 分钟内有: %s次
%s"""% (self.statuscode,self.statuscode,num,content)
                print content
                weixin.WeixinSend(str(content))
                with open(self.file, 'w+') as f:
                  f.write('1')
            elif 0 < int(time)0:
                content = u&quot;&quot;&quot;
%s 告警恢复!!!
最近5分钟内日志出现%s 错误少于%s次
现在错误次数为: %s次
%s&quot;&quot;&quot;% (self.statuscode,self.statuscode,self.num,num,content)
                print content
                weixin.WeixinSend(str(content))
                with open(self.file, 'w+') as f:
                  f.write('0')


if __name__ == &quot;__main__&quot;:
    print datetime.datetime.now()
    code='500'
    num=30
    try:
      import sys
      code=sys.argv
      num=sys.argv
    exceptException as err:
      print str(err)
      print &quot;Usage: python %s codenum&quot;%sys.argv
      exit()
    obj=Checkcode(code,num)
    num,dict_api=obj.Getdata()
    obj.JudgeAlter(num,dict_api)
   
### python /CheckStatusCode.py 500 20  logstash分析日志,导入到es中的时区问题解决

input { stdin {} }
output { stdout { codec => rubydebug } }
filter {
date {
    match => [&quot;message&quot;,&quot;UNIX_MS&quot;]
    target => &quot;@timestamp&quot;   
}
ruby {   
   code => &quot;event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)&quot;   
}
ruby {
   code => &quot;event.set('@timestamp',event.get('timestamp'))&quot;
}
mutate {
   remove_field => [&quot;timestamp&quot;]
}
}

上面这种方法,实际上是把时间多加8小时,时区并没有发生变化,这样做有一个坏处,就是你的kibana默认
是CST时区了,这个时候展示出来的数据就会在8个小时之后的地方了。有问题,不建议使用。
市区问题其实不用修改,kibana能够很好识别。  ## 参考 elk 框架
  flume| ===èkafka==èlogstash收集==èes 存储==è kibana展示



页: [1]
查看完整版本: 部署 elk 日志系统 elasticsearch、logstash、 kibana