设为首页 收藏本站
查看: 2551|回复: 1

[经验分享] ELK日志管理深度实战

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2017-1-5 15:34:17 | 显示全部楼层 |阅读模式
                      Logstash收集Rsyslog/syslog日志

syslog默认是通过514端口去发送日志,可以使用logstash安装对应的插件,监听514端口来收集日志。如果只是监听系统日志可以不用安装logstash的agent,只需要监听对应的514端口,收集系统数据即可。
logstash在INPUT插件中提供了syslog的模块,可以用于不安装本地日志收集agent的设备(如硬件防火墙等),当然支持此类协议的普通服务器也适用。
注意:此INPUT插件会同时监听TCP和UDP端口。

可以在安装logstash服务器node2(172.16.10.21)上编写logstash配置文件,指定端口(默认为514)同时指定写入的ES服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
[iyunv@node2 ~]# cat /etc/logstash/conf.d/syslog.conf
input {
     syslog{
     type => "system-syslog"
     port => 514
}
}
output {
    elasticsearch {
        hosts => ["172.16.10.21:9200"]         # 指定写入的ES服务器
        index => "system-syslog-%{+YYYY.MM}"
    }
}



在node1(172.16.10.20)使syslog自动发送日志信息到logstash,需要修改/etc/rsyslog.conf (CentOS7)的配置文件:
在最后添加一行,指定远程logstash服务器地址和端口:
1
*.* @@172.16.10.21:514



重启rsyslog服务:
1
systemctl restart rsyslog



在logstash服务器上指定配置文件启动:
1
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/syslog.conf



在node1使用logger 命令可以生成系统日志,使Elasticsearch上产生索引:
1
logger trying



登录Elasticsearch上就可查询到syslog的日志了,可以在kibana上添加此项。

TCP、UDP日志收集
在有时要对一些日志进行补充或者发送一些文件内容到es上时,可以通过使用TCP模块的方式。
编辑tcp.conf的logstash文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
[iyunv@node2 ~]# cat /etc/logstash/conf.d/tcp.conf
input {
    tcp {
        type => "tcp"
        port => "6666"
        mode => "server"
   }
}
output{
    stdout {
        codec => rubydebug
}
}



启动服务,在其他任何主机上可以通过nc 命令或者伪设备输出的方式,将需要发送的内容发送给node2:

1
2
3
[iyunv@node1 ~]# echo "trying"|nc 172.16.10.21 6666
[iyunv@node1 ~]# nc 172.16.10.21 6666 < /etc/resolv.conf
[iyunv@node1 ~]# echo "stuff" > /dev/tcp/172.16.10.21/6666



在node2上,显示接受的信息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
[iyunv@node2 ~]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/tcp.conf
Settings: Default pipeline workers: 1
Pipeline main started
{
       "message" => "trying",
      "@version" => "1",
    "@timestamp" => "2017-01-03T08:12:10.630Z",
          "host" => "172.16.10.20",
          "port" => 57054,
          "type" => "tcp"
}
{
       "message" => "# Generated by NetworkManager",
      "@version" => "1",
    "@timestamp" => "2017-01-03T08:13:11.081Z",
          "host" => "172.16.10.20",
          "port" => 57248,
          "type" => "tcp"
}
{
       "message" => "nameserver 114.114.114.114",
      "@version" => "1",
    "@timestamp" => "2017-01-03T08:13:11.081Z",
          "host" => "172.16.10.20",
          "port" => 57248,
          "type" => "tcp"
}
{
       "message" => "stuff",
      "@version" => "1",
    "@timestamp" => "2017-01-03T08:15:14.224Z",
          "host" => "172.16.10.20",
          "port" => 57634,
          "type" => "tcp"
}




使用Filter模块对Apache日志进行格式化

在logstash中可以对一些输入的格式进行格式化,如记录Apache日志时,由于自身不具有像nginx提供的JSON格式化的输出,所以在进行记录时,需要使用filter插件的grok来进行正则匹配。
在logstash中,已经默认提供了方便使用的正则表示方式,在/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns保存了相关的配置。
编写logstash的配置文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
[iyunv@node2 patterns]# cat /etc/logstash/conf.d/grok.conf
input{
    stdin {}
}
filter {
     grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}
output {
    stdout{
        codec => rubydebug
}
}



