
Based on AWS

Preface

  The speed of troubleshooting in operations is usually proportional to the granularity of the monitoring system; only with monitoring in place can faults be located quickly.

  Before this system was deployed, all system logs on the platform went through Graylog + Zabbix, which alerted on error keywords appearing in the logs. That approach exposed a number of shortcomings in day-to-day operations (not detailed here). After weighing several factors, the log alerting system was replaced; the chosen stack is: ELK + Kafka + Filebeat + Elastalert
  This article is organized around two requirements:


[*]Alert on abnormal server logins outside working hours
[*]Alert on error keywords appearing in system logs

Architecture
Architecture diagram: http://i2.运维网.com/images/blog/201804/29/320194963748aab9d41dc2fc90c48d16.jpg

Service selection

name                          version   info
----------------------------  --------  ---------------------------------------------------------
Amazon Elasticsearch Service  v6.2      deployed per the AWS official guide
Logstash                      v6.2.3    same version as ES
Filebeat                      v6.2.3    same version as ES
Confluent (Kafka)             v4.0      see the note below
Elastalert                    v0.1.29   X-Pack was considered first, but AWS does not yet support it

The Confluent distribution is recommended here: Confluent is a big-data company founded by Kafka co-creator Neha Narkhede together with other former LinkedIn engineers after leaving LinkedIn, and it focuses on enterprise applications of Kafka.

Deployment

  Operating system used in this article: CentOS release 6.6

  Filebeat

# Download the package
$ curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.3-x86_64.rpm
# Install
$ sudo rpm -vi filebeat-6.2.3-x86_64.rpm
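
  The original post does not show the Filebeat configuration itself. The sketch below is a minimal, hedged example of shipping /var/log/messages and /var/log/secure to the two Kafka topics consumed by the Logstash pipelines later in the post; the broker address and topic names come from those Logstash inputs, while the file paths and the fields.log_topic routing field are assumptions.

# /etc/filebeat/filebeat.yml -- minimal sketch, not the author's original config
filebeat.prospectors:
  - type: log
    paths: ["/var/log/messages"]     # assumed source for the system_log topic
    fields:
      log_topic: system_log
  - type: log
    paths: ["/var/log/secure"]       # assumed source for the system_secure topic
    fields:
      log_topic: system_secure

output.kafka:
  hosts: ["kafkaNode:9092"]          # same broker as the Logstash kafka inputs
  topic: "%{[fields.log_topic]}"     # route each prospector to its own topic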
  Logstash

# Import the GPG key of the Elastic yum repo
$ rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
# Add the Elastic 6.x repo and install; the repo definition below follows the
# standard Elastic docs, since the original heredoc was lost in the forum copy
$ cat > /etc/yum.repos.d/logstash.repo <<EOF
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
$ sudo yum install -y logstash

  /etc/logstash/conf.d/system_log.conf

# Note: the bracketed field references below were stripped by the forum software;
# they are reconstructed following Elastic's filebeat-modules pipeline example
input {
  kafka {
    bootstrap_servers => "kafkaNode:9092"
    consumer_threads => 3
    topics => ["system_log"]
    auto_offset_reset => "latest"
    codec => "json"
  }
}
filter {
  # Drop Logstash's own stdout log to avoid a feedback loop
  if [source] == "/var/log/logstash-stdout.log" {
    drop {}
  }
  if [fields][log_topic] == "system_log" {   # routing field set in filebeat.yml (assumed)
    grok {
      match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
      pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
      remove_field => "message"
    }
    date { match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ] }
  }
}
output {
  elasticsearch {
    hosts => [""]                                   # AWS ES endpoint (elided in the original)
    index => "%{[fields][log_topic]}_%{+YYYYMMdd}"  # index name field is an assumption
    document_type => "%{[@metadata][type]}"
  }
}

  /etc/logstash/conf.d/secure_log.conf

# Field references reconstructed the same way as in system_log.conf
input {
  kafka {
    bootstrap_servers => "kafkaNode:9092"
    consumer_threads => 3
    topics => ["system_secure"]
    auto_offset_reset => "latest"
    codec => "json"
  }
}
filter {
  if [fields][log_topic] == "system_secure" {
    grok {
      match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:[system][auth][groupadd][name]}, GID=%{NUMBER:[system][auth][groupadd][gid]}",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][useradd][name]}, UID=%{NUMBER:[system][auth][useradd][uid]}, GID=%{NUMBER:[system][auth][useradd][gid]}, home=%{DATA:[system][auth][useradd][home]}, shell=%{DATA:[system][auth][useradd][shell]}$",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
      pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
      remove_field => "message"
    }
  }
}
output {
  elasticsearch {
    hosts => [""]                                   # AWS ES endpoint (elided in the original)
    index => "%{[fields][log_topic]}_%{+YYYYMMdd}"  # index name field is an assumption
    document_type => "%{[@metadata][type]}"
  }
}

  Kafka

# Import the GPG key of the Confluent yum repo
$ rpm --import https://packages.confluent.io/rpm/4.0/archive.key
# Add the Confluent 4.0 repo and install; the repo definition below follows the
# Confluent 4.0 docs, since the original heredoc was lost in the forum copy
$ cat > /etc/yum.repos.d/confluent.repo <<EOF
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.0/6
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.0
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1
EOF
$ sudo yum install -y confluent-platform-oss-2.11

  Elastalert

# Installed from PyPI, pinned to the version in the service table
$ pip install elastalert==0.1.29

  Alert plugin
  Requirement 1 (alerting on logins outside working hours) is handled by a small custom HTTP-POST alerter. Only the tail of the module survived the forum copy; the imports, class definition, and option handling below are a hedged reconstruction around the surviving lines.

# modules/eagle_post.py; the file path, class name, and option names are assumptions
import requests
from datetime import datetime
from elastalert.alerts import Alerter
from elastalert.util import elastalert_logger

class EagleAlerter(Alerter):
    required_options = frozenset(['post_url'])

    def __init__(self, rule):
        super(EagleAlerter, self).__init__(rule)
        self.post_url = self.rule.get('post_url', '')
        self.post_static_payload = self.rule.get('post_static_payload', {})

    def alert(self, matches):
        for match in matches:
            # working-hours window from the rule file (assumed option names)
            time_start = self.rule.get('time_start', '09:00')
            time_end = self.rule.get('time_end', '18:00')
            login_time = datetime.now().strftime('%H:%M')
            # suppress the alert inside working hours
            self.post_lock = False if login_time > time_start and \
                login_time < time_end else True
            # merge the two payload types
            payload = {'match': str(match)}  # per-match payload (assumed shape)
            data = self.post_static_payload
            data.update(payload)
            # send the alert
            if self.post_lock:
                myRequests = requests.Session()
                myRequests.post(url=self.post_url, data=data, verify=False)
                elastalert_logger.info("[-] eagle alert sent.")
            else:
                elastalert_logger.info("[*] nothing to do.")

    def get_info(self):
        return {'type': 'http_post'}
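
  With the alerter in place, requirement 1 becomes a rule file. The sketch below is a hedged example, not the author's original rule: the index and field names follow the secure_log pipeline above, the module path assumes start.sh (shown later) has copied modules/ into the installed elastalert package, and the endpoint and payload are hypothetical.

# rules/ssh_login.yaml -- hedged sketch
name: non-working-hours-login
type: any                                  # every matching login is a candidate alert
index: system_secure_*
filter:
- query:
    query_string:
      query: "system.auth.ssh.event: Accepted"
alert: "elastalert.modules.eagle_post.EagleAlerter"  # assumed module path
post_url: "http://alert.example.com/eagle"           # hypothetical endpoint
time_start: "09:00"                        # the alerter suppresses alerts inside this window
time_end: "18:00"
post_static_payload:
  channel: ops                             # hypothetical static payload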
  type
  While using the built-in blacklist rule type, it turned out to match only whole values. To make rule files easier to write, a small modification was made:
  elastalert/ruletypes.py

# Addition
class BlacklistV2Rule(CompareRule):
    required_options = frozenset(['compare_key', 'blacklist_v2'])

    def __init__(self, rules, args=None):
        super(BlacklistV2Rule, self).__init__(rules, args=None)
        self.expand_entries('blacklist_v2')

    def compare(self, event):
        term = lookup_es_key(event, self.rules['compare_key'])
        # Looping over the configured blacklist costs some performance; this
        # stays until a better approach is found
        for i in self.rules['blacklist_v2']:
            # substring match (the point of this variant), guarding a missing key
            if term and i in term:
                return True
        return False
  elastalert/config.py

# Addition: register the new type
rules_mapping = {
    'frequency': ruletypes.FrequencyRule,
    'any': ruletypes.AnyRule,
    'spike': ruletypes.SpikeRule,
    'blacklist': ruletypes.BlacklistRule,
    'blacklist_v2': ruletypes.BlacklistV2Rule,
    'whitelist': ruletypes.WhitelistRule,
    'change': ruletypes.ChangeRule,
    'flatline': ruletypes.FlatlineRule,
    'new_term': ruletypes.NewTermsRule,
    'cardinality': ruletypes.CardinalityRule,
    'metric_aggregation': ruletypes.MetricAggregationRule,
    'percentage_match': ruletypes.PercentageMatchRule,
}

  elastalert/schema.yaml

# Addition
- title: BlacklistV2
  required: [compare_key, blacklist_v2]
  properties:
    type: {enum: [blacklist_v2]}
    compare_key: {'items': {'type': 'string'}, 'type': ['string', 'array']}
    blacklist_v2: {type: array, items: {type: string}}
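
  Requirement 2 can then use the new type directly. Again a hedged sketch rather than the author's rule file: the index and compare_key follow the system_log pipeline above, and the keywords, module path, and endpoint are placeholders.

# rules/system_err.yaml -- hedged sketch
name: system-log-error-keyword
type: blacklist_v2                         # substring-matching type added above
index: system_log_*
compare_key: system.syslog.message
blacklist_v2:
- "error"
- "Out of memory"
alert: "elastalert.modules.eagle_post.EagleAlerter"  # assumed module path
post_url: "http://alert.example.com/eagle"           # hypothetical endpoint
time_start: "00:00"                        # degenerate window: never suppress
time_end: "00:00"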

Packaging it into Docker
  A simple Dockerfile for reference:

FROM python:2.7-alpine
ENV SITE_PACKAGES /usr/local/lib/python2.7/site-packages/elastalert
WORKDIR /opt/elastalert
RUN apk update && apk add gcc ca-certificates openssl-dev openssl libffi-dev musl-dev tzdata openntpd && \
    pip install elastalert && cp -rf /usr/share/zoneinfo/Asia/Taipei /etc/localtime
COPY ./ /opt/elastalert
CMD ["/opt/elastalert/start.sh"]
  start.sh

#!/bin/sh
SITE_PATH=/usr/local/lib/python2.7/site-packages/elastalert
CONFIG=/opt/elastalert/config/config.yaml
MODULES=/opt/elastalert/modules
# Copy custom modules (e.g. the alerter above) into the installed package
if [ -d "${MODULES}" ]
then
    \cp -rf ${MODULES} ${SITE_PATH}
    echo "[-] Copy ${MODULES} to ${SITE_PATH}"
fi
# Overlay the local patches (ruletypes.py, config.py, schema.yaml) onto the install
\cp -rf elastalert/* ${SITE_PATH}/
echo "[-] Copy elastalert/* to ${SITE_PATH}"
python -m elastalert.elastalert --verbose --config ${CONFIG}
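
  start.sh points at /opt/elastalert/config/config.yaml, which the post never shows. A hedged sketch of a matching global config follows; the AWS ES endpoint is a placeholder and the timing values are examples.

# /opt/elastalert/config/config.yaml -- hedged sketch
rules_folder: /opt/elastalert/rules
run_every:
  minutes: 1
buffer_time:
  minutes: 15
es_host: your-aws-es-endpoint.es.amazonaws.com   # placeholder
es_port: 443
use_ssl: true
writeback_index: elastalert_status
alert_time_limit:
  days: 2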
  With the groundwork in place, the image was added to the Bee container-management platform, which handles the automated builds.

Result
Screenshot of the resulting alert: http://i2.运维网.com/images/blog/201804/29/6ac1af5db4b53d281b3844b63c89cf28.jpg

Pitfalls encountered
  Zookeeper
  Problem description

  Older Kafka versions depend on Zookeeper, and a default installation registers the broker under the address localhost. The symptoms look like this:

  Filebeat error log

2018-04-25T09:14:55.590+0800    INFO    kafka/log.go:36    client/metadata fetching metadata for [[ kafkaNode:9092]] from broker %!s(MISSING)
2018-04-25T09:14:55.591+0800    INFO    kafka/log.go:36    producer/broker/[] starting up
2018-04-25T09:14:55.591+0800    INFO    kafka/log.go:36    producer/broker/[] state change to on %!s(MISSING)/%!d(MISSING)
2018-04-25T09:14:55.591+0800    INFO    kafka/log.go:36    producer/leader/[]/%!d(MISSING) selected broker %!d(MISSING)
2018-04-25T09:14:55.591+0800    INFO    kafka/log.go:36    producer/leader/[]/%!d(MISSING) state change to
2018-04-25T09:14:55.591+0800    INFO    kafka/log.go:36    producer/leader/[]/%!d(MISSING) abandoning broker %!d(MISSING)
2018-04-25T09:14:55.592+0800    INFO    kafka/log.go:36    producer/broker/[] shut down
2018-04-25T09:14:55.592+0800    INFO    kafka/log.go:36    Failed to connect to broker [:9092: getsockopt: connection refused]]: %!s(MISSING)

  Two addresses show up in the log: the Kafka bootstrap address and a localhost address. Filebeat connects to the bootstrap broker successfully, but the broker metadata it fetches carries the localhost endpoint that the broker registered in Zookeeper, and connecting to that endpoint fails.

  Solution


# In zookeeper-shell, inspect the broker registration
# get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9092"],"jmx_port":-1,"host":"localhost","timestamp":"1523429158364","port":9092,"version":4}
cZxid = 0x1d
ctime = Wed Apr 11 14:45:58 CST 2018
mZxid = 0x1d
mtime = Wed Apr 11 14:45:58 CST 2018
pZxid = 0x1d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x162b374170d0000
dataLength = 188
numChildren = 0
# The registered address is localhost; change it
set /brokers/ids/0 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafkaNode:9092"],"jmx_port":9999,"host":"kafkaNode","timestamp":"1523429158364","port":9092,"version":4}
  After the change and a broker restart, the problem was solved. A more durable fix is to set advertised.listeners=PLAINTEXT://kafkaNode:9092 in the broker's server.properties, so the correct address is registered at startup.

  Original post: http://a-cat.cn/2018/04/29/aws-elk/



