gteric 发表于 2015-11-30 08:37:54

Python Elasticsearch api

描述:ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。下面介绍了利用Python API接口进行数据查询,方便其他系统的调用。





安装API




pip install elasticsearch



建立es连接




from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'10.10.13.12','port':9200}])

  


数据检索功能




es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')

  




常用参数



[*]index - 索引名
[*]q - 查询指定匹配 使用Lucene查询语法
[*]from_ - 查询起始点默认0
[*]doc_type - 文档类型
[*]size - 指定查询条数 默认10
[*]field - 指定字段 逗号分隔
[*]sort - 排序字段:asc/desc
[*]body - 使用Query DSL
[*]scroll - 滚动查询




统计查询功能

# 语法同search大致一样,但只输出统计值




In: es.count(index='logstash-2015.08.21', q='http_status_code:500')
Out:{u'_shards':{u'failed':0, u'successful':5, u'total':5}, u'count':17042}

  






知识扩展



[*]滚动demo



# Initialize the scroll
page = es.search(
index ='yourIndex',
doc_type ='yourType',
scroll ='2m',
search_type ='scan',
size =1000,
body ={
# Your query's body
})
sid = page['_scroll_id']
scroll_size = page['hits']['total']
# Start scrolling
while(scroll_size >0):
print "Scrolling..."
page = es.scroll(scroll_id = sid, scroll ='2m')
# Update the scroll ID
sid = page['_scroll_id']
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
print "scroll size: "+ str(scroll_size)
# Do something with the obtained page





以上demo实现了一次取若干数据,数据取完之后结束,不会获取到最新更新的数据。我们滚动完之后想获取最新数据怎么办?滚动的时候会有一个统计值,如total: 5。跳出循环之后,我们可以用_from参数定位到5开始滚动之后的数据。





[*]Query DSL




range过滤器查询范围



gt: > 大于

lt: < 小于

gte: >= 大于或等于

lte: <= 小于或等于




"range":{
"money":{
"gt":20,
"lt":40
}
}      



[*]  



bool组合过滤器



must:所有分句都必须匹配,与 AND 相同。

must_not:所有分句都必须不匹配,与 NOT 相同。

should:至少有一个分句匹配,与 OR 相同。




{
"bool":{
  "must":[],
  "should":[],
  "must_not":[],
}
}








term过滤器



[*]term单过滤



{
"terms":{
  "money":20
}
}

  




[*]terms复数版本,允许多个匹配条件



{
"terms":{
  "money":
}
}







正则查询




{
"regexp": {
"http_status_code": "5.*"
}
}





match查询



[*]match 精确匹配



{
"match":{
  "email":"123456@qq.com"
}
}







[*]multi_match 多字段搜索



{
"multi_match":{
  "query":"11",
  "fields":["Tr","Tq"]
}
}





demo



[*]获取最近一小时的数据



{'query':
{'filtered':
{'filter':
{'range':
{'@timestamp':{'gt':'now-1h'}}
}
}
}
}



[*]  





[*]条件过滤查询



{
"query":{
"filtered":{
"query":{"match":{"http_status_code":500}},
"filter":{"term":{"server_name":"vip03"}}
}
}
}





[*]Terms Facet 单字段统计



{'facets':
{'stat':
{'terms':
{'field':'http_status_code',
'order':'count',
'size':50}
}
},
'size':0
}      

  




[*]一次统计多个字段



{'facets':
{'cip':
{'terms':
{'fields':['client_ip']}},
'status_facets':{'terms':{'fields':['http_status_code'],
'order':'term',
'size':50}}},
'query':{'query_string':{'query':'*'}},
'size':0
}

  




[*]多个字段一起统计



{'facets':
{'tag':
{'terms':
{'fields':['http_status_code','client_ip'],
'size':10
}
}
},
'query':
{'match_all':{}},
'size':0
}

  




数据组装



以下是kibana首页的demo,用来统计一段时间内的日志数量




{
"facets": {
"0": {
"date_histogram": {
"field": "@timestamp",
"interval": "5m"
},
"facet_filter": {
"fquery": {
"query": {
"filtered": {
"query": {
"query_string": {
"query": "*"
}
},
"filter": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
'gt': 'now-1h'
}
}
},
{
"exists": {
"field": "http_status_code.raw"
}
},
# --------------- -------
# 此处加匹配条件
]
}
}
}
}
}
}
}
},
"size": 0
}

  




如果想添加匹配条件,在以上代码标识部分加上过滤条件,按照以下代码格式即可




{
"query": {
"query_string": {"query": "backend_name:baidu.com"}
}
},

  




先介绍到这里,后续会有Query DSL API介绍。  
页: [1]
查看完整版本: Python Elasticsearch api