官方文档提供了示例,可以直接使用https://www.elastic.co/guide/en/ ... s-filters-grok.html
启动服务,输入测试示例,返回格式化的输出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[iyunv@node2 ~]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/grok.conf
Settings: Default pipeline workers: 1
Pipeline main started
55.3.244.1 GET /index.html 15824 0.043  #输入的示例
{
       "message" => "55.3.244.1 GET /index.html 15824 0.043",
      "@version" => "1",
    "@timestamp" => "2017-01-03T08:59:44.264Z",
          "host" => "node2",
        "client" => "55.3.244.1",
        "method" => "GET",
       "request" => "/index.html",
         "bytes" => "15824",
      "duration" => "0.043"
}



使用grok收集Apache日志到Elasticsearch.

在默认的Apache日志格式下,logstash提供了一个已经编写好的日志格式正则filter:

1
2
3
4
[iyunv@node2 ~]# cat /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns/grok-patterns \
|grep "COMBINEDAPACHELOG"

COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}




编写logstash配置文件,引用此参数COMBINEDAPACHELOG:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[iyunv@node2 ~]# cat /etc/logstash/conf.d/grok.conf                              
input{
    file {
        path => "/var/log/httpd/access_log"
        start_position => "beginning"
}
}
filter {
     grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }  #调用此参数
  }
}
output {
    elasticsearch {
        hosts => ["172.16.10.20:9200"]
        index => "apache-access-log-%{+YYYY.MM.dd}"
}
#   stdout{ codec => rubydebug  }
}



这样,默认的日志文件就可以被格式化输出到Elasticsearch上了。

使用grok会有一些问题,比如会非常影响性能,而且很不灵活,除非对ruby掌控很好。
在实际的生产环境中,为了实现松耦合和性能问题,一般将日志写入redis,再使用python对其进行格式化处理。

使用消息队列扩展ELK

数据 >> lostash >> MQ/Redis >> logstash >> ES

在实现低耦合和高可靠性的ELK部署架构中,使用消息队列的方式来作为一个中间转存的数据交换中心,将数据文件需要写入到消息队列,而对消息队列中数据的处理可有其它任意的应用去实现,整个过程写入和输出系统没有任何依赖关系。
这里使用最简单的redis来实现这一功能。
在node2上安装redis,将收集的数据发送到redis中,不做任何处理,这里需要先修改redis的配置文件:
1
2
3
# vim /etc/redis.conf
daemonize yes          # 使用台运行模式
bind 172.16.10.21      # 绑定监听的IP



通过使用logstash将数据写入redis,创建logstash的配置文件,用于将数据写入redis:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
[iyunv@node2 ~]# cat /etc/logstash/conf.d/redis.conf
input{
    stdin {
}
}
output {
        redis {
          host => "172.16.10.21"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "demo"
}
}



启动redis,指定配置文件启动logstash:
1
systemctl start redis



1
2
3
4
[iyunv@node2 ~]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/redis.conf
Settings: Default pipeline workers: 1
Pipeline main started
try



在redis上可以查看到输入的数据:
1
2
3
4
5
6
7
8
9
10
11
[iyunv@node2 ~]# redis-cli  -h 172.16.10.21 -p 6379
172.16.10.21:6379> info
# Keyspace
db6:keys=2,expires=0,avg_ttl=0
172.16.10.21:6379> select 6
OK
172.16.10.21:6379[6]> keys *
1) "apache-accesslog"
2) "demo"
172.16.10.21:6379[6]> lindex demo -1
"{\"message\":\"try\",\"@version\":\"1\",\"@timestamp\":\"2017-01-04T03:29:28.337Z\",\"host\":\"node2\"}"



使用另一个logstash将数据从redis中取出:
在node1上配置logstash的配置文件,指定文件启动,此时会自动将redis上的数据读出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[iyunv@node1 ~]# cat /etc/logstash/conf.d/redis-get.conf
input {
       redis {
          host => "172.16.10.21"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "demo"
}
}
filter {}
output {
    stdout{
        codec => rubydebug
}
}



1
2
3
4
5
6
7
8
9
[iyunv@node1 conf.d]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/redis-get.conf
Settings: Default pipeline workers: 1
Pipeline main started
{
       "message" => "try",
      "@version" => "1",
    "@timestamp" => "2017-01-03T12:10:20.635Z",
          "host" => "node2"
}



上面使用的是手动输入的方式,在实际收集日志时,可以在输入时指定文件,输出时指定到Elasticsearch。

