|
Logstash
#############################logstash命令常用参数#############################
-n 指定logstash实例名称,如果没有指定,默认是本地主机名
-f 从指定的文件或文件夹中加载logstash的配置;如果是一个文件夹,它会加载里面所有的文件,或者可以加上通配符只加载特定格式的文件
-w 允许filter和output的pipeline线程数量,默认是CPU核数
-b 每个 Logstash pipeline 线程,在执行具体的 filter 和 output 函数之前,最多能累积的日志条数,默认是 125 条。越大性能越好,同样也会消耗越多的 JVM 内存
-u 每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。默认是 5 ms
-l 指定日志输出位置
-r 监控配置配置文件,如果有变化则自动重载logstash
-e 使用给定的命令行字符串作为配置,直接运行bin/log/logstash -e 配置默认是"input { stdin { type => stdin } }" "output { stdout { codec => rubydebug } }"
-t 检查配置文件是否有语法错误
-V 打印logstash版本
--log.level 指定日志等级,包括trace、debug、info、warn、error、fatal,默认info
--http.host 指定web API绑定的主机,默认127.0.0.1
--http.port 指定web API的http端口,默认9600至9700
#############################后台启动logstash#############################
1.安装supervisor
yum -y install supervisor --enablerepo=epel 2.在/etc/supervisord.conf配置文件最后添加:
[program:logstash]
directory=/usr/local/logstash
command=/usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/conf.d/ -r -l /var/log/logstash 3.启动
service supervisord start 4.单独控制一个子进程
supervisorctl stop logstash #############################编解码配置插件#############################
#官网参考:https://www.elastic.co/guide/en/logstash/current/codec-plugins.html
#logstash不只是一个input|filter|output的数据流,而是一个input|decode|filter|encode|output的数据流。codec就是用来decode、encode事件的。
1.JSON编解码
#直接输入预定义好的JSON数据,可以省略filter/grok配置,降低logstash过滤的CPU负载消耗
input {
file {
path => "/opt/data/logs/bjape01-ngx-a1-172.16.3.2/nginx_access.log"
codec => "json"
}
} 2.multiline多行事件编码
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
} #pattern 要匹配的正则表达式,字符串类型
#negate 正则表达式是否生效,布尔类型,默认为flase
#what 未匹配的内容是向前合并还是向后后合并,previous,next两个值选择
#示例配置可以用于Log4j方式的应用程序日志
3.line面向行的文本数据
4.plain空的纯文本编解码
#使用已经有定义框架输入或输出,例如logstash输出转存到elasticsearch
output {
if [type] == "user_audit" {
elasticsearch {
hosts => ["172.16.1.25","172.16.1.26","172.16.1.27"]
index => 'user_audit-%{+YYYY-MM-dd}'
codec=>plain{charset=>"UTF-8"}
}
}
}
#############################输入插件#############################
#官网参考:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
1.标准输入
示例:
input {
stdin {
add_field => {"key" => "value"}
codec => "plain"
tags => ["add"]
type => "std"
}
} #add_field 向事件添加一个字段,hash类型
#codec 设置编码方式,默认是line
#tags 添加标记
#type 添加类型
2.文件输入
input {
file {
path => [
"/opt/data/logs/idca-web1-172.16.3.2/apache_access.log",
"/opt/data/logs/idca-web1-172.16.3.2/apache_error.log"
]
stat_interval => 1
discover_interval => 1
type => "apache_log"
}
} #path 处理的文件的路径, 可以定义多个路径
#stat_interval 每隔多久检查一次被监听文件状态(是否更新),默认是1秒
#discover_interval 每隔多久去检查一次被监听的path下是否有新文件,默认是15秒
#start_position 从什么位置开始读取文件数据,"beginning"从头开始读取,读到最后一行不会终止,继续tailf;"end"从结束位置tailf。默认值是"end"
#坑:start_position仅在该文件从未被监听过的时候起作用,如果sincedb文件中已经有这个文件的inode记录了,那么logstash依然会冲记录过的pos开始读取数据。所以重复测试的时候每回需要删除sincedb文件,该文件一般在安装目录下的data/plugins/inputs/file/中
#优化:在file中添加sincedb_path => "/dev/null",可以直接将sincedb写到黑洞中
#exclude 不想被监听的文件可以排除出去
#sincedb_path 配置sincedb文件位置
#sincedb_write_interval 每隔多久写一次sincedb文件,默认15秒
3.syslog输入
input {
syslog {
port => "514"
}
}
4.collectd输入
#############################过滤器配置#############################
#官网参考:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
1.grok正则捕获
#用于匹配对应格式的message,然后过滤出来
filter {
grok {
break_on_match => false
patterns_dir => ["/usr/local/logstash/etc/conf.d/patterns"]
match => { "message" => "\[%{TIMESTAMP_ISO8601:time}\] \[%{GLASSFISHVERSION:gfversion}\] %{OTHER}"}
}
grok {
break_on_match => false
patterns_dir => ["/usr/local/logstash/etc/conf.d/patterns"]
match => {"path" => "/%{USER}/%{USER}/%{USER}/%{USER:host}/%{OTHER}" }
overwrite => [ "host" ]
}
} #break_on_match 表示匹配即停止,默认为true,如果希望grok尝试所有模式,设置为false
#patterns_dir 将grok表达式统一写到文件中,该选项指定文件目录。grok表达式可以写到配置文件中,但是日志格式可能有多种,每种都写一行自己的表达式就可能会出问题,所以建议将所有的grok表达式统一写到一个地方。
#grok表达式格式:
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
#第一行用普通的正则表达式来定义一个grok表达式;第二行用以定义好的grok表达式来定义另一个grok表达式。logstash内置了多种grok表达式,查看https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns,同样可以自定义grok表达式。%{NUMBER:num}
%{NUMBER:num:int}
#grok表达式的打印复制格式,第一行中NUMBER表示匹配的模式,num是匹配内容的标识,代表匹配的内容;第二行中int表示转换匹配内容的格式,默认是字符串格式。 #overwrite用来重写message字段,上面例子第二个grok中match的是一个路径,但是我们只想保留路径中的一个host字段,所以overwrite => [ "host" ]就可以了
2.date时间处理
#日志产生到logstash处理一般都会有一段时间差,导致logstash实际处理时间都比日志产生时间晚。logstash-filter-date插件可以解析日志中的时间,变成LogStash:Timestamp对象,然后转存到@timestamp字段里,作为事件的时间戳;如果未使用date插件或没有解析到时间,logstash将使用日志输入的时间作为事件的时间戳。
date {
match => [ "time", "MMM dd yyyy HH:mm:ss",
"MMM d yyyy, HH:mm:ss", "ISO8601" ]
} #配置选项包括:locale、match、tag_on_failure、target、timezone。常用的是match,值的类型同样是列表,默认是[]
#实例中的time就是grok中匹配字段的标识,由我们自定义。
#date插件支持五种时间格式,常用的是ISO8601和Joda-Time库
ISO8601:类似"2018-10-19T14:25:25.905+0800",这是我们的北京时间,如果是UTC时间后面跟的是一个大写的Z(时区偏移),nginx、apache、java应用都可以设置为这种格式。
UNIX:UNIX时间戳格式,记录从1970年起至今的总秒数。
UNIX_MS:从1970年至今的总毫秒数。
TAI64N:tai64n格式,很少用。
Joda-Time库:y年,yyyy例如2018,yy例如18;M月,MMMM例如January,MMM例如Jan,MM例如01,M例如1;d日,dd例如01,d例如1;H时,HH例如01,H例如1;m分,mm例如01,m例如1;s秒,ss例如01,s例如1 3.GeoIP地址查询
#GeoIP过滤器根据来自Maxmind GeoLite2数据库的数据,添加关于IP地址地理位置的信息。
#插件名称logstash-filter-geoip,可以先使用logstash-plugin list命令查看插件是否安装,没有安装可以使用logstash-plugin install logstash-filter-geoip命令来安装
#基本配置
geoip {
source => "clientip"
} #debug标准输出为
{
"path" => "/var/log/logstash_test/apache_access.log",
"type" => "apache_log",
"timestamp" => "26/Oct/2018:14:44:25 +0800",
"@version" => "1",
"response" => "302",
"usetime" => "0",
"referrer" => "/hsbp2pUser/view/html/errorSkip.html;jsessionid=f1deac55291cdf60c1e66108a89c.bjape01-wgw-b2",
"@timestamp" => 2018-10-26T06:44:25.000Z,
"geoip" => {
"country_code2" => "CN",
"timezone" => "Asia/Shanghai",
"latitude" => 22.8167,
"continent_code" => "AS",
"region_code" => "45",
"region_name" => "Guangxi",
"country_name" => "China",
"location" => {
"lon" => 108.3167,
"lat" => 22.8167
},
"country_code3" => "CN",
"ip" => "113.15.241.43",
"longitude" => 108.3167,
"city_name" => "Nanning"
},
"agent" => "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)",
"host" => "opsmanage",
"message" => "113.15.241.43 - - [26/Oct/2018:14:44:25 +0800] \"/hsbp2pUser/view/html/errorSkip.html;jsessionid=f1deac55291cdf60c1e66108a89c.bjape01-wgw-b2\" 302 460 0 \"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)\"",
"clientip" => "113.15.241.43",
"bytes" => "460",
"remote_user" => "-"
} #常用配置
#source 指定来源IP字段,即使用GeoIP库来分析的IP地址,如果是一个列表,只有第一个会被使用
#database 指定GeoIP库的路径,如果不指定该配置项默认随Logstash的GeoLite2城市数据库
#fields 指定输出那些字段,格式是一个列表。如果不指定默认为[city_name,continent_code,country_code2,country_code3,country_name,dma_code,ip,latitude,longitude,postal_code,region_name,timezone]
#target 指定应该将geoip数据存储在哪个Logstash的字段,一般设置为geoip
#############################输出插件#############################
#官网参考:https://www.elastic.co/guide/en/logstash/current/output-plugins.html
1.输出到Elasticsearch
output {
elasticsearch {
hosts => ["172.16.1.25","172.16.1.26","172.16.1.27"]
index => "%{type}-%{+YYYY-MM-dd}"
codec=>plain{charset=>"UTF-8"}
}
} #hosts Elasticsearch地址,列表形式,可以写多个。
#index 索引名,写入Elasticsearch索引的名称,可以使用变量。注意:%{+YYYY-MM-dd}}在语法解析时,看到已+开头的,就会自动认为后面是时间格式,所以之前处理过程中不要给自定义字段取加号开头的名字;索引名中不要有大写字母,否则Elasticsearch会报InvalidindexNameException
|
|