设为首页 收藏本站
查看: 1279|回复: 0

[经验分享] ElasticSearch笔记整理(三):Java API使用与ES中文分词

[复制链接]

尚未签到

发表于 2019-1-29 10:11:40 | 显示全部楼层 |阅读模式
  [TOC]

pom.xml
  使用maven工程构建ES Java API的测试项目,其用到的依赖如下:


org.elasticsearch
elasticsearch
2.3.0


com.fasterxml.jackson.core
jackson-databind
2.7.0


org.dom4j
dom4j
2.0.0



org.projectlombok
lombok
1.16.10

ES API之基本增删改查
  使用junit进行测试,其使用的全局变量与setUp函数如下:

private TransportClient client;
private String index = "bigdata";   // 要操作的索引库为"bigdata"
private String type = "product";    // 要操作的类型为"product"
@Before
public void setup() throws UnknownHostException {
// 连接的是ES集群,所以需要添加集群名称,否则无法创建客户端
Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
client = TransportClient.builder().settings(settings).build();
TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
client.addTransportAddresses(ta1, ta2, ta3);
/*settings = client.settings();
Map asMap = settings.getAsMap();
for(Map.Entry setting : asMap.entrySet()) {
System.out.println(setting.getKey() + "::" + setting.getValue());
}*/
}
索引添加:JSON方式

/**
* 注意:往es中添加数据有4种方式
* 1.JSON
* 2.Map
* 3.Java Bean
* 4.XContentBuilder
*
* 1.JSON方式
*/
@Test
public void testAddJSON() {
String source = "{\"name\":\"sqoop\", \"author\": \"apache\", \"version\": \"1.4.6\"}";
IndexResponse response = client.prepareIndex(index, type, "4").setSource(source).get();
System.out.println(response.isCreated());
}
索引添加:Map方式

/**
* 添加数据:
* 2.Map方式
*/
@Test
public void testAddMap() {
Map source = new HashMap();
source.put("name", "flume");
source.put("author", "Cloudera");
source.put("version", "1.8.0");
IndexResponse response = client.prepareIndex(index, type, "5").setSource(source).get();
System.out.println(response.isCreated());
}
索引添加:Java Bean方式

/**
* 添加数据:
* 3.Java Bean方式
*
* 如果不将对象转换为json字符串,则会报下面的异常:
* The number of object passed must be even but was [1]
*/
@Test
public void testAddObj() throws JsonProcessingException {
Product product = new Product("kafka", "linkedIn", "0.10.0.1", "kafka.apache.org");
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(product);
System.out.println(json);
IndexResponse response = client.prepareIndex(index, type, "6").setSource(json).get();
System.out.println(response.isCreated());
}
索引添加:XContentBuilder方式

/**
* 添加数据:
* 4.XContentBuilder方式
*/
@Test
public void testAddXContentBuilder() throws IOException {
XContentBuilder source = XContentFactory.jsonBuilder();
source.startObject()
.field("name", "redis")
.field("author", "redis")
.field("version", "3.2.0")
.field("url", "redis.cn")
.endObject();
IndexResponse response = client.prepareIndex(index, type, "7").setSource(source).get();
System.out.println(response.isCreated());
}
索引查询

/**
* 查询具体的索引信息
*/
@Test
public void testGet() {
GetResponse response = client.prepareGet(index, type, "6").get();
Map map = response.getSource();
/*for(Map.Entry me : map.entrySet()) {
System.out.println(me.getKey() + "=" + me.getValue());
}*/
// lambda表达式,jdk 1.8之后
map.forEach((k, v) -> System.out.println(k + "=" + v));
//        map.keySet().forEach(key -> System.out.println(key + "xxx"));
}
索引更新

