
Deploying an ELK Real-Time Log Analysis Platform on CentOS 7

What is a log?
  A log is text data produced by a program in a fixed format, usually including a timestamp.
  Logs are normally generated on servers and written to different files; typically there are system logs, application logs and security logs, and they end up scattered across many machines.
  When a fault occurs, engineers usually have to log in to each server and dig through the logs with Linux tools such as grep / sed / awk. Without a log system you first have to work out which server handled the request; if that server runs several application instances, you then have to check each instance's log directory, and every instance may have its own log-rotation policy (say, one file per day) plus compression and archiving policies.
  Going through that whole procedure makes troubleshooting and finding the root cause slow and painful. If we can manage these logs centrally and provide centralized search, we not only diagnose faults faster, we also get an overall picture of the system instead of constantly fire-fighting after the fact.
  In my view, log data is valuable in the following ways:
  Data lookup: search the logs to locate the relevant bug and work out a fix
  Service diagnosis: aggregate and analyze the logs to understand server load and service health
  Data analysis: run further analysis, for example using the course id carried in requests to find the top 10 courses users are interested in.
  To address these problems and provide a distributed system for collecting and analyzing logs in real time, we adopted the industry-standard log data management solution, built from three systems: Elasticsearch, Logstash and Kibana. The stack is commonly abbreviated ELK after the initials of the three systems; after running it in practice we evolved it further into EFK, where F stands for Filebeat, introduced to work around problems caused by Logstash.

1. What exactly is ELK?
  ELK is actually a combination of three tools, Elasticsearch + Logstash + Kibana. Together they form a practical, easy-to-use monitoring stack, and many companies use it to build visual platforms for analyzing massive volumes of logs.


[*]ElasticSearch
  ElasticSearch is a search server based on Lucene. It provides a distributed, multi-tenant full-text search engine behind a RESTful web interface. Elasticsearch is written in Java, released as open source under the Apache license, and is currently a popular enterprise search engine. It is designed for use in the cloud and aims to deliver real-time search while being stable, reliable, fast, and easy to install and use.


[*]Logstash
  Logstash is a tool for managing logs and events: you can use it to collect logs, transform them, parse them and hand them on as data for other components to consume, for example for search or storage.


[*]Kibana
  Kibana is an excellent front end for displaying logs: it can turn log data into a wide range of detailed charts and gives users powerful data-visualization support.

2. What are ELK's strengths?


[*]  Powerful search: elasticsearch retrieves data quickly through distributed search and supports a query DSL; put simply, you filter data quickly with a configuration-like query language (see the curl example after this list).

[*]  Excellent presentation: it can render very detailed charts, the content of which is fully customizable, making the most of data visualization.

[*]Distributed operation: it solves many problems of operating large clusters, including monitoring, alerting, and log collection and parsing.
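  For a concrete feel of the query DSL mentioned above, here is a minimal search request. This is only a sketch: the node address 192.168.1.160:9200 and the system-* index pattern are borrowed from the deployment described later in this post.

# curl -XGET 'http://192.168.1.160:9200/system-*/_search?pretty' -d '
{
  "query": { "match": { "message": "error" } },
  "size": 5
}'
#   returns at most 5 documents whose "message" field matches "error"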

3. What is ELK typically used for?
  In operating a massive log system, the ELK components can be used to solve:


[*]Centralized querying and management of distributed log data
[*]System monitoring, covering both hardware and every application component
[*]Troubleshooting
[*]Security information and event management
[*]Reporting
  In a big-data operations system, the main problems ELK helps solve are:


[*]Log search, troubleshooting, pre-release checks
[*]Server monitoring, application monitoring, error alerting, bug management
[*]Performance analysis, user-behavior analysis, security-vulnerability analysis, event management
http://i2.运维网.com/images/blog/201808/21/5fd44c0daf5289e91d17b8daff241af6.png
  As the diagram above shows: Logstash collects the logs produced by the AppServers and stores them in the ElasticSearch cluster, while Kibana queries the ES cluster for data, builds charts and returns them to the Browser.

ELK environment deployment:
  (0) Basic environment
  OS: CentOS 7
  Firewall: disabled
  SELinux: disabled
  Machines: two

elk-node1: 192.168.1.160       # master machine
elk-node2: 192.168.1.161       # slave machine
  Note:
  master-slave mode:
  After the master receives log data it shards part of it (a random portion) onto the slave; at the same time, master and slave each keep replicas of their own data and place those replicas on the other machine, so no data is lost.
If the master goes down, pointing the elasticsearch host in the log-collection configuration at the slave instead keeps ELK log collection and the web UI working.
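  Once both nodes are up you can verify this shard/replica allocation from the command line as well; the snippet below is only a quick sanity check and assumes elasticsearch is already listening on 192.168.1.160:9200 as configured later in this post.

# curl 'http://192.168.1.160:9200/_cluster/health?pretty'
#   "status" is green when all primaries and replicas are allocated,
#   yellow when replicas are unassigned (e.g. elk-node2 has not joined yet), red when primaries are missing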
  Because elk-node1 and elk-node2 are virtual machines without public IPs, they are reached through port forwarding on the host machine.
  There are two ways to set up the forwarding (pick either one):
  Forward ports 19200 and 19201 on the host to port 9200 on elk-node1 and elk-node2 respectively
Forward port 15601 on the host to port 5601 on elk-node1
  Host machine: 112.110.115.10 (internal IP 192.168.1.7) (an arbitrary address is recorded here so the real public IP is not exposed)
  a) Proxy forwarding via the haproxy service on the host machine. The proxy configuration on the host is as follows:

# pwd
/usr/local/haproxy/conf
# cat haproxy.cfg
..........
..........
listen node1-9200 0.0.0.0:19200
mode tcp
option tcplog
balance roundrobin
server 192.168.1.160 192.168.1.160:9200 weight 1 check inter 1s rise 2 fall 2
listen node2-9200 0.0.0.0:19201
mode tcp
option tcplog
balance roundrobin
server 192.168.1.161 192.168.1.161:9200 weight 1 check inter 1s rise 2 fall 2
listen node1-5601 0.0.0.0:15601
mode tcp
option tcplog
balance roundrobin
server 192.168.1.160 192.168.1.160:5601 weight 1 check inter 1s rise 2 fall 2
Restart the haproxy service
# /etc/init.d/haproxy restart
Configure the firewall on the host machine
# cat /etc/sysconfig/iptables
.........
-A INPUT -p tcp -m state --state NEW -m tcp --dport 19200 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 19201 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 15601 -j ACCEPT
# /etc/init.d/iptables restart
  b) Via NAT port forwarding on the host machine

# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 19200 -j DNAT --to-destination 192.168.1.160:9200
# iptables -t nat -A POSTROUTING -d 192.168.1.160/32 -p tcp -m tcp --sport 9200 -j SNAT --to-source 192.168.1.7
# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 19200 -j ACCEPT
# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 19201 -j DNAT --to-destination 192.168.1.161:9200
# iptables -t nat -A POSTROUTING -d 192.168.1.161/32 -p tcp -m tcp --sport 9200 -j SNAT --to-source 192.168.1.7
# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 19201 -j ACCEPT
# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 15601 -j DNAT --to-destination 192.168.1.160:5601
# iptables -t nat -A POSTROUTING -d 192.168.1.160/32 -p tcp -m tcp --sport 5601 -j SNAT --to-source 192.168.1.7
# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 15601 -j ACCEPT
# service iptables save
# service iptables restart
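  To confirm the DNAT/SNAT rules are actually in place, you can list the nat table on the host as a quick check:

# iptables -t nat -L PREROUTING -n --line-numbers
# iptables -t nat -L POSTROUTING -n --line-numbers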
  One reminder:
  After the NAT port-forwarding rules are set up, the two lines below must be commented out in /etc/sysconfig/iptables, otherwise NAT forwarding will not work properly! Normally, once the NAT rules above have been added and the firewall has been saved and restarted, these two lines are removed from /etc/sysconfig/iptables automatically.

# vim /etc/sysconfig/iptables
..........
#-A INPUT -j REJECT --reject-with icmp-host-prohibited
#-A FORWARD -j REJECT --reject-with icmp-host-prohibited
# service iptables restart
  (1) Elasticsearch installation and configuration
  Base environment installation (perform on both elk-node1 and elk-node2)
  1) Download and install the GPG key

# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
  2) Add the yum repository

# vim /etc/yum.repos.d/elasticsearch.repo

[elasticsearch-2.x]
name=Elasticsearch repository for 2.x packages
baseurl=http://packages.elastic.co/elasticsearch/2.x/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
  3) Install elasticsearch

# yum install -y elasticsearch
  4) Install related supporting software

# First download and install the EPEL repo package epel-release-latest-7.noarch.rpm, otherwise yum will fail with: No Package.....
# wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
# rpm -ivh epel-release-latest-7.noarch.rpm
# Install Redis
# yum install -y redis
# Install Nginx
# yum install -y nginx
# Install Java
# yum install -y java
  After installing java, check it:

# java -version
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)
  Configuration and deployment (elk-node1 is configured first below)
  1) Edit the configuration file

# mkdir -p /data/es-data
# vim /etc/elasticsearch/elasticsearch.yml                               【clear the existing contents and configure as below】
cluster.name: huanqiu                            # cluster name (must be identical on every node of the same cluster)
node.name: elk-node1                             # node name, preferably the same as the hostname
path.data: /data/es-data                         # data directory
path.logs: /var/log/elasticsearch/               # log directory
bootstrap.mlockall: true                         # lock memory so it is never swapped out
network.host: 0.0.0.0                            # network binding
http.port: 9200                                  # port
  2) Start and check

# chown -R elasticsearch.elasticsearch /data/
# systemctl start elasticsearch
# systemctl status elasticsearch
CGroup: /system.slice/elasticsearch.service
└─3005 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSI...
  Note: as shown above, elasticsearch runs with a minimum heap of 256m and a maximum of 1g
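  If you want to change those limits, this 2.x RPM install normally takes its heap size from ES_HEAP_SIZE in /etc/sysconfig/elasticsearch; the snippet below is only a sketch, pick a value that fits your machine (at most about half of physical RAM).

# vim /etc/sysconfig/elasticsearch
ES_HEAP_SIZE=1g
# systemctl restart elasticsearch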

# netstat -antlp |egrep "9200|9300"
tcp6 0 0 :::9200 :::* LISTEN 3005/java
tcp6 0 0 :::9300 :::* LISTEN 3005/java
  Then access it from a web browser (Google Chrome is recommended)
http://i2.运维网.com/images/blog/201808/21/0229d96d59fe1b86d33ed1f024319805.png

http://112.110.115.10:19200/
  3) Check the data from the command line (run on the 112.110.115.10 host machine or any other server with access, as below)

# curl -i -XGET 'http://192.168.1.160:9200/_count?pretty' -d '{"query":{"match_all":{}}}'
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 95
{
"count" : 0,
"_shards" : {
"total" : 0,
"successful" : 0,
"failed" : 0
}
}
  Checking the data this way on the command line is clumsy.
  4) So next, install plugins and use them to view the data (the two plugins below must be installed on both elk-node1 and elk-node2)
  4.1) Install the head plugin
  a) Plugin installation, method one

# /usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head
  b) Plugin installation, method two
  First download the head plugin into the /usr/local/src directory

Download URL: https://github.com/mobz/elasticsearch-head
# unzip elasticsearch-head-master.zip
# ls
elasticsearch-head-master elasticsearch-head-master.zip
  Create a head directory under /usr/share/elasticsearch/plugins
  Then move everything extracted from the elasticsearch-head-master.zip download into /usr/share/elasticsearch/plugins/head and restart the elasticsearch service.

# cd /usr/share/elasticsearch/plugins/
# mkdir head
# ls
head
# cd head
# cp -r /usr/local/src/elasticsearch-head-master/* ./
# pwd
/usr/share/elasticsearch/plugins/head
# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins
# ll
total 40
-rw-r--r--. 1 elasticsearch elasticsearch 104 Sep 28 01:57 elasticsearch-head.sublime-project
-rw-r--r--. 1 elasticsearch elasticsearch 2171 Sep 28 01:57 Gruntfile.js
-rw-r--r--. 1 elasticsearch elasticsearch 3482 Sep 28 01:57 grunt_fileSets.js
-rw-r--r--. 1 elasticsearch elasticsearch 1085 Sep 28 01:57 index.html
-rw-r--r--. 1 elasticsearch elasticsearch 559 Sep 28 01:57 LICENCE
-rw-r--r--. 1 elasticsearch elasticsearch 795 Sep 28 01:57 package.json
-rw-r--r--. 1 elasticsearch elasticsearch 100 Sep 28 01:57 plugin-descriptor.properties
-rw-r--r--. 1 elasticsearch elasticsearch 5211 Sep 28 01:57 README.textile
drwxr-xr-x. 5 elasticsearch elasticsearch 4096 Sep 28 01:57 _site
drwxr-xr-x. 4 elasticsearch elasticsearch 29 Sep 28 01:57 src
drwxr-xr-x. 4 elasticsearch elasticsearch 66 Sep 28 01:57 test
# systemctl restart elasticsearch
  Access the plugin (ideally complete the configuration and plugin installation on elk-node2 first, then do the access and data-insertion tests)

http://112.110.115.10:19200/_plugin/head/
http://i2.运维网.com/images/blog/201808/21/29b2da51f98db3b9334f365a1962b8d4.png
  First insert some sample data as a test
  As follows: open "复合查询" (compound query); under the POST option enter an arbitrary path such as /index-demo/test, then enter the document body below it (do not forget the commas between the fields on separate lines);
  With the data entered (here the values wangshibo and hello world), click "验证JSON" (validate JSON) -> "提交请求" (submit request). After a successful submission, the right-hand pane shows the index, type, version and other information, with failed: 0 meaning success.
  Then check the test document, as follows:
  Under "复合查询", choose the GET option, append the id returned by the POST above to /index-demo/test/, and leave the body empty, i.e. just {}.
  Click "验证JSON" -> "提交请求" and the right-hand pane now shows the data inserted above (wangshibo, hello world).
  Open "基本查询" (basic query) to look at the data; the inserted document can be queried as shown below:
http://i2.运维网.com/images/blog/201808/21/262f81e6e97ca57f8c59517ca9a1713d.png
  Opening "数据浏览" (data browser) also shows the inserted data:
http://i2.运维网.com/images/blog/201808/21/efdb00e2a2bc215a96df3b05817acd0a.png
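  If you prefer the command line, the same insert-and-query round trip can be done with curl. This is only a sketch: the index name index-demo, the type test and the document body mirror the UI example above, and the id in the GET is whatever _id the POST response returns.

# curl -XPOST 'http://192.168.1.160:9200/index-demo/test' -d '{"user":"wangshibo","mesg":"hello world"}'
#   the response contains the generated _id of the new document
# curl -XGET 'http://192.168.1.160:9200/index-demo/test/<doc_id>?pretty'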
  As below: be sure to complete the configuration on the elk-node2 node in advance (it is covered further down), otherwise after inserting data the cluster status will show as yellow; once elk-node2 is configured and has joined the cluster, it returns to the normal green state.
http://i2.运维网.com/images/blog/201808/21/6a569f9945c623a781180546fd495a2f.png
  4.2) Install the kopf monitoring plugin
  a) Monitoring plugin installation, method one

# /usr/share/elasticsearch/bin/plugin install lmenezes/elasticsearch-kopf
  b) Monitoring plugin installation, method two
  First download the kopf monitoring plugin into the /usr/local/src directory

Download URL: https://github.com/lmenezes/elasticsearch-kopf
# unzip elasticsearch-kopf-master.zip
# ls
elasticsearch-kopf-master elasticsearch-kopf-master.zip
  Create a kopf directory under /usr/share/elasticsearch/plugins
  Then move everything extracted from the elasticsearch-kopf-master.zip download into /usr/share/elasticsearch/plugins/kopf and restart the elasticsearch service.

# cd /usr/share/elasticsearch/plugins/
# mkdir kopf
# cd kopf
# cp -r /usr/local/src/elasticsearch-kopf-master/* ./
# pwd
/usr/share/elasticsearch/plugins/kopf
# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins
# ll
total 40
-rw-r--r--. 1 elasticsearch elasticsearch 237 Sep 28 16:28 CHANGELOG.md
drwxr-xr-x. 2 elasticsearch elasticsearch 22 Sep 28 16:28 dataset
drwxr-xr-x. 2 elasticsearch elasticsearch 73 Sep 28 16:28 docker
-rw-r--r--. 1 elasticsearch elasticsearch 4315 Sep 28 16:28 Gruntfile.js
drwxr-xr-x. 2 elasticsearch elasticsearch 4096 Sep 28 16:28 imgs
-rw-r--r--. 1 elasticsearch elasticsearch 1083 Sep 28 16:28 LICENSE
-rw-r--r--. 1 elasticsearch elasticsearch 1276 Sep 28 16:28 package.json
-rw-r--r--. 1 elasticsearch elasticsearch 102 Sep 28 16:28 plugin-descriptor.properties
-rw-r--r--. 1 elasticsearch elasticsearch 3165 Sep 28 16:28 README.md
drwxr-xr-x. 6 elasticsearch elasticsearch 4096 Sep 28 16:28 _site
drwxr-xr-x. 4 elasticsearch elasticsearch 27 Sep 28 16:28 src
drwxr-xr-x. 4 elasticsearch elasticsearch 4096 Sep 28 16:28 tests
# systemctl restart elasticsearch
  Access the plugin (as below; again, install the plugins on elk-node2 in advance, otherwise the cluster will show a yellow warning state when you access it):

http://112.110.115.10:19200/_plugin/kopf/#!/cluster
http://i2.运维网.com/images/blog/201808/21/ef41f60c6852acf66b83a7f013832db0.png
  Now configure the elk-node2 node (the two plugins above are installed on elk-node2 in the same way)
  Note: the installation and configuration of the two nodes are essentially identical.

# mkdir -p /data/es-data
# cat /etc/elasticsearch/elasticsearch.yml
cluster.name: huanqiu
node.name: elk-node2
path.data: /data/es-data
path.logs: /var/log/elasticsearch/
bootstrap.mlockall: true
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ["192.168.1.160", "192.168.1.161"]
# Fix directory ownership
# chown -R elasticsearch.elasticsearch /data/
# Start the service
# systemctl start elasticsearch
# systemctl status elasticsearch
● elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; enabled; vendor preset: disabled)
Active: active (running) since Wed 2016-09-28 16:49:41 CST; 1 weeks 3 days ago
Docs: http://www.elastic.co
Process: 17798 ExecStartPre=/usr/share/elasticsearch/bin/elasticsearch-systemd-pre-exec (code=exited, status=0/SUCCESS)
Main PID: 17800 (java)
CGroup: /system.slice/elasticsearch.service
└─17800 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFra...
Oct 09 13:42:22 elk-node2 elasticsearch: Transport res...943817]
Oct 09 13:42:23 elk-node2 elasticsearch: Transport res...943846]
......
# Check the ports
# netstat -antlp|egrep "9200|9300"
tcp6 0 0 :::9200 :::* LISTEN 2928/java
tcp6 0 0 :::9300 :::* LISTEN 2928/java
tcp6 0 0 127.0.0.1:48200 127.0.0.1:9300 TIME_WAIT -
tcp6 0 0 ::1:41892 ::1:9300 TIME_WAIT -
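  With elk-node2 up, you can also confirm from the command line that both nodes have joined the same cluster; a quick check using the _cat APIs:

# curl 'http://192.168.1.160:9200/_cat/nodes?v'
# curl 'http://192.168.1.160:9200/_cluster/health?pretty'            # status should now be green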

  Check elk-node2's data from the command line (run on the 112.110.115.10 host machine or another server with access, as below)

# curl -i -XGET 'http://192.168.1.161:9200/_count?pretty' -d '{"query":{"match_all":{}}}'
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 95
{
"count" : 1,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
}
  Then access elk-node2 via the web

http://112.110.115.10:19201/
  Access the two plugins:

http://112.110.115.10:19201/_plugin/head/
http://112.110.115.10:19201/_plugin/kopf/#!/cluster
  The screenshots here are basically the same as for elk-node1, so they are not shown again.
  (2) Logstash installation and configuration (this is what gets installed on the client machines; here it is installed on both elk-node1 and elk-node2)
  Base environment installation (the client installs logstash; the data it collects is written into elasticsearch, where it can then be viewed)
  1) Download and install the GPG key

# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
  2) Add the yum repository

# vim /etc/yum.repos.d/logstash.repo

[logstash-2.1]
name=Logstash repository for 2.1.x packages
baseurl=http://packages.elastic.co/logstash/2.1/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
  3) Install logstash

# yum install -y logstash
  4) Before starting logstash, confirm that elasticsearch is running (logstash itself is launched manually from the command line in the tests below)

# systemctl start elasticsearch
# systemctl status elasticsearch
● elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled; vendor preset: disabled)
Active: active (running) since Mon 2016-11-07 18:33:28 CST; 3 days ago
Docs: http://www.elastic.co
Main PID: 8275 (java)
CGroup: /system.slice/elasticsearch.service
└─8275 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFrac...

  Testing with data
  1) Basic input and output

# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{} }'
Settings: Default filter workers: 1
Logstash startup completed
hello                                                                                     # type this
2016-11-11T06:41:07.690Z elk-node1 hello                        # and this is printed
wangshibo                                                                            # type this
2016-11-11T06:41:10.608Z elk-node1 wangshibo               # and this is printed
  2) Verbose output with rubydebug

# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug} }'
Settings: Default filter workers: 1
Logstash startup completed
hello                                                                                    # type this
{                                                                                       # and the following is printed
"message" => "hello",
"@version" => "1",
"@timestamp" => "2016-11-11T06:44:06.711Z",
"host" => "elk-node1"
}
wangshibo                                                                         # type this
{                                                                                       # and the following is printed
"message" => "wangshibo",
"@version" => "1",
"@timestamp" => "2016-11-11T06:44:11.270Z",
"host" => "elk-node1"
}
  3) Write the content into elasticsearch

# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.160:9200"]} }'
Settings: Default filter workers: 1
Logstash startup completed                     # type the test data below
123456
wangshibo
huanqiu
hahaha
  The difference between using rubydebug and writing to elasticsearch:
  it is simply a different output: the former uses the rubydebug codec, the latter the elasticsearch output plugin.
  After writing into elasticsearch, check the data in the web UI, as in the screenshots below:
  Note:
  after the master receives log data it shards part of it (a random portion) onto the slave; master and slave also each keep replicas and place them on the other machine, so no data is lost. Below, the data collected by the master went to its own shards 1 and 3, and the rest went to the slave's shards 0, 2 and 4.
http://i2.运维网.com/images/blog/201808/21/f341d541882aa02fd3c5b0b982f9f563.png
http://i2.运维网.com/images/blog/201808/21/dc50eb97c5496530de4f4977f9736932.png
  4) Write to elasticsearch and also print a copy to standard output

# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.160:9200"]} stdout{ codec => rubydebug}}'
Settings: Default filter workers: 1
Logstash startup completed
huanqiupc
{
"message" => "huanqiupc",
"@version" => "1",
"@timestamp" => "2016-11-11T07:27:42.012Z",
"host" => "elk-node1"
}
wangshiboqun
{
"message" => "wangshiboqun",
"@version" => "1",
"@timestamp" => "2016-11-11T07:27:55.396Z",
"host" => "elk-node1"
}
  Text like this can be kept long-term, is simple to work with and compresses well. Now check it in the elasticsearch web UI:
http://i2.运维网.com/images/blog/201808/21/9fb5760f6b9b56cc5721dfc75a372ebf.png
  Logstash configuration and writing config files
  1) Logstash configuration
  A simple configuration:

# vim /etc/logstash/conf.d/01-logstash.conf
input { stdin { } }
output {
elasticsearch { hosts => ["192.168.1.160:9200"]}
stdout { codec => rubydebug }
}
  Run it:

# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/01-logstash.conf
Settings: Default filter workers: 1
Logstash startup completed
beijing                                                # type this
{                                                       # and the following is printed
"message" => "beijing",
"@version" => "1",
"@timestamp" => "2016-11-11T07:41:48.401Z",
"host" => "elk-node1"
}
http://i2.运维网.com/images/blog/201808/21/6a14aaffecdcc0491fbcc888cd1bafe8.png
  2) Collecting the system log

# vim file.conf
input {
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
}
output {
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
  Run the log collection above, as follows. The command keeps running, which means the log is being monitored and collected; if it is interrupted, collection stops. So it should run in the background:

# /opt/logstash/bin/logstash -f file.conf &
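  If you would rather not tie the job to the current shell session at all, nohup also works; just a sketch, the log path here is arbitrary:

# nohup /opt/logstash/bin/logstash -f file.conf > /var/log/logstash-file.log 2>&1 &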
  Open the elasticsearch web UI and look at this machine's system log data:
http://i2.运维网.com/images/blog/201808/21/e962e3cd0ab407c1033399da96db2931.png
http://i2.运维网.com/images/blog/201808/21/f0d5fd5c08d1d44c0528e8d75fa771b3.png
  3) Collecting the java log, together with the system log collection described above

# vim file.conf
input {
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
}
input {
file {
path => "/var/log/elasticsearch/huanqiu.log"
type => "es-error"
start_position => "beginning"
}
}
output {
if == "system"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
if == "es-error"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
}
  Note:
  if your log events already carry a type field of their own, you cannot also use type this way in the conf file.
  Run the following command to collect:

# /opt/logstash/bin/logstash -f file.conf &
  Open the elasticsearch web UI and check the data:
http://i2.运维网.com/images/blog/201808/21/ae43068ac4523d33e9729d8ab2359481.png
  There is a problem:
  every line of an error is collected as a separate event, instead of one event per error.
  Below, lines are combined into events instead:

# vim multiline.conf
input {
stdin {
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
}
output {
stdout {
codec => "rubydebug"
}
}
Run it:
# /opt/logstash/bin/logstash -f multiline.conf
Settings: Default filter workers: 1
Logstash startup completed
123
456
[123
{
"@timestamp" => "2016-11-11T09:28:56.824Z",
"message" => "123\n456",
"@version" => "1",
"tags" => [
"multiline"
],
"host" => "elk-node1"
}
123]

{
"@timestamp" => "2016-11-11T09:29:09.043Z",
"message" => "",
"@version" => "1",
"tags" => [
"multiline"
],
"host" => "elk-node1"
}
  Until a line starting with [ is seen, nothing is collected; only when the next [ arrives does the buffered text count as one event and get collected.
  (3) Kibana installation and configuration
  1) Install kibana:

# cd /usr/local/src
# wget https://download.elastic.co/kibana/kibana/kibana-4.3.1-linux-x64.tar.gz
# tar zxf kibana-4.3.1-linux-x64.tar.gz
# mv kibana-4.3.1-linux-x64 /usr/local/
# ln -s /usr/local/kibana-4.3.1-linux-x64/ /usr/local/kibana
  2) Edit the configuration file:

# pwd
/usr/local/kibana/config
# cp kibana.yml kibana.yml.bak
# vim kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.url: "http://192.168.1.160:9200"
kibana.index: ".kibana"
  Kibana runs in the foreground, so either keep a dedicated window open for it or use screen.
  Install screen and use it to start kibana:

# yum -y install screen
# screen                        # this opens another terminal window
# /usr/local/kibana/bin/kibana
log Status changed from uninitialized to green - Ready
log Status changed from uninitialized to yellow - Waiting for Elasticsearch
log Status changed from uninitialized to green - Ready
log Status changed from uninitialized to green - Ready
log Status changed from uninitialized to green - Ready
log Status changed from uninitialized to green - Ready
log Status changed from uninitialized to green - Ready
log Status changed from uninitialized to green - Ready
  Then press ctrl+a followed by d to detach; the kibana service started in that screen session keeps running in its own foreground.

# screen -ls
There is a screen on:
15041.pts-0.elk-node1 (Detached)
1 Socket in /var/run/screen/S-root.
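  If you would rather not keep a screen session around, a small hand-written systemd unit also works; this is only a sketch and assumes the install path used above:

# cat /usr/lib/systemd/system/kibana.service
[Unit]
Description=Kibana
After=network.target

[Service]
ExecStart=/usr/local/kibana/bin/kibana
Restart=on-failure

[Install]
WantedBy=multi-user.target
# systemctl daemon-reload
# systemctl start kibana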
  3) Access kibana: http://112.110.115.10:15601/
  As below: to add the java log collection configured above, fill in es-error here; for the system log configured above, fill in system, and so on (the collected index names can be seen in the elasticsearch UI).
  Then click Discover at the top and inspect the data under Discover:
  To view the log entries, click "Discover" --> "message", then click the "add" behind it
  Note:
  whichever attributes you want the log entries on the right to carry, click "add" behind the corresponding attributes on the left
As in the screenshot below, the message and path attributes have been added:
http://i2.运维网.com/images/blog/201808/21/632f83a37bd30931b2e97ee8cbc1c1e7.png
  Now the log entries shown on the right carry the message and path attributes
http://i2.运维网.com/images/blog/201808/21/7b25100cff372549f40d908a63f79744.png
Click the hidden control behind a log-content attribute on the right to work with that field.
  4) Collecting the nginx access log (JSON format)
  Nginx writes its access log in JSON to /var/log/nginx/access_json.log; the following json.conf reads that file and prints it with rubydebug:

# cat json.conf
input {
file {
path => "/var/log/nginx/access_json.log"
codec => "json"
}
}
output {
stdout {
codec => "rubydebug"
}
}
Start the collection job:
# /opt/logstash/bin/logstash -f json.conf      # or append & to run it in the background
  Request the nginx page (on elk-node1's host run: curl http://192.168.1.160) and the following appears:

# /opt/logstash/bin/logstash -f json.conf
Settings: Default filter workers: 1
Logstash startup completed
{
"@timestamp" => "2016-11-11T11:10:53.000Z",
"@version" => "1",
"client" => "192.168.1.7",
"url" => "/index.html",
"status" => "200",
"domain" => "192.168.1.160",
"host" => "192.168.1.160",
"size" => 3700,
"responsetime" => 0.0,
"referer" => "-",
"ua" => "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2",
"path" => "/var/log/nginx/access_json.log"
}
  Note:
  the json.conf above only prints the nginx log; it does not yet feed it into elasticsearch, so at this point no nginx log shows up in the elasticsearch UI.
  To get the nginx log into elasticsearch, merge it into the combined file.conf, as below, so the nginx-log input is also sent to elasticsearch (from now on this single combined file is used; any new log source is simply appended to it):

# cat file.conf
input {
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
file {
path => "/var/log/elasticsearch/huanqiu.log"
type => "es-error"
start_position => "beginning"
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
file {
path => "/var/log/nginx/access_json.log"
codec => json
start_position => "beginning"
type => "nginx-log"
}
}
output {
if == "system"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
if == "es-error"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
if == "nginx-log"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "nignx-log-%{+YYYY.MM.dd}"
}
}
}
  You can add the --configtest flag to check the config file for syntax errors or bad settings before running it; this is important!!

# /opt/logstash/bin/logstash -f file.conf --configtest
Configuration OK
  Then run the logstash command again (since the earlier run was already put in the background, this is not strictly necessary; alternatively kill the previous one first and start it again in the background), and hit the nginx page once more to test

# /opt/logstash/bin/logstash -f file.conf &
  Check the elasticsearch web UI:
  The nginx log integrated into the kibana UI looks like this:
http://i2.运维网.com/images/blog/201808/21/fa33ba8c53dcb2cef57020b820a34757.png
  5) Collecting syslog
  Write the collection file and run it.

# cat syslog.conf
input {
syslog {
type => "system-syslog"
host => "192.168.1.160"
port => "514"
}
}
output {
stdout {
codec => "rubydebug"
}
}
  Run the collection file above:

# /opt/logstash/bin/logstash -f syslog.conf
  Open a new window and check whether the service is listening:

# netstat -ntlp|grep 514
tcp6 0 0 192.168.1.160:514 :::* LISTEN 17842/java
# vim /etc/rsyslog.conf
#*.* @@remote-host:514                                                         【add the following line below this one】
*.* @@192.168.1.160:514
# systemctl restart rsyslog
  Back in the original window (the terminal running the collection file above), data appears:

# /opt/logstash/bin/logstash -f syslog.conf
Settings: Default filter workers: 1
Logstash startup completed
{
"message" => "Stopping System Logging Service...\n",
"@version" => "1",
"@timestamp" => "2016-11-13T10:35:30.000Z",
"type" => "system-syslog",
"host" => "192.168.1.160",
"priority" => 30,
"timestamp" => "Nov 13 18:35:30",
"logsource" => "elk-node1",
"program" => "systemd",
"severity" => 6,
"facility" => 3,
"facility_label" => "system",
"severity_label" => "Informational"
}
  Add this to the combined file.conf as well:

# cat file.conf
input {
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
file {
path => "/var/log/elasticsearch/huanqiu.log"
type => "es-error"
start_position => "beginning"
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
file {
path => "/var/log/nginx/access_json.log"
codec => json
start_position => "beginning"
type => "nginx-log"
}
syslog {
type => "system-syslog"
host => "192.168.1.160"
port => "514"
}
}
output {
if == "system"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
if == "es-error"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
if == "nginx-log"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "nignx-log-%{+YYYY.MM.dd}"
}
}
if == "system-syslog"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "system-syslog-%{+YYYY.MM.dd}"
}
}
}
  Run the combined file (first test the configuration, then kill the file.conf instance started in the background earlier, then run it again):

# /opt/logstash/bin/logstash -f file.conf --configtest
Configuration OK
# /opt/logstash/bin/logstash -f file.conf &
  Test:
  write some entries into the log and watch elasticsearch and kibana change:

# logger "hehehehehehe1"
# logger "hehehehehehe2"
# logger "hehehehehehe3"
# logger "hehehehehehe4"
# logger "hehehehehehe5"
http://i2.运维网.com/images/blog/201808/21/81a7692fdd7e3d033b083a5e98e5667e.png
  Add it to the kibana UI:
http://i2.运维网.com/images/blog/201808/21/71fc6bc2db292e7e730c7084f6d73d90.png
http://i2.运维网.com/images/blog/201808/21/c4dea4aab1afa8127fccb6bac1535a43.png
  6) Collecting logs over TCP
  Write the collection file and run it (if needed, this configuration can also be merged into the combined file.conf above so the data flows into elasticsearch and kibana):

# cat tcp.conf
input {
tcp {
host => "192.168.1.160"
port => "6666"
}
}
output {
stdout {
codec => "rubydebug"
}
}
# /opt/logstash/bin/logstash -f tcp.conf
  Open another window. Test one (install the nc command first: yum install -y nc):

# nc 192.168.1.160 6666
  Back in the logstash window, output such as the following appears:
{
"message" => "",
"@version" => "1",
"@timestamp" => "2016-11-13T11:01:15.280Z",
"host" => "192.168.1.160",
"port" => 49743
}
  Test two:

# echo "hehe" | nc 192.168.1.160 6666
# echo "hehe" > /dev/tcp/192.168.1.160/6666
  Go back to the terminal running logstash, check again, and the entries show up:

# /opt/logstash/bin/logstash -f tcp.conf
Settings: Default filter workers: 1
Logstash startup completed.......
{
"message" => "hehe",
"@version" => "1",
"@timestamp" => "2016-11-13T11:39:58.263Z",
"host" => "192.168.1.160",
"port" => 53432
}
{
"message" => "hehe",
"@version" => "1",
"@timestamp" => "2016-11-13T11:40:13.458Z",
"host" => "192.168.1.160",
"port" => 53457
}
  7) Using a filter
  Write the file:

# cat grok.conf
input {
stdin{}
}
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
output {
stdout{
codec => "rubydebug"
}
}
  Run and test:

# /opt/logstash/bin/logstash -f grok.conf
Settings: Default filter workers: 1
Logstash startup completed
55.3.244.1 GET /index.html 15824 0.043    # type this, and it is automatically parsed into the field dictionary below
{
"message" => "55.3.244.1 GET /index.html 15824 0.043",
"@version" => "1",
"@timestamp" => "2016-11-13T11:45:47.882Z",
"host" => "elk-node1",
"client" => "55.3.244.1",
"method" => "GET",
"request" => "/index.html",
"bytes" => "15824",
"duration" => "0.043"
}
  The pattern names used above are in fact all predefined inside logstash:

# cd /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns/
# ls
aws     bro     firewalls      haproxy  junos         mcollective           mongodb  postgresql  redis
bacula  exim    grok-patterns  java     linux-syslog  mcollective-patterns  nagios   rails       ruby
# cat grok-patterns
..........                                  (the predefined patterns such as IP, WORD, NUMBER and URIPATHPARAM are all defined in this file)
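  To see how the patterns referenced in grok.conf above are defined, you can pull them straight out of that file; a quick check, run from the patterns directory shown above:

# grep -E '^(IP|WORD|NUMBER|URIPATHPARAM) ' grok-patterns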
  8) MySQL slow-query log
  Collection file:

# cat mysql-slow.conf
input {
file {
path => "/root/slow.log"
type => "mysql-slowlog"
codec => multiline {
pattern => "^# User@Host"
negate => true
what => "previous"
}
}
}
filter {
# drop sleep events
grok {
match => { "message" =>"SELECT SLEEP" }
add_tag => [ "sleep_drop" ]
tag_on_failure => [] # prevent default _grokparsefailure tag on real records
}
if "sleep_drop" in {
drop {}
}
grok {
match => [ "message", "(?m)^# User@Host: %{USER:user}
[^
[^
]+\] @ (?:(?\S*) )?
(?:
(?:
\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?(?\w+)\s+.*)\n#\s*" ]
}
date {
match => [ "timestamp", "UNIX" ]
remove_field => [ "timestamp" ]
}
}
output {
stdout {
codec =>"rubydebug"
}
}
  Run and test:
  /root/slow.log above is a slow-query log uploaded by hand; after appending entries to it and saving, the following is displayed:

# /opt/logstash/bin/logstash -f mysql-slow.conf
Settings: Default filter workers: 1
Logstash startup completed
{
"@timestamp" => "2016-11-14T06:53:54.100Z",
"message" => "# Time: 161114 11:05:18",
"@version" => "1",
"path" => "/root/slow.log",
"host" => "elk-node1",
"type" => "mysql-slowlog",
"tags" => [
"_grokparsefailure"
]
}
{
"@timestamp" => "2016-11-14T06:53:54.105Z",
"message" => "# User@Host: test @\n# Query_time: 1.725889Lock_time: 0.000430 Rows_sent: 0Rows_examined: 0\nuse test_zh_o2o_db;\nSET timestamp=1479092718;\nSELECT trigger_name, event_manipulation, event_object_table, action_statement, action_timing, DEFINER FROM information_schema.triggers WHERE BINARY event_object_schema='test_zh_o2o_db' AND BINARY event_object_table='customer';\n# Time: 161114 12:10:30",
"@version" => "1",
"tags" => [
"multiline",
"_grokparsefailure"
],
"path" => "/root/slow.log",
"host" => "elk-node1",
"type" => "mysql-slowlog"
}
  Next, a problem you will run into:
once elasticsearch has an outage, log collection and processing stop working!
What do we do in that situation?
  Solution:
add a middleware layer between the clients and elasticsearch to act as a buffer: collected log entries are first written to the middleware and then fed from there into elasticsearch.
That neatly solves the problem described above.
  (4) Using redis as the middleware in ELK to buffer collected logs
  1) Configure and start redis

# vim /etc/redis.conf               # change the following two lines
daemonize yes
bind 192.168.1.160
# systemctl start redis
# lsof -i:6379
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
redis-ser 19474 redis 4u IPv4 1344465 0t0 TCP elk-node1:6379 (LISTEN)
# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
.......
  2) Write the file that collects data on the Client side

# vim redis-out.conf
input {
stdin {}
}
output {
redis {
host => "192.168.1.160"
port => "6379"
db => "6"
data_type => "list"
key => "demo"
}
}
  3) Run the collection file and type the data hello redis

# /opt/logstash/bin/logstash -f redis-out.conf
Settings: Default filter workers: 1
Logstash startup completed             # now type the data hello redis
hello redis
  4) Check the data in redis

# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
.......
.......
# Keyspace
db6:keys=1,expires=0,avg_ttl=0   # on the last line, it shows the data is in db6

192.168.1.160:6379> select 6
OK
192.168.1.160:6379> keys *
1) "demo"
192.168.1.160:6379> LINDEX demo -1
"{\"message\":\"hello redis\",\"@version\":\"1\",\"@timestamp\":\"2016-11-14T08:04:25.981Z\",\"host\":\"elk-node1\"}"
  5) Keep typing some arbitrary data

# /opt/logstash/bin/logstash -f redis-out.conf
Settings: Default filter workers: 1
Logstash startup completed
hello redis
123456
asdf
ert
wang
shi
bo
guohuihui
as
we
r
g
asdfjkdfsak
5423wer
34rt3
6y
7uj
u
io9
sdjfhsdk890
huanqiu
huanqiuchain
hqsb
asda
  6) Check in redis
  Check the list length in redis:

# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
.......
.......
# Keyspace
db6:keys=1,expires=0,avg_ttl=0      # shows db6
192.168.1.160:6379> select 6
OK
192.168.1.160:6379> keys *
1) "demo"
192.168.1.160:6379> LLEN demo
(integer) 24
  7) Write the contents of redis into ES

# vim redis-in.conf
input {
redis {
host => "192.168.1.160"
port => "6379"
db => "6"
data_type => "list"
key => "demo"
}
}
output {
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "redis-in-%{+YYYY.MM.dd}"
}
}
  Run:

# /opt/logstash/bin/logstash -f redis-in.conf --configtest
Configuration OK
# /opt/logstash/bin/logstash -f redis-in.conf &
  Check in redis; the data has been read out:
192.168.1.160:6379> LLEN demo
(integer) 0
  Check the elasticsearch web UI:
http://i2.运维网.com/images/blog/201808/21/b47d860904d21018704decea6ded8139.png
  8) Next, write all collected logs into redis. A new combined file, shipper.conf, is defined here with the redis buffer added (the previously running combined file.conf can be stopped).

# vim shipper.conf
input {
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
file {
path => "/var/log/elasticsearch/huanqiu.log"
type => "es-error"
start_position => "beginning"
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
file {
path => "/var/log/nginx/access_json.log"
codec => json
start_position => "beginning"
type => "nginx-log"
}
syslog {
type => "system-syslog"
host => "192.168.1.160"
port => "514"
}
}
output {
if == "system"{
redis {
host => "192.168.1.160"
port => "6379"
db => "6"
data_type => "list"
key => "system"
}
}
if == "es-error"{
redis {
host => "192.168.1.160"
port => "6379"
db => "6"
data_type => "list"
key => "demo"
}
}
if == "nginx-log"{   
redis {
host => "192.168.1.160"
port => "6379"
db => "6"
data_type => "list"
key => "nginx-log"
}
}
if == "system-syslog"{
redis {
host => "192.168.1.160"
port => "6379"
db => "6"
data_type => "list"
key => "system-syslog"
}   
}
}
  Run the file above (make sure the earlier file.conf run has been stopped first!)

# /opt/logstash/bin/logstash -f shipper.conf --configtest
Configuration OK
# /opt/logstash/bin/logstash -f shipper.conf
Settings: Default filter workers: 1
Logstash startup completed
  Check in redis:

# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
.......
.......
# Keyspace
db6:keys=1,expires=0,avg_ttl=0                      # shows db6
192.168.1.160:6379> select 6
OK
192.168.1.160:6379> keys *
1) "demo"
2) "system"
192.168.1.160:6379> keys *
1) "nginx-log"
2) "demo"
3) "system"
  Open another window and write some log entries:

# logger "12325423"
# logger "12325423"
# logger "12325423"
# logger "12325423"
# logger "12325423"
# logger "12325423"
  More keys appear as logs arrive:

192.168.1.160:6379> keys *
1) "system-syslog"
2) "nginx-log"
3) "demo"
4) "system"
  The data can actually be read from redis into ES on any of the ES nodes.
  Below, on the elk-node2 node, we read the data from redis into ES:
  Write the file:

# cat file.conf
input {
redis {
type => "system"
host => "192.168.1.160"
port => "6379"
db => "6"
data_type => "list"
key => "system"
}
redis {
type => "es-error"
host => "192.168.1.160"
port => "6379"
db => "6"
data_type => "list"
key => "es-error"
}
redis {
type => "nginx-log"
host => "192.168.1.160"
port => "6379"
db => "6"
data_type => "list"
key => "nginx-log"
}
redis {
type => "system-syslog"
host => "192.168.1.160"
port => "6379"
db => "6"
data_type => "list"
key => "system-syslog"
}   
}
output {
if == "system"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
if == "es-error"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
if == "nginx-log"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "nignx-log-%{+YYYY.MM.dd}"
}
}
if == "system-syslog"{
elasticsearch {
hosts => ["192.168.1.160:9200"]
index => "system-syslog-%{+YYYY.MM.dd}"
}
}
}
  Run:

# /opt/logstash/bin/logstash -f file.conf --configtest
Configuration OK
# /opt/logstash/bin/logstash -f file.conf &
  Check in redis: the data has already been read out into elasticsearch.

192.168.1.160:6379> keys *
(empty list or set)
  Meanwhile, checking elasticsearch and kibana shows that logs are being collected normally again.
  You can run the following to generate nginx access-log traffic and check the nginx log:

# ab -n10000 -c1 http://192.168.1.160/
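  While ab is running, you can watch the corresponding index grow; a quick check, with the index name matching the output section of file.conf above:

# curl 'http://192.168.1.160:9200/_cat/indices?v' | grep nginx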
  You can also run multiple redis instances feeding into ES, depending on your actual situation.
  The above is my complete record of deploying an ELK environment, kept here as notes; I hope it helps some of you.