使用Redis收集Apache访问日志
在上面的示例中,我们使用了直接通过grok来收集Apache的日志,但是在实际的生产环境,由于这种方案会使性能下降,而且在整个架构上过度依赖于logstash,无法做到解耦,并不是一个好的解决方案。这里引入消息队列,作为一个解耦,上日志的存取分开,方便后期扩展,提高了稳定性。
在node2上配置logstash 配置文件,指定Apache的access文件,并启动:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[iyunv@node2 ~]# cat /etc/logstash/conf.d/apache-redis.conf
input{
    file {
        path => "/var/log/httpd/access_log"
        start_position => "beginning"
}
}
output {
        redis {
          host => "172.16.10.21"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "apache-log"
}
}



此处直接完整输入内容到redis,不做任何处理。
在node1上从redis中取出日志,并通过grok进行日志格式化处理,之后输出到Elasticsearch中。
node1上logstash的配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[iyunv@node1 ~]# cat /etc/logstash/conf.d/redis-get.conf
input {
       redis {
          host => "172.16.10.21"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "apache-log"
}
}
filter {
    grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
output {
    elasticsearch{
        hosts => ["172.16.10.20:9200"]
        index => "apache-log-%{+YYYY.MM.dd}"
}
}



这样,启动node1和node2上的logstash,在指定的Elasticsearch上就可以看到被格式化后的日志输出了。
和redis类似,可以使用Kafka作为存储数据的中间节点:https://www.unixhot.com/article/61


ELK项目生产规划
在实际的生产环境中要对日志进行收集,首先需要对日志分类:
访问日志: apache,nginx, tomcat 等web访问日志   通过file插件收集,apache通过filter插件处理格式
错误日志:error log,java 日志 等可以直接收取,对于一条多行的日志(java异常)使用多行处理插件
系统日志: /var/log  syslog 等 直接使用 syslog插件收取
网络日志: 防火墙,交换机,路由器等   使用 syslog日志
运行日志:应用程序定义    使用file 插件, 最好为json格式

标准化: 统一存放路径,命名规则,格式(json),日志切割(crontab). 需要保存的日志先在本地读写,提高性能,通过rsync推送到指定存储,防止挂载故障而使服务终止。删除本地日志(确认保留时间)

工具化:可以使用logstash进行日志收集方案。
如果使用redis作为消息队列,那么要对所有的list key的长度进行监控,防止溢出和数据丢失
llen key_name   
根据实际情况,例如超过 10万 就报警。

如将之前的所有配置合并到一处,在使用redis作为存储时,可以这样编辑logstash配置文件:
从redis服务器接受日志,处理后存入Elasticsearch:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
input {
   syslog {
     type => "system-syslog"
     port => 514
   }
   redis {
        type => "apache-accesslog"
        host => "192.168.56.12"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "apache-accesslog"
    }
    redis {
        type => "es-log"
        host => "192.168.56.12"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "es-log"
    }
}
filter {
    if [type] == "apache-accesslog" {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    }
}
output {
    if [type] == "apache-accesslog" {
    elasticsearch {
       hosts => ["192.168.56.11:9200"]
        index => "apache-accesslog-%{+YYYY.MM.dd}"
    }
    }
    if [type] == "es-log" {
        elasticsearch {
                hosts => ["192.168.56.11:9200"]
                index => "es-log-%{+YYYY.MM}"
        }
    }
    if [type] == "system-syslog" {
        elasticsearch {
                hosts => ["192.168.56.11:9200"]
                index => "system-syslog-%{+YYYY.MM}"
        }
    }
}




从服务器接收日志,将日志存入redis/MQ:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
input {
    file {
        path => "/var/log/httpd/access_log"
        start_position => "beginning"
        type => "apache-accesslog"
    }
    file {
        path => "/var/log/elasticsearch/elasticsearch.log"
        type => "es-log"
        start_position => "beginning"
        codec => multiline{
          pattern => "^\["
          negate => true
          what => "previous"
        }
    }
}
output {
    if [type] == "apache-accesslog" {
        redis {
                host => "192.168.56.12"
                port => "6379"
                db => "6"
                data_type => "list"
                key => "apache-accesslog"
        }
    }
    if [type] == "es-log" {
    redis {
        host => "192.168.56.12"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "es-log"
    }
    }
}



这里使用了if做为了条件的判断,来对不同的日志进行选择处理,在一些系统日志如syslog等,可以单独进行处理。

提示: 在使用系统自带的脚本进行启动时,会出现无法收集日志的情况,脚本中默认的是使用logstash用户,在收集一些日志时可能存在没有权限的情况。可以修改脚本中的默认用户,或者给logstash添加其他用户组的权限。
                   


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-324316-1-1.html 上篇帖子: ELK5---Elasticsearch Cluster的搭建 下篇帖子: ELK(elasticsearch5.0)head插件安装配置
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表