/**
* 局部更新操作与curl的操作是一致的
* curl -XPOST http://uplooking01:9200/bigdata/product/AWA184kojrSrzszxL-Zs/_update -d' {"doc":{"name":"sqoop", "author":"apache"}}'
*
* 做全局更新的时候,也不用prepareUpdate,而直接使用prepareIndex
*/
@Test
public void testUpdate() throws Exception {
/*String source = "{\"doc\":{\"url\": \"http://flume.apache.org\"}}";
UpdateResponse response = client.prepareUpdate(index, type, "4").setSource(source.getBytes()).get();*/
// 使用下面这种方式也是可以的
String source = "{\"url\": \"http://flume.apache.org\"}";
UpdateResponse response = client.prepareUpdate(index, type, "4").setDoc(source.getBytes()).get();
System.out.println(response.getVersion());
}
索引删除

/**
* 删除操作
*/
@Test
public void testDelete() {
DeleteResponse response = client.prepareDelete(index, type, "5").get();
System.out.println(response.getVersion());
}
批量操作

/**
* 批量操作
*/
@Test
public void testBulk() {
IndexRequestBuilder indexRequestBuilder = client.prepareIndex(index, type, "8")
.setSource("{\"name\":\"elasticsearch\", \"url\":\"http://www.elastic.co\"}");
UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate(index, type, "1").setDoc("{\"url\":\"http://hadoop.apache.org\"}");
BulkRequestBuilder bulk = client.prepareBulk();
BulkResponse bulkResponse = bulk.add(indexRequestBuilder).add(updateRequestBuilder).get();
Iterator it = bulkResponse.iterator();
while(it.hasNext()) {
BulkItemResponse response = it.next();
System.out.println(response.getId() + "" + response.getVersion());
}
}
获取索引记录数

/**
* 获取索引记录数
*/
@Test
public void testCount() {
CountResponse response = client.prepareCount(index).get();
System.out.println("索引记录数:" + response.getCount());
}
ES API之高级查询
  基于junit进行测试,其用到的setUp函数和showResult函数如下:
  全局变量与setUp:

private TransportClient client;
private String index = "bigdata";
private String type = "product";
private String[] indics = {"bigdata", "bank"};
@Before
public void setUp() throws UnknownHostException {
Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
client = TransportClient.builder().settings(settings).build();
TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
client.addTransportAddresses(ta1, ta2, ta3);
}
  showResult:

/**
* 格式化输出查询结果
* @param response
*/
private void showResult(SearchResponse response) {
SearchHits searchHits = response.getHits();
float maxScore = searchHits.getMaxScore();  // 查询结果中的最大文档得分
System.out.println("maxScore: " + maxScore);
long totalHits = searchHits.getTotalHits(); // 查询结果记录条数
System.out.println("totalHits: " + totalHits);
SearchHit[] hits = searchHits.getHits();    // 查询结果
System.out.println("当前返回结果记录条数:" + hits.length);
for (SearchHit hit : hits) {
long version = hit.version();
String id = hit.getId();
String index = hit.getIndex();
String type = hit.getType();
float score = hit.getScore();
System.out.println("===================================================");
String source = hit.getSourceAsString();
System.out.println("version: " + version);
System.out.println("id: " + id);
System.out.println("index: " + index);
System.out.println("type: " + type);
System.out.println("score: " + score);
System.out.println("source: " + source);
}
}
ES查询类型说明
  查询类型有如下4种:

query and fetch(速度最快)(返回N倍数据量)
query then fetch(默认的搜索方式)
DFS query and fetch
DFS query then fetch(可以更精确控制搜索打分和排名。)
  查看API的注释如下:

/**
* Same as {@link #QUERY_THEN_FETCH}, except for an initial scatter phase which goes and computes the distributed
* term frequencies for more accurate scoring.
*/
DFS_QUERY_THEN_FETCH((byte) 0),
/**
* The query is executed against all shards, but only enough information is returned (not the document content).
* The results are then sorted and ranked, and based on it, only the relevant shards are asked for the actual
* document content. The return number of hits is exactly as specified in size, since they are the only ones that
* are fetched. This is very handy when the index has a lot of shards (not replicas, shard id groups).
*/
QUERY_THEN_FETCH((byte) 1),
/**
* Same as {@link #QUERY_AND_FETCH}, except for an initial scatter phase which goes and computes the distributed
* term frequencies for more accurate scoring.
*/
DFS_QUERY_AND_FETCH((byte) 2),
/**
* The most naive (and possibly fastest) implementation is to simply execute the query on all relevant shards
* and return the results. Each shard returns size results. Since each shard already returns size hits, this
* type actually returns size times number of shards results back to the caller.
*/
QUERY_AND_FETCH((byte) 3),
  关于DFS的说明:

DFS是什么缩写?
这个D可能是Distributed,F可能是frequency的缩写,至于S可能是Scatter的缩写,整个单词可能是分布式词频率和
文档频率散发的缩写。
初始化散发是一个什么样的过程?
从es的官方网站我们可以发现,初始化散发其实就是在进行真正的查询之前,先把各个分片的词频率和文档频率收集一
下,然后进行词搜索的时候,各分片依据全局的词频率和文档频率进行搜索和排名。显然如果使用
DFS_QUERY_THEN_FETCH这种查询方式,效率是最低的,因为一个搜索,可能要请求3次分片。但,使用DFS方法,搜索
精度应该是最高的。
  总结:

总结一下,从性能考虑QUERY_AND_FETCH是最快的,DFS_QUERY_THEN_FETCH是最慢的。从搜索的准确度来说,DFS要
比非DFS的准确度更高。
精确查询

/**
* 1.精确查询
* termQuery
* term就是一个字段
*/
@Test
public void testSearch1() {
SearchRequestBuilder searchQuery = client.prepareSearch(indics)    // 在prepareSearch()的参数为索引库列表,意为要从哪些索引库中进行查询
.setSearchType(SearchType.DEFAULT)  // 设置查询类型,有QUERY_AND_FETCH  QUERY_THEN_FETCH  DFS_QUERY_AND_FETCH  DFS_QUERY_THEN_FETCH
.setQuery(QueryBuilders.termQuery("author", "apache"))// 设置相应的query,用于检索,termQuery的参数说明:name是doc中的具体的field,value就是要找的具体的值
;
// 如果上面不加查询条件,则会查询所有
SearchResponse response = searchQuery.get();
showResult(response);
}
模糊查询

/**
* 2.模糊查询
* prefixQuery
*/
@Test
public void testSearch2() {
SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(QueryBuilders.prefixQuery("name", "h"))
.get();
showResult(response);
}
分页查询

/**
* 3.分页查询
* 查询索引库bank中
* 年龄在(25, 35]之间的数据信息
*
* 分页算法:
*      查询的第几页,每一页显示几条
*          每页显示10条记录
*
*      查询第4页的内容
*          setFrom(30=(4-1)*size)
*          setSize(10)
*       所以第N页的起始位置:(N - 1) * pageSize
*/
@Test
public void testSearch3() {
// 注意QUERY_THEN_FETCH和注意QUERY_AND_FETCH返回的记录数不一样,前者默认10条,后者是50条(5个分片)
SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
// 下面setFrom和setSize用于设置查询结果进行分页
.setFrom(0)
.setSize(5)
.get();
showResult(response);
}
高亮显示查询

/**
* 4.高亮显示查询
* 获取数据,
*  查询apache,不仅在author拥有,也可以在url,在name中也可能拥有
*  author or url   --->booleanQuery中的should操作
*      如果是and的类型--->booleanQuery中的must操作
*      如果是not的类型--->booleanQuery中的mustNot操作
*  使用的match操作,其实就是使用要查询的keyword和对应字段进行完整匹配,是否相等,相等返回
*/
@Test
public void testSearch4() {
SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DEFAULT)
//                .setQuery(QueryBuilders.multiMatchQuery("apache", "author", "url"))
//                .setQuery(QueryBuilders.regexpQuery("url", ".*apache.*"))
//                .setQuery(QueryBuilders.termQuery("author", "apache"))
.setQuery(QueryBuilders.boolQuery()
.should(QueryBuilders.regexpQuery("url", ".*apache.*"))
.should(QueryBuilders.termQuery("author", "apache")))
// 设置高亮显示--->设置相应的前置标签和后置标签
.setHighlighterPreTags("")
.setHighlighterPostTags("")
// 哪个字段要求高亮显示
.addHighlightedField("author")
.addHighlightedField("url")
.get();
SearchHits searchHits = response.getHits();
float maxScore = searchHits.getMaxScore();  // 查询结果中的最大文档得分
System.out.println("maxScore: " + maxScore);
long totalHits = searchHits.getTotalHits(); // 查询结果记录条数
System.out.println("totalHits: " + totalHits);
SearchHit[] hits = searchHits.getHits();    // 查询结果
System.out.println("当前返回结果记录条数:" + hits.length);
for(SearchHit hit : hits) {
System.out.println("========================================================");
Map highlightFields = hit.getHighlightFields();
for(Map.Entry me : highlightFields.entrySet()) {
System.out.println("--------------------------------------");
String key = me.getKey();
HighlightField highlightField = me.getValue();
String name = highlightField.getName();
System.out.println("key: " + key + ", name: " + name);
Text[] texts = highlightField.fragments();
String value = "";
for(Text text : texts) {
// System.out.println("text: " + text.toString());
value += text.toString();
}
System.out.println("value: " + value);
}
}
}
排序查询

/**
* 5.排序查询
* 对结果集进行排序
*  balance(收入)由高到低
*/
@Test
public void testSearch5() {
// 注意QUERY_THEN_FETCH和注意QUERY_AND_FETCH返回的记录数不一样,前者默认10条,后者是50条(5个分片)
SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
.addSort("balance", SortOrder.DESC)
// 下面setFrom和setSize用于设置查询结果进行分页
.setFrom(0)
.setSize(5)
.get();
showResult(response);
}
聚合查询:计算平均值

/**
* 6.聚合查询:计算平均值
*/
@Test
public void testSearch6() {
indics = new String[]{"bank"};
// 注意QUERY_THEN_FETCH和注意QUERY_AND_FETCH返回的记录数不一样,前者默认10条,后者是50条(5个分片)
SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
/*
select avg(age) as avg_name from person;
那么这里的avg("balance")--->就是返回结果avg_name这个别名
*/
.addAggregation(AggregationBuilders.avg("avg_balance").field("balance"))
.addAggregation(AggregationBuilders.max("max").field("balance"))
.get();
//        System.out.println(response);
/*
response中包含的Aggregations
"aggregations" : {
"max" : {
"value" : 49741.0
},
"avg_balance" : {
"value" : 25142.137373737372
}
}
则一个aggregation为:
{
"value" : 49741.0
}
*/
Aggregations aggregations = response.getAggregations();
List aggregationList = aggregations.asList();
for(Aggregation aggregation : aggregationList) {
System.out.println("========================================");
String name = aggregation.getName();
// Map map = aggregation.getMetaData();
System.out.println("name: " + name);
// System.out.println(map);
Object obj = aggregation.getProperty("value");
System.out.println(obj);
}
/*Aggregation avgBalance = aggregations.get("avg_balance");
Object obj = avgBalance.getProperty("value");
System.out.println(obj);*/
}
ES中文分词之集成IK分词
  如果我们的数据包含中文,而在查询时希望可以支持对中文进行分词搜索,那么ES本身依赖于Lucene的分词对中文就不佳了,这时就可以考虑使用其它分词方法,如这里要说明的IK中文分词,其集成到ES的步骤如下:

  1)下载地址:
https://github.com/medcl/elasticsearch-analysis-ik
2)使用maven对源代码进行编译(mvn clean install -DskipTests)(package)
3)把编译后的target/releases下的zip文件拷贝到   ES_HOME/plugins/analysis-ik目录下面,然后解压
4)把下载的ik插件中的conf/ik目录拷贝到ES_HOME/config下
5)修改ES_HOME/config/elasticsearch.yml文件,添加index.analysis.analyzer.default.type: ik
(把IK设置为默认分词器,这一步是可选的)
6)重启es服务
7)测试分词效果
  需要说明的是,数据需要重新插入,并使用ik分词,即需要重新构建期望使用中文分词IK的索引库。

  测试代码如下:

package cn.xpleaf.bigdata.elasticsearch;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
/**
* 使用Java API来操作es集群
* Transport
* 代表了一个集群
* 我们客户端和集群通信是使用TransportClient
*
* 使用prepareSearch来完成全文检索之
*  中文分词
*/
public class ElasticSearchTest3 {
private TransportClient client;
private String index = "bigdata";
private String type = "product";
private String[] indics = {"chinese"};
@Before
public void setUp() throws UnknownHostException {
Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
client = TransportClient.builder().settings(settings).build();
TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
client.addTransportAddresses(ta1, ta2, ta3);
}
/**
* 中文分词的操作
* 1.查询以"中"开头的数据,有两条
* 2.查询以“中国”开头的数据,有0条
* 3.查询包含“烂”的数据,有1条
* 4.查询包含“烂摊子”的数据,有0条
* 分词:
*      为什么我们搜索China is the greatest country~
*                 中文:中国最牛逼
*
*                 ×××
*                      中华
*                      人民
*                      共和国
*                      中华人民
*                      人民共和国
*                      华人
*                      共和
*      特殊的中文分词法:
*          庖丁解牛
*          IK分词法
*          搜狗分词法
*/
@Test
public void testSearch1() {
SearchResponse response = client.prepareSearch(indics)    // 在prepareSearch()的参数为索引库列表,意为要从哪些索引库中进行查询
.setSearchType(SearchType.DEFAULT)  // 设置查询类型,有QUERY_AND_FETCH  QUERY_THEN_FETCH  DFS_QUERY_AND_FETCH  DFS_QUERY_THEN_FETCH
//.setQuery(QueryBuilders.prefixQuery("content", "烂摊子"))// 设置相应的query,用于检索,termQuery的参数说明:name是doc中的具体的field,value就是要找的具体的值
//                .setQuery(QueryBuilders.regexpQuery("content", ".*烂摊子.*"))
.setQuery(QueryBuilders.prefixQuery("content", "中国"))
.get();
showResult(response);
}
/**
* 格式化输出查询结果
* @param response
*/
private void showResult(SearchResponse response) {
SearchHits searchHits = response.getHits();
float maxScore = searchHits.getMaxScore();  // 查询结果中的最大文档得分
System.out.println("maxScore: " + maxScore);
long totalHits = searchHits.getTotalHits(); // 查询结果记录条数
System.out.println("totalHits: " + totalHits);
SearchHit[] hits = searchHits.getHits();    // 查询结果
System.out.println("当前返回结果记录条数:" + hits.length);
for (SearchHit hit : hits) {
long version = hit.version();
String id = hit.getId();
String index = hit.getIndex();
String type = hit.getType();
float score = hit.getScore();
System.out.println("===================================================");
String source = hit.getSourceAsString();
System.out.println("version: " + version);
System.out.println("id: " + id);
System.out.println("index: " + index);
System.out.println("type: " + type);
System.out.println("score: " + score);
System.out.println("source: " + source);
}
}
@After
public void cleanUp() {
client.close();
}
}
  相关测试代码已上传到GitHub:https://github.com/xpleaf/elasticsearch-study





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669047-1-1.html 上篇帖子: 用elasticsearch和nuxtjs搭建bt搜索引擎 下篇帖子: Elasticsearch1.7.3升级到2.4.2记录
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表