白森 发表于 2019-1-29 08:25:58

小米Elasticsearch 服务化实践

  转载自 :小米运维(公众号>

摘要

  使用 Elasticsearch 做日志检索、分析服务已成为当前互联网公司首选工具之一了。那么怎么与公司内部系统(如数据采集、数据 schema 管理、数据权限等系统)打通,怎么提供一套易用的内部 Elasticsearch 系统便成为了首先需要解决的问题。本文将介绍小米内部 Elasticsearch 服务化之路的演进。


数据链路图
  首先给大家介绍一下我米的数据链路图(老版)
http://i2.运维网.com/images/blog/201805/18/49e5a2c0eb8f73d7d86de0a661e623d6.png?x-oss-process=image/watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=
  简单划分了四层,数据采集、数据控制与传输、数据存储与消息队列、数据应用层。
  整个数据链路中,由数据工场负责源数据注册、schema 定义,权限控制。数据注册后会更新 scribe 白名单(刷新 scribe 配置),创建 hdfs 目录,hive 表创建、授权等。还有一个重要功能就是数据 schema 定义。包括 schema 序列化与反序列化协议,字段定义等等。业务方在数据工场注册好数据之后就可以通过 agent 打数据了。一般常用的方式就是 xlogger。xlogger 是我们内部开发的一个 java log sdk,支持 scribe 协议,能够通过 xlogger 直接往 scribe-agent 或者 scribe-server 打数据。scribe-server 根据数据工场定义的规则,将数据写入 hdfs 或者 kafka。然后数据应用层开始通过各种工具进行分析。

怎么将 elasticsearch 服务无缝对接到公司成熟的数据链路中?
  我们主要考虑以下几个因素


[*]Elasticsearch 输入源从哪里对接最合适?
[*]怎么结合数据工场定义的 schema 去做反序列化?
[*]怎么给数据做 ETL?
[*]怎么方便管理和监控?
  对与第 1 点,我们选择了对接 kafka 和 hdfs。其中 kafka 走实时数据摄入,hdfs 走离线数据摄入。
  综合 2、3、4 点,我们只有两种方案可选,一是在现有开源组件上做二次开发,与我们内部数据工场打通,能通过数据工场定义的 schema 结构去做反序列化个 ETL。二是自研类似 logstash 的中间件。
  最终我们选择了自研 kafka2es 组件(用 java 开发)。主要实现第 2 和第 3 点。当定位清楚了,那么实现起来,也不复杂。第一版做的非常简单,通过 pom 去把业务在数据工场定义的 schema 的反序列化类引入进来,实现一个通用的 ETL 类(也就是存粹把从 kafka 读出来的 byte 数组的反序列化出来,再转成 json 结构写入 elasticsearch),然后再留一个可配置的空间,让业务自己实现 ETL 逻辑,我们 kafka2es 在 elk 时通过动态加载的方式,调用业务实现的 ETL 类做处理。监控方面就是在各个环节加计数:read kafka 条数、write es 条数、write 失败条数、etl 失败条数,将点打入 falcon 做监控告警。

具体实现
  我们将读 kafka 和写 es 的逻辑全部封装好(这里不详细描述了,代码都非常简单),当业务接入的时候只需要准备一个配置文件和一个 ETL 类,提到我们 kafka2es 的项目中即可。如果不需要特殊处理的数据,则只需一份配置文件即可,样例如下:
  

server.conf  

  
kafka_topic_name=xxx                              #kafka topic 名称
  
kafka_zk_url=xxx                                    #kafka zk 地址
  
kafka_consumer_groupid=xxx                        #kafka 消费者 group>  
decoder_class=xxx                                 #数据分序列化类
  
es_url=xxx                                          #elasticsearch 集群地址
  
es_cluster_name=es-test                           #elasticsearch 集群名称
  
es_index_name=xxx                                 #写入索引名称
  
es_index_name_suffix_format=yyyy.MM.dd            #索引日期后缀规则,如按天建索引则是yyyy.MM.dd 按月建索引则是yyyy.MM
  
es_index_type_name=doc                              #索引type名,统一默认使用doc
  
es_authtoken=xxx                                    #权限token,通过账号密码加密得到
  
parser_class=com.xiaomi.data.CommonParser         #ETL类 一般为业务自行实现,主要用于数据清洗和过滤
  
thread_num=1                                        #处理线程数
  
es_index_pipeline=test-pipeline                     #pipeline,如nginx日志,apache日志通过pipeline处理更为简单,默认所有data节点都开了ingest功能,master节点和client节点都不开ingest功能
  

  以上配置分为三个部分:kafka 相关,elasticsearch 集群相关,ETL 相关。整个 kafka2es 项目也封装了 三个部分,写读 kafka,做 ETL,写 elasticsearch。
  ETL 类怎么抽象出来让业务自行实现?
  

public abstract>
/**
  * @param log byte[]
  * @return json string
  */
  
public abstractString parser(byte[] log);
  
}
  

  通过定义个一个抽象类 BaseParser,抽象方法 parser,业务 ETL 类只需要基础 BaseParser,实现 parser,parser 方法的参数是从 kafka 读出来的消息体 byte[],返回值必须是 json string。
  可以说是非常简单,整个数据流就这样跑通了(目前主要做了实时接入,对接 kafka)。
http://i2.运维网.com/images/blog/201805/18/964d572e53436e5a0aca13a9eca59e04.png?x-oss-process=image/watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

怎么实现 elasticsearch 的多租户权限管理?权限管理系统账号怎么与公司内部账号系统打通?
  调研了几个 es 权限管理插件,如: x-pack 、search-guard、elasticsearch-http-user-auth 等。x-pack 为商用,放弃;search-guard 实现略复杂,从可维护性和二次开发上看都不太满意。elasticsearch-http-user-auth 非常非常简洁,但是只支持 http 鉴权,没有 transport 鉴权。
  github 地址
  search-guard:https://github.com/floragunncom/search-guard
  elasticsearch-http-user-auth:https://github.com/elasticfence/elasticsearch-http-user-auth
  于是我们还是决定自己写一个:es-authority-manager-mi
  主要功能实现了:


[*]索引的读写权限管理(http+transport)
[*]账号体系与公司内部 kerberos 账号打通
[*]所有用户行为记录
  下面我简单分析一下鉴权插件的实现 (elasticsearch-5.6.2):
  首先看插件的主类
  

public>
  public AuthorityManagerMiPlugin(final Settings settings) {
  //插件初始化
  }
  

  //restful 接口注册, AuthorityManagerAction实现插件api接口
  @Override
  public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
  IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
  IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) {
  final List handlers = new ArrayList(1);
  if (!AMMPluginIsDisabled) {
  handlers.add(new AuthorityManagerAction(settings, restController));
  }
  return handlers;
  }
  

  //transport拦截器注册
  @Override
  public List
页: [1]
查看完整版本: 小米Elasticsearch 服务化实践