设为首页 收藏本站
查看: 997|回复: 0

[经验分享] solr-dataimportHandler之批量索引

[复制链接]

尚未签到

发表于 2015-7-17 08:29:54 | 显示全部楼层 |阅读模式
http://mxsfengg.iteye.com/blog/277913    本文主要讨论solr中的dataImportHandler机制,对这个不熟的朋友,可以先看下。solr  wiki中的dataimporthandler这篇文章,笔者也对dataimporthandler进行了一些翻译,不过效果不是很好,有兴趣的朋友也可以参考一下。  http://mxsfengg.blog.163.com/blog/static/26370218200810250524813/。
  想对比较多的数据建立索引,当然要考虑一个量的问题。之前怀疑sqlEntityProcessor是一条条的去数据库中取数据的,因为还有个CachedSqlEntityProcessor,这个类的名字看起来更像是一个能够批量去数据的家伙。
  当然事实证明,笔者之前的想法是错的。
  对于完全导入、即是full-import,来说,我们只需要关注nextRow() 方法的实现。
  



Java代码 http://mxsfengg.iteye.com/javascripts/syntaxhighlighter/clipboard_new.swf?clipboard=public%20Map%3CString%2C%20Object%3E%20nextRow()%20%7B%0A%0A%09%09%2F%2F%E4%BB%8E%E7%BC%93%E5%AD%98%E4%B8%AD%E5%8F%96%EF%BC%88%E5%9C%A8sqlEntityProcessor%E4%B8%AD%E8%BF%99%E4%B8%AA%E5%BA%94%E8%AF%A5%E6%98%AFnull%EF%BC%8C%E5%9B%A0%E4%B8%BA%E5%9C%A8%E8%AE%BE%E8%AE%A1%E4%B8%8A%EF%BC%8CsqlEntityPorcessor%E5%B9%B6%E6%B2%A1%E6%9C%89%E5%AE%9E%E7%8E%B0%E7%BC%93%E5%AD%98%E3%80%82%EF%BC%89%0A%09%09if%20(rowcache%20!%3D%20null)%20%7B%0A%09%09%09return%20getFromRowCache()%3B%0A%09%09%7D%0A%09%09%2F%2F%20%E8%BF%99%E4%B8%AA%E5%8F%AA%E6%98%AF%E4%B8%BA%E4%BA%86%E9%98%B2%E6%AD%A2rowIterator%3D%3Dnull%E7%9A%84%E5%87%BA%E7%8E%B0%E5%90%A7%0A%09%09if%20(rowIterator%20%3D%3D%20null)%20%7B%0A%09%09%09String%20q%20%3D%20getQuery()%3B%0A%09%09%09initQuery(resolver.replaceTokens(q))%3B%0A%09%09%7D%0A%09%09while%20(true)%20%7B%0A%0A%09%09%09Map%3CString%2C%20Object%3E%20r%20%3D%20getNext()%3B%0A%09%09%09if%20(r%20%3D%3D%20null)%20%7B%0A%09%09%09%09return%20null%3B%0A%09%09%09%7D%0A%09%09%09%2F%2F%20%E4%BD%BF%E7%94%A8%E8%BD%AC%E5%8C%96%E5%99%A8%0A%09%09%09r%20%3D%20applyTransformer(r)%3B%0A%09%09%09if%20(r%20!%3D%20null)%20%7B%0A%09%09%09%09return%20r%3B%0A%09%09%09%7D%0A%09%09%7D%0A%0A%09%7D DSC0000.png

  • public Map nextRow() {

  •         //从缓存中取(在sqlEntityProcessor中这个应该是null,因为在设计上,sqlEntityPorcessor并没有实现缓存。)
  •         if (rowcache != null) {
  •             return getFromRowCache();
  •         }
  •         // 这个只是为了防止rowIterator==null的出现吧
  •         if (rowIterator == null) {
  •             String q = getQuery();
  •             initQuery(resolver.replaceTokens(q));
  •         }
  •         while (true) {

  •             Map r = getNext();
  •             if (r == null) {
  •                 return null;
  •             }
  •             // 使用转化器
  •             r = applyTransformer(r);
  •             if (r != null) {
  •                 return r;
  •             }
  •         }

  •     }
  我们可以看到,nextRow一开始去检测 rowcache,这应该是一个缓存的机制,目前,还没有遇到rowcache!=null的情况。我们发现nextRow实际上调用的是getNext(),



Java代码 http://mxsfengg.iteye.com/javascripts/syntaxhighlighter/clipboard_new.swf?clipboard=%20protected%20Map%3CString%2C%20Object%3E%20getNext()%20%7B%0A%20%20%20%20try%20%7B%0A%20%20%20%20%20%20if%20(rowIterator%20%3D%3D%20null)%20%7B%0A%09%09return%20null%3B%0A%09%7D%0A%20%20%20%20%20%20if%20(rowIterator.hasNext())%20%7B%0A%09%09return%20rowIterator.next()%3B%0A%09%7D%0A%20%20%20%20%20%20%2F%2F%E5%A6%82%E6%9E%9C%E4%B8%80%E4%B8%AA%E6%89%B9%E6%AC%A1%E7%BB%93%E6%9D%9F%E4%BA%86%EF%BC%8C%E9%82%A3%E4%B9%88%E5%B0%B1%E5%B0%86%E8%BF%99%E4%B8%AArowIterator%2Cquery%E7%BD%AE%E4%B8%BAnull%E3%80%82%0A%20%20%20%20%20%20rowIterator%20%3D%20null%3B%0A%20%20%20%20%20%20query%20%3D%20null%3B%0A%20%20%20%20%20%20return%20null%3B%0A%20%20%20%20%7D%20catch%20(Exception%20e)%20%7B%0A%20%20%20%20%20%20LOG.log(Level.SEVERE%2C%20%22getNext()%20failed%20for%20query%20

  • protected Map getNext() {
  •    try {
  •      if (rowIterator == null) {
  •     return null;
  • }
  •      if (rowIterator.hasNext()) {
  •     return rowIterator.next();
  • }
  •      //如果一个批次结束了,那么就将这个rowIterator,query置为null。
  •      rowIterator = null;
  •      query = null;
  •      return null;
  •    } catch (Exception e) {
  •      LOG.log(Level.SEVERE, "getNext() failed for query '" + query + "'", e);
  •      rowIterator = null;
  •      query = null;
  •      throw new DataImportHandlerException(DataImportHandlerException.WARN, e);
  •    }
  • }
  而getNext(),又是简单的调用了,rowIterator的next,我们可以这么认为,nextRow()其实就是返回rowIterator的next()。
  SqlEntityProcessor一次性的从数据库中将数据取出来放在了rowIterator中,供DocBuilder调用。
  粗略的看一下 Docbuilder中的buildDocument这个方法。
  



Java代码 http://mxsfengg.iteye.com/javascripts/syntaxhighlighter/clipboard_new.swf?clipboard=%40SuppressWarnings(%22unchecked%22)%0A%20%20private%20void%20buildDocument(VariableResolverImpl%20vr%2C%20SolrWriter.SolrDoc%20doc%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20Map%3CString%2C%20Object%3E%20pk%2C%20DataConfig.Entity%20entity%2C%20boolean%20isRoot%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20ContextImpl%20parentCtx)%20%7B%0A%0A%20%20.............................................%0A%20%20%20%20try%20%7B%0A%20%20%20%20%20%20while%20(true)%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20...................................................................%0A%20%20%20%20%20%20%20%20%3Cspan%20style%3D%22color%3A%20%23ff0000%3B%22%3E%20%20Map%3CString%2C%20Object%3E%20arow%20%3D%20entityProcessor.nextRow()%3B%0A%20%20%20%20%20%20%20%20%20%20if%20(arow%20%3D%3D%20null)%0A%20%20%20%20%20%20%20%20%20%20%20%20break%3B%0A%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%7D%3C%2Fspan%3E%0A%0A%20%20%20%20%20%20%20%20%7D%20catch%20(DataImportHandlerException%20e)%20%7B%0A%20%20%20%20%20%20%20%20%20%20if%20(verboseDebug)%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20writer.log(SolrWriter.ENTITY_EXCEPTION%2C%20entity.name%2C%20e)%3B%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20if%20(isRoot)%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20if%20(e.getErrCode()%20%3D%3D%20DataImportHandlerException.SKIP)%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20importStatistics.skipDocCount.getAndIncrement()%3B%0A%20%20%20%20%20%20%20%20%20%20%20%20%7D%20else%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20LOG.log(Level.SEVERE%2C%20%22Exception%20while%20processing%3A%20%22%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%2B%20entity.name%20%2B%20%22%20document%20%3A%20%22%20%2B%20doc%2C%20e)%3B%0A%20%20%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20%20%20if%20(e.getErrCode()%20%3D%3D%20DataImportHandlerException.SEVERE)%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20throw%20e%3B%0A%20%20%20%20%20%20%20%20%20%20%7D%20else%0A%20%20%20%20%20%20%20%20%20%20%20%20throw%20e%3B%0A%20%20%20%20%20%20%20%20%7D%20finally%20%7B%0A%20%20%20%20%20%20%20%20%20%20if%20(verboseDebug)%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20writer.log(SolrWriter.ROW_END%2C%20entity.name%2C%20null)%3B%0A%20%20%20%20%20%20%20%20%20%20%20%20if%20(entity.isDocRoot)%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20writer.log(SolrWriter.END_DOC%2C%20null%2C%20null)%3B%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%7D%20finally%20%7B%0A%20%20%20%20%20%20if%20(verboseDebug)%20%7B%0A%20%20%20%20%20%20%20%20writer.log(SolrWriter.END_ENTITY%2C%20null%2C%20null)%3B%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%7D%0A%20%20%7D

  • @SuppressWarnings("unchecked")
  •   private void buildDocument(VariableResolverImpl vr, SolrWriter.SolrDoc doc,
  •                              Map pk, DataConfig.Entity entity, boolean isRoot,
  •                              ContextImpl parentCtx) {

  •   .............................................
  •     try {
  •       while (true) {
  •            ...................................................................
  •           Map arow = entityProcessor.nextRow();
  •           if (arow == null)
  •             break;

  •                }

  •         } catch (DataImportHandlerException e) {
  •           if (verboseDebug) {
  •             writer.log(SolrWriter.ENTITY_EXCEPTION, entity.name, e);
  •           }
  •           if (isRoot) {
  •             if (e.getErrCode() == DataImportHandlerException.SKIP) {
  •               importStatistics.skipDocCount.getAndIncrement();
  •             } else {
  •               LOG.log(Level.SEVERE, "Exception while processing: "
  •                       + entity.name + " document : " + doc, e);
  •             }
  •             if (e.getErrCode() == DataImportHandlerException.SEVERE)
  •               throw e;
  •           } else
  •             throw e;
  •         } finally {
  •           if (verboseDebug) {
  •             writer.log(SolrWriter.ROW_END, entity.name, null);
  •             if (entity.isDocRoot)
  •               writer.log(SolrWriter.END_DOC, null, null);
  •           }
  •         }
  •       }
  •     } finally {
  •       if (verboseDebug) {
  •         writer.log(SolrWriter.END_ENTITY, null, null);
  •       }
  •     }
  •   }
  这个方法使用while循环一直调用nextrow()方法,直到nextRow返回null值,仔细观察getNext(),我们会发现当rowIterator取完的时候,就会返回null了,而这时候,也就跳出了while这个循环。
  那就是说,sqlEntity会将执行query语句,并将所有的结果一次取回放到rowIterator中。
  现在的问题在于,如果数据库的数量够大,一次取完所有的数据就变得不现实了,那么要怎样才能够实现批次取数据?
  以下是笔者实现的一个例子,当然这个例子有很多纰漏,需要去改进,放在这里当作抛砖引玉吧。
  这是要索引的表:
  



Sql代码 http://mxsfengg.iteye.com/javascripts/syntaxhighlighter/clipboard_new.swf?clipboard=people%20%20CREATE%20TABLE%20%60people%60%20(%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%20%20%20%60id%60%20int(11)%20NOT%20NULL%20auto_increment%2C%20%20%0A%20%20%20%20%20%20%20%20%20%20%60name%60%20varchar(20)%20NOT%20NULL%2C%20%20%20%20%20%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%20%20%20PRIMARY%20KEY%20%20(%60id%60)%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%20)%20ENGINE%3DInnoDB%20DEFAULT%20CHARSET%3Dlatin1%20

  • people  CREATE TABLE `people` (
  •           `id` int(11) NOT NULL auto_increment,
  •           `name` varchar(20) NOT NULL,
  •           PRIMARY KEY  (`id`)
  •         ) ENGINE=InnoDB DEFAULT CHARSET=latin1
  
  一般情况下,相应的dataConfig.xml文件,我们可以这么写:
  



Xml代码 http://mxsfengg.iteye.com/javascripts/syntaxhighlighter/clipboard_new.swf?clipboard=%20%20%20%20%20%20%20%3Centity%20name%3D%22y%22%20query%3D%22select%20*%20from%20people%20%22%3E%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%3Cfield%20column%3D%22id%22%20name%3D%22id%22%20%2F%3E%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%3Cfield%20column%3D%22name%22%20name%3D%22name%22%20%2F%3E%0A%0A%20%20%20%20%20%20%3C%2Fentity%3E


  •             
  •             

  • lt;/entity>
  这个结构就会一次性从数据库中取完所有的数据,放在rowIterator中,但现在我们并不想这样去实现,以适应更多的数据量。笔者增加了一个表。下面是这个表的数据。



Sql代码 http://mxsfengg.iteye.com/javascripts/syntaxhighlighter/clipboard_new.swf?clipboard=%2B-----%2B---------%2B%0A%7C%20id%20%20%7C%20item

  • +-----+---------+
  • | id  | item    |
  • +-----+---------+
  • |   1 |       0 |
  • |   5 |   10000 |
  • |   6 |   50000 |
  • |   7 |   60000 |
  • |   8 |   70000 |
  • |   9 |   80000 |
  • |  10 |   90000 |
  • |  11 |  100000 |
  • |  12 |  110000 |
  • |  13 |  120000 |
  • |  14 |  130000 |
  • |  15 |  140000 |
  • |  16 |  150000 |
  修改dataconfig.xml文件
  



Xml代码 http://mxsfengg.iteye.com/javascripts/syntaxhighlighter/clipboard_new.swf?clipboard=%3Centity%20name%3D%22x%22%20query%3D%22select%20*%20from%20item%22%20rootEntity%3D%22false%22%3E%0A%20%20%20%20%20%20%20%20%3Centity%20name%3D%22y%22%20query%3D%22select%20*%20from%20people%20where%20id%20between%20%24%7Bx.item%7D%20and%20%24%7Bx.item%7D%2B1000%22%3E%0A%09%20%20%20%3Cfield%20column%3D%22id%22%20name%3D%22id%22%20%2F%3E%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%3Cfield%20column%3D%22name%22%20name%3D%22name%22%20%2F%3E%0A%0A%20%20%20%20%20%20%20%3C%2Fentity%3E%0A%3C%2Fentity%3E


  •         
  •       
  •                   

  •       

  
  显而易见,笔者是通过增加一个表的方式来达到控制批量存取的效果的。虽然这不失为问题的解决方法,不过增加一个表来这个步骤实在让人觉得有点繁琐,且通用性不强。
  当然我们也可以使用xml来代替数据库中的表,不过这只是一种换汤不换药的方式罢了。
  或许,通过继承SqlEntityProcessor,覆盖nextRow方法,扩展它的功能,也是一种可以尝试的选择。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-87443-1-1.html 上篇帖子: Apache Solr facet 分组查询 下篇帖子: Solr安装方法总结
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表