设为首页 收藏本站
查看: 983|回复: 0

[经验分享] elasticsearch 2.3.5 源码简单分析

[复制链接]

尚未签到

发表于 2019-1-29 08:50:15 | 显示全部楼层 |阅读模式
  TransportClient, 它用来初始化与elasticsearch集群的链接,并调用 transportService.start(); 来启动服务器,与elasticsearch建立通讯。其中
modules.add(new ActionModule(true)); 用来加载请求操作对应的类
  ActionModule 定义每种操作对应的类,相当于入口
registerAction(SearchAction.INSTANCE, TransportSearchAction.class); 搜索请求对应这个类,
registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class);  删除请求对应这个类。
  重点看TransportSearchAction类。

switch(searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break;
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break;
case DFS_QUERY_AND_FETCH:
searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break;
case QUERY_AND_FETCH:
searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break;
  根据查询方式,走不同的分支,

if (shardCount == 1) {
// if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_AND_FETCH);
}
  当只有一个分片时,直接采用QUERY_AND_FETCH 方式,这种方式在多分片时,会返回 n*size 个结果,但在一个分片时是没问题的,而且速度快,后面会讲到。
  首先看一下QUERY_AND_FETCH 对应的操作流程
  SearchQueryAndFetchAsyncAction 是入口,我们看一下该类的start 方法,在其父类中

  for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performFirstPhase(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}
  根据涉及到分片,以此执行performFirstPhase,其的作用就是生成一个内部的分片搜索请求,这种请求只针对一个分片,最终调用以下代码

sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new ActionListener() {
@Override
public void onResponse(FirstResult result) {
onFirstPhaseResult(shardIndex, shard, result, shardIt);
}
@Override
public void onFailure(Throwable t) {
onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t);
}
});
  onFirstPhaseResult主要作用是调用子类的moveToSecondPhase。这个方法在executeFetchPhase之后才执行的,因此在其后面再介绍。接着执行

protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener listener) {
searchService.sendExecuteFetch(node, request, listener);
}
  sendExecuteFetch定义在SearchServiceTransportAction中,它会将分片搜索请求转发到对应的节点上。
  看一样其源代码

public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) {
transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler(listener) {
@Override
public QueryFetchSearchResult newInstance() {
return new QueryFetchSearchResult();
}
});
}
  注意另个handler

transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest.class, ThreadPool.Names.SEARCH, new SearchQueryFetchTransportHandler());`
class SearchQueryFetchTransportHandler extends TransportRequestHandler {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
}
  接着看sendRequest

if (node.equals(localNode)) {
sendLocalRequest(requestId, action, request);
} else {
transport.sendRequest(node, requestId, action, request, options);
}
  sendLocalRequest  最终会调用SearchQueryFetchTransportHandler 的messageReceived 方法, 从而开始 执行 executeFetchPhase。
  看一下 executeFetchPhase的源代码:

loadOrExecuteQueryPhase(request, context, queryPhase); 里面是
if (canCache) {
indicesQueryCache.loadIntoContext(request, context, queryPhase);
} else {
queryPhase.execute(context);
}
  execute 里面是最终的查询操作,包括各种参数的解析,这里先不讲,
接着看executeFetchPhase 类, 发现里面还有
fetchPhase.execute(context);
最终返回
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
  从上面的代码可以看出,queryPhase和fetchPhase是连续执行的,这就是query_and_fetch的含义。
  返回的结果中包含query的结果和fetch的结果。对应的两个类分别是QuerySearchResult和FetchSearchResult。
QuerySearchResult 类

private long id;
private SearchShardTarget shardTarget;
private int from;
private int size;
private TopDocs topDocs;
private InternalAggregations aggregations;
private List pipelineAggregators;
private Suggest suggest;
private boolean searchTimedOut;
private Boolean terminatedEarly = null;
private List profileShardResults;
  再看看TopDocs的定义:

   /** The total number of hits for the query. */
public int totalHits;
/** The top hits for the query. */
public ScoreDoc[] scoreDocs;
/** Stores the maximum score value encountered, needed for normalizing. */
private float maxScore;

  里面包含了搜索命中的结果数量、文档编号、文档匹配分数、最大匹配分数。再看看ScoreDoc是如何定义的。


/** The score of this document for the query. */
public float score;
/** A hit document's number.
* @see IndexSearcher#doc(int) */
public int doc;
/** Only set by {@link TopDocs#merge} */
public int shardIndex;
  这里的doc就是内部的文档编号,可以通过IndexSearcher#doc(int)方法获取对应的文档内容。
  因此QuerySearchResult中只包含了内部的文档编号、文档的匹配分值。
  再看 FetchSearchResult

private long id;
private SearchShardTarget shardTarget;
private InternalSearchHits hits;
// client side counter
private transient int counter;
  InternalSearchHits的定义如下:

private InternalSearchHit[] hits;
public long totalHits;
private float maxScore;
  InternalSearchHit 源码如下
private transient int docId;


private float score = Float.NEGATIVE_INFINITY;
private Text id;
private Text type;
private InternalNestedIdentity nestedIdentity;
private long version = -1;
private BytesReference source;
private Map fields = ImmutableMap.of();
private Map highlightFields = null;
private Object[] sortValues = EMPTY_SORT_VALUES;
private String[] matchedQueries = Strings.EMPTY_ARRAY;
private Explanation explanation;
@Nullable
private SearchShardTarget shard;
private Map sourceAsMap;
private byte[] sourceAsBytes;
private Map innerHits;
  它包含了文档的原始内容和解析后的内容。
  moveToSecondPhase执行过程
在上面的executeFetchPhase执行完成之后,得到query结果和fetch结果之后,就执行moveToSecondPhase了,关键的两行

sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
firstResults, request);
  从代码中可以看出,搜索的第二阶段是在search线程池中提交一个任务,首先是对分片结果进行整体排序,然后将搜索结果进行合并。这里面分别调用了searchPhaseController.sortDocs和searchPhaseController.merge两个方法。
  排序先不说,merge 主要是合并 hits suggest addAggregation, 所以我们会看到每个search 请求会有 hit 区域  聚合区域
所以 如果在搜索的时候指定search_type为query_and_fetch,再指定size为10,那么就会返回50个结果。,因为每个分片都返回了10个文档,如果有5个分片的话,那么最后合并的结果就是50个。
  query_then_fetch执行有些类似
其 executeQueryPhase,如下

if (canCache) {
indicesQueryCache.loadIntoContext(request, context, queryPhase);
} else {
queryPhase.execute(context);
}
  只有query没有fetch。这是很重要的区别,它只包含文档编号和必要的排序分值
当queryPhase结束之后,就开始第二阶段了。第二阶段从moveToSecondPhase开始。它的代码定义在TransportSearchQueryThenFetchAction中。主要的作用是将第一阶段获取的文档编号进行排序。排序完成之后再根据文档编号获取文档里面实际的内容。相关的代码如下:


sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
for (AtomicArray.Entry entry : docIdsToLoad.asList()) {
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
  sortDocs前面已经讲过了,作用是将文档编号根据每个文档的匹配分值进行排序。
executeFetch相关的代码如下。它的作用是调用searchService开启一个异步任务,根据文档编号获取文档的具体内容,并将结果存放到fetchResults中。根据counter判断如果所有的fetch任务都执行完了,就调用finishHim来完成本次查询结果。
  finishHim相关的代码如下。它的作用是合并每个分片的查询结果,让后将合并结果通知给listener。让它完成最后的查询结果。searchPhaseController.merge在前面讲过了。
剩下的一些内容下次再聊,参考内容
https://blog.csdn.net/caipeichao2/article/details/46418413




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-668963-1-1.html 上篇帖子: elasticsearch搜索项目重构 下篇帖子: 安装elasticsearch 、redis、zookeeper 、kafka
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表