设为首页 收藏本站
查看: 1216|回复: 0

[经验分享] solr源码解读(转)

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-7-17 12:27:51 | 显示全部楼层 |阅读模式
  solr源码解读(转)原文地址:http://blog.iyunv.com/duck_genuine/article/details/6962624
配置
solr 对一个搜索请求的的流程
  在solrconfig.xml会配置一个handler。配置了前置处理组件preParams,还有后置处理组件filterResult,当然还有默认的组件
  



[html] view plaincopy

  •   

  •       
  •         preParams  
  •       
  •          
  •           explicit  
  •           10  
  •           0  
  •          *:*  
  •               

  •       
  •         filterResult  
  •             

  •      
  

  
http请求控制器
  当一个查询请求过来的时候,先到类SolrDispatchFilter,由这个分发器寻找对应的handler来处理。




[java] view plaincopy

  • String qt = solrReq.getParams().get( CommonParams.QT );
  • handler = core.getRequestHandler( qt );
  

  
  ---------------------------------------------------------------------------------------------------
  



[java] view plaincopy

  • this.execute( req, handler, solrReq, solrRsp );  
  • HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
  

  
  -----------------------------------------------------------------------------------------------
  从上面的代码里看出是由solrCore留下的接口来处理请求。从代码框架上,从此刻开始进入solr的核心代码。
  
  



[java] view plaincopy

  • protected void execute( HttpServletRequest req, SolrRequestHandler handler, SolrQueryRequest sreq, SolrQueryResponse rsp) {  
  •   sreq.getContext().put( "webapp", req.getContextPath() );  
  •   sreq.getCore().execute( handler, sreq, rsp );
  • }
  

  
  
  看一下solrCore代码execute的方法 的主要代码
  



[java] view plaincopy

  • public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) {  
  • 。。。。。
  •     handler.handleRequest(req,rsp);
  •     setResponseHeaderValues(handler,req,rsp);
  • 。。。。。。。
  •   }
  

  
  主要实现对请求的处理,并将请求结果的状态信息写到响应的头部
  
SolrRequestHandler 处理器
  
  再看一下对请求的处理。。先看定义该请求处理器的接口,可以更好理解。只有两个方法,一个是初始化信息,主要是配置时的默认参数,另一个就是处理请求的接口。
  



[java] view plaincopy

  • public interface SolrRequestHandler extends SolrInfoMBean {  
  •   public void init(NamedList args);  
  •   public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp);  
  • }
  

  
  先看一下实现该接口的类RequestHandlerBase
  



[java] view plaincopy

  • public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {  
  •     numRequests++;
  •     try {  
  •       SolrPluginUtils.setDefaults(req,defaults,appends,invariants);
  •       rsp.setHttpCaching(httpCaching);
  •       handleRequestBody( req, rsp );
  •       // count timeouts  
  •       NamedList header = rsp.getResponseHeader();
  •       if(header != null) {  
  •         Object partialResults = header.get("partialResults");  
  •         boolean timedOut = partialResults == null ? false : (Boolean)partialResults;  
  •         if( timedOut ) {  
  •           numTimeouts++;
  •           rsp.setHttpCaching(false);  
  •         }
  •       }
  •     } catch (Exception e) {  
  •       SolrException.log(SolrCore.log,e);
  •       if (e instanceof ParseException) {  
  •         e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);  
  •       }
  •       rsp.setException(e);
  •       numErrors++;
  •     }
  •     totalTime += rsp.getEndTime() - req.getStartTime();
  •   }
  

  
  主要记录该请求处理的状态与处理时间记录。真正的实现方法交由各个子类      handleRequestBody( req, rsp );
  现在看一下SearchHandler对于搜索处理的实现方法
  
  首先是将solrconfig.xml上配置的各个处理组件按一定顺序组装起来,先是first-Component,默认的component,last-component.这些处理组件会按照它们的顺序来执行,以下是searchHandler的实现主体。方法handleRequestBody
  
  



[java] view plaincopy

  • @Override  
  • public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception, ParseException, InstantiationException, IllegalAccessException  
  • {
  •   // int sleep = req.getParams().getInt("sleep",0);  
  •   // if (sleep > 0) {log.error("SLEEPING for " + sleep);  Thread.sleep(sleep);}  
  •   ResponseBuilder rb = new ResponseBuilder();  
  •   rb.req = req;
  •   rb.rsp = rsp;
  •   rb.components = components;
  •   rb.setDebug(req.getParams().getBool(CommonParams.DEBUG_QUERY, false));  

  •   final RTimer timer = rb.isDebug() ? new RTimer() : null;  

  •   if (timer == null) {  
  •     // non-debugging prepare phase  
  •     for( SearchComponent c : components ) {  
  •       c.prepare(rb);
  •     }
  •   } else {  
  •     // debugging prepare phase  
  •     RTimer subt = timer.sub( "prepare" );  
  •     for( SearchComponent c : components ) {  
  •       rb.setTimer( subt.sub( c.getName() ) );
  •       c.prepare(rb);
  •       rb.getTimer().stop();
  •     }
  •     subt.stop();  
  •   }
  •    //单机版  
  •   if (rb.shards == null) {  
  •     // a normal non-distributed request  

  •     // The semantics of debugging vs not debugging are different enough that  
  •     // it makes sense to have two control loops  
  •     if(!rb.isDebug()) {  
  •       // Process  
  •       for( SearchComponent c : components ) {  
  •         c.process(rb);
  •       }
  •     }
  •     else {  
  •       // Process  
  •       RTimer subt = timer.sub( "process" );  
  •       for( SearchComponent c : components ) {  
  •         rb.setTimer( subt.sub( c.getName() ) );
  •         c.process(rb);
  •         rb.getTimer().stop();
  •       }
  •       subt.stop();
  •       timer.stop();

  •       // add the timing info  
  •       if( rb.getDebugInfo() == null ) {  
  •         rb.setDebugInfo( new SimpleOrderedMap() );  
  •       }
  •       rb.getDebugInfo().add( "timing", timer.asNamedList() );  
  •     }

  •   } else {//分布式请求  
  •     // a distributed request  

  •     HttpCommComponent comm = new HttpCommComponent();  

  •     if (rb.outgoing == null) {  
  •       rb.outgoing = new LinkedList();  
  •     }
  •     rb.finished = new ArrayList();  

  •     //起始状态为0,结束状态为整数的最大值  
  •     int nextStage = 0;  
  •     do {  
  •       rb.stage = nextStage;
  •       nextStage = ResponseBuilder.STAGE_DONE;

  •       // call all components  
  •       for( SearchComponent c : components ) {  
  •         //得到所有组件运行后返回的下一个状态,并取最小值  
  •         nextStage = Math.min(nextStage, c.distributedProcess(rb));
  •       }


  •       // 如果有需要向子机发送请求  
  •       while (rb.outgoing.size() > 0) {  

  •         // submit all current request tasks at once  
  •         while (rb.outgoing.size() > 0) {  
  •           ShardRequest sreq = rb.outgoing.remove(0);  
  •           sreq.actualShards = sreq.shards;
  •           if (sreq.actualShards==ShardRequest.ALL_SHARDS) {  
  •             sreq.actualShards = rb.shards;
  •           }
  •           sreq.responses = new ArrayList();  

  •           // 向各个子机发送请求  
  •           for (String shard : sreq.actualShards) {  
  •             ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);  
  •             params.remove(ShardParams.SHARDS);      // not a top-level request  
  •             params.remove("indent");  
  •             params.remove(CommonParams.HEADER_ECHO_PARAMS);
  •             params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request  
  •             String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);
  •             if (shardHandler == null) {  
  •               params.remove(CommonParams.QT);
  •             } else {  
  •               params.set(CommonParams.QT, shardHandler);
  •             }
  •           //提交子请求  
  •            comm.submit(sreq, shard, params);
  •           }
  •         }


  •         // now wait for replies, but if anyone puts more requests on  
  •         // the outgoing queue, send them out immediately (by exiting  
  •         // this loop)  
  •         while (rb.outgoing.size() == 0) {  
  •           ShardResponse srsp = comm.takeCompletedOrError();
  •           if (srsp == null) break;  // no more requests to wait for  

  •           // Was there an exception?  If so, abort everything and  
  •           // rethrow  
  •           if (srsp.getException() != null) {  
  •             comm.cancelAll();
  •             if (srsp.getException() instanceof SolrException) {  
  •               throw (SolrException)srsp.getException();  
  •             } else {  
  •               throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());  
  •             }
  •           }

  •           rb.finished.add(srsp.getShardRequest());

  •           //每个组件都对于返回的数据处理  
  •           for(SearchComponent c : components) {  
  •             c.handleResponses(rb, srsp.getShardRequest());
  •           }
  •         }
  •       }//请求队列结束  

  •       //再对该轮请求进行收尾工作  
  •       for(SearchComponent c : components) {  
  •           c.finishStage(rb);
  •        }

  •       //如果状态未到结束,则继续循环  
  •     } while (nextStage != Integer.MAX_VALUE);  
  •   }
  • }
  
  
  首先运行的是各个组件的方法prepare
  



[java] view plaincopy

  • for( SearchComponent c : components ) {  
  •   c.prepare(rb);
  • }
  

  
  再则如果不是分布式搜索,则比较简单的运行
  



[java] view plaincopy

  • for( SearchComponent c : components ) {  
  •         c.process(rb);
  •       }
  

  
  就结束!
  如果是分布式搜索,过程会比较复杂些,对于每个组件处理都会返回一个状态,对于以下几个方法循环执行,直到状态结束 。  
  在类ResponseBuilder定义了几个状态。
  



[java] view plaincopy

  • public static int STAGE_START           = 0;  
  • public static int STAGE_PARSE_QUERY     = 1000;  
  • public static int STAGE_EXECUTE_QUERY   = 2000;  
  • public static int STAGE_GET_FIELDS      = 3000;  
  • public static int STAGE_DONE            = Integer.MAX_VALUE;  
  
  
  
  从STAGE_START---->STAGE_PARSE_QUERY------>STAGE_EXECUTE_QUERY--------------->STAGE_GET_FIELDS------------>STAGE_DONE
  从这些状态名称可以猜得出整个对应的过程。
  每个组件先调用方法distributeProcess,并返回下一个状态
  



[java] view plaincopy

  • for( SearchComponent c : components ) {  
  •      // the next stage is the minimum of what all components report  
  •      nextStage = Math.min(nextStage, c.distributedProcess(rb));
  •    }
  
  而方法handleResponse主要处理返回来的数据
  
  



[java] view plaincopy

  • for(SearchComponent c : components) {  
  •         c.handleResponses(rb, srsp.getShardRequest());
  •       }
  
然后交由finishStage方法来对每一个状态的过程作结束动作。
  
  ------------------------------
  



[java] view plaincopy

  • for(SearchComponent c : components) {  
  •           c.finishStage(rb);
  •        }
  

  
  -----------------------------
  了解这个流程有助于扩展solr。比如有个业务是要我对搜索的自然结果排序进行干预,而这个干预只针对前几页结果,所以我不得不做个组件来对其中结果进行处理。
  所以我想可以添加一个组件放在最后-------------》
1)如果是分布式搜索:
  这个组件可以在重写finsihStage做处理。算是对最终结果的排序处理即可。
2)如果只是单机:
  这个组件可以在重写process做处理
  
  
组件
  现在看一下其中一个主要的组件QueryComponent
prepare
  对于QueryComponent主要解析用户传送的语法解析参数defType,以及过滤查询fq,返回字段集fl.排序字段Sort
  
单机处理
process
  
  分布式搜索过程中的某一步,这里应该是主机要合并文档,取出对应的文档的过程,
  主机发出指定的solr主键ids来取文档集,首先取出对应的lucene的内部id集。如果某些文档已不在则弃掉。
  
  



[java] view plaincopy

  • String ids = params.get(ShardParams.IDS);
  •     if (ids != null) {//将传过来的ids,放进结果集中,并在后面取出对应的结果文档  
  •      SchemaField idField = req.getSchema().getUniqueKeyField();
  •       List idArr = StrUtils.splitSmart(ids, ",", true);  
  •       int[] luceneIds = new int[idArr.size()];  
  •       int docs = 0;  
  •       for (int i=0; i= 0)  
  •           luceneIds[docs++] = id;
  •       }

  •       DocListAndSet res = new DocListAndSet();  

  •       //这里并没有传入scores[]  

  •   res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);  
  • //需要另一种doc集合处理。  
  • if (rb.isNeedDocSet()) {  
  • List queries = new ArrayList();  
  •   queries.add(rb.getQuery());
  • List filters = rb.getFilters();
  • if (filters != null)  
  • queries.addAll(filters);
  •   res.docSet = searcher.getDocSet(queries);
  • }
  • rb.setResults(res);
  • rsp.add("response",rb.getResults().docList);  
  • return;   
  • }
  



[java] view plaincopy

    •   //封装搜索值对象与封装结果值对象   
    •    SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
    •    //设置超时最大值  
    •     cmd.setTimeAllowed(timeAllowed);
    •     SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();  
    •     //搜索  
    •     searcher.search(result,cmd);
    •     //设置搜索结果  
    •     rb.setResult( result );
    •     rsp.add("response",rb.getResults().docList);  
    •     rsp.getToLog().add("hits", rb.getResults().docList.matches());  
    •     //对含有字段排序处理  
    •     doFieldSortValues(rb, searcher);
    •    //非分布查询过程,且搜索结果数小于50,进行缓存  
    •     doPrefetch(rb);


    • 目前看到真实获取文档内容的是在QueryResponseWriter例如xml的输出格式类XMLWriter  




    • 分布式处理  

    • 1)distributedProcess  
    •   @Override   
    •   public int distributedProcess(ResponseBuilder rb) throws IOException {  
    •     if (rb.stage < ResponseBuilder.STAGE_PARSE_QUERY)  
    •       return ResponseBuilder.STAGE_PARSE_QUERY;  
    •     if (rb.stage == ResponseBuilder.STAGE_PARSE_QUERY) {  
    •       createDistributedIdf(rb);
    •       return ResponseBuilder.STAGE_EXECUTE_QUERY;  
    •     }
    •     if (rb.stage < ResponseBuilder.STAGE_EXECUTE_QUERY) return ResponseBuilder.STAGE_EXECUTE_QUERY;  
    •     if (rb.stage == ResponseBuilder.STAGE_EXECUTE_QUERY) {  
    • //分布式查询  
    •      createMainQuery(rb);
    •       return ResponseBuilder.STAGE_GET_FIELDS;  
    •     }
    •     if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) return ResponseBuilder.STAGE_GET_FIELDS;  
    •     if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {  

    •     //这里就会去对应的主机拿取需要的字段,封装请求字段的参数,放进请求队列里,可以由外部的searchHandler提交该请求,最后结果放在ShardResponse类里。  
    •      createRetrieveDocs(rb);
    •       return ResponseBuilder.STAGE_DONE;  
    •     }
    •     return ResponseBuilder.STAGE_DONE;  
    •   }


    •    



    • 2) handleResponses  

    • public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {   

    •          if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0) {  

    •                       //合并ids   

    •                mergeIds(rb, sreq);

    •               //合并groupCount     

    •             mergeGroupCounts(rb, sreq);

    •            }

    •        if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) {  

    •                //获取文档的字段,并将结题组装起来放到最终结果列表对应的位置里      

    •              returnFields(rb, sreq);

    •             return;   

    •        }

    •   }

    •    3)  finishStage  


    •   @Override  
    •   public void finishStage(ResponseBuilder rb) {  
    •    //这里说是==获取文档内容的值,在  
    •    if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {  
    •        //有些文档可能已不存在了,则忽略掉  
    •       for (Iterator iter = rb._responseDocs.iterator(); iter.hasNext();) {  
    •         if (iter.next() == null) {  
    •           iter.remove();
    •           rb._responseDocs.setNumFound(rb._responseDocs.getNumFound()-1);  
    •         }
    •       }

    •       rb.rsp.add("response", rb._responseDocs);  
    •     }
    •   }


    •   

    • 同样最后的结果是保存在  

    • ResponseBuilder

    •      ResponseBuilder
    •          NamedList values = new SimpleOrderedMap();  

    • 这个字段里,以键为"response",单机存储的是lucene 的内部id列表  
    • 如果是分布式,则存储的是SolrDocumentList,不用再去索引拿出对应的存储字段,
    • 这个在QueryResponseWriter里有对应的处理























运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-87664-1-1.html 上篇帖子: Solr部署到tomcat,通过war包 下篇帖子: ubuntu下部署solr
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表