设为首页 收藏本站
查看: 877|回复: 0

[经验分享] Apache Nutch 1.3 学习笔记九(SolrIndexer)

[复制链接]

尚未签到

发表于 2015-8-2 09:00:47 | 显示全部楼层 |阅读模式
  
  新的Nutch使用了Solr来做了后台的索引服务,nutch正在努力与Solr进行更方便的整合,它很好的与Solr处理了耦合关系,把Solr当成一个服务,Nutch只要调用其客户端就可以与其进行通讯。

1. bin/nutch solrindex
     这个命令是用来对抓取下来的内容建立索引,帮助如下:
   
  


  • Usage: SolrIndexer ( ... | -dir )  
  
   这里我们可以看到第一个参数为,这是solr服务的一个地址,第二个参数为抓取的url数据库名,第三个参数为反向链接数据库,第四个参数就segment目录名

  使用这个命令的前提是你要有一个相应的Solr服务才行。

2. 看一下SolrIndexer这个类做了些什么
  bin/nutch solrindex这个命令最终是调用SolrIndexer的main方法,其中一个最主要是方法是indexSolr方法,
下面来看一下这个方法做了些什么
  


  • final JobConf job = new NutchJob(getConf());  

  •    job.setJobName("index-solr " + solrUrl);  

  • // 这里会初始化Job任务,设置其Map与Reduce方法  

  •    IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);  



  •    job.set(SolrConstants.SERVER_URL, solrUrl);  



  • // 这里配置OutputFormat的类  

  •    NutchIndexWriterFactory.addClassToConf(job, SolrWriter.class);  



  •    job.setReduceSpeculativeExecution(false);  



  •    final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-" +  

  •                         new Random().nextInt());  

  • // 配置输出路径  

  •    FileOutputFormat.setOutputPath(job, tmp);  

  •    try {  

  •     // 提交任务  

  •      JobClient.runJob(job);  

  •      // do the commits once and for all the reducers in one go  

  •      SolrServer solr =  new CommonsHttpSolrServer(solrUrl);  

  •      solr.commit();  

  •      long end = System.currentTimeMillis();  

  •      LOG.info("SolrIndexer: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));  

  •    }  

  •    catch (Exception e){  

  •      LOG.error(e);  

  •    } finally {  

  •      FileSystem.get(job).delete(tmp, true);  

  •    }  
  
下面来看一下IndexMapReduce.initMRJob这个方法做了些什么
  


  • public static void initMRJob(Path crawlDb, Path linkDb,  

  •                           Collection segments,  

  •                           JobConf job) {  



  •    LOG.info("IndexerMapReduce: crawldb: " + crawlDb);  

  •    LOG.info("IndexerMapReduce: linkdb: " + linkDb);  



  • // 加入segment中要建立索引的目录  

  •    for (final Path segment : segments) {  

  •      LOG.info("IndexerMapReduces: adding segment: " + segment);  

  •      FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME));  // crawl_fetch  

  •      FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME));  // fetch_parse  

  •      FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));         // parse_data  

  •      FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));         // parse_text  

  •    }  



  •    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));         // crawldb/current  

  •    FileInputFormat.addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME));           // linkdb/current  

  •    job.setInputFormat(SequenceFileInputFormat.class);                    // 设置输入的文件格式, 这里所有目录中的文件格式都是SequenceFileInputFormat,  



  • // 设置Map与Reduce的类型  

  •    job.setMapperClass(IndexerMapReduce.class);  

  •    job.setReducerClass(IndexerMapReduce.class);  



  • // 设置输出类型  

  •    job.setOutputFormat(IndexerOutputFormat.class);  

  •    job.setOutputKeyClass(Text.class);  

  •    job.setMapOutputValueClass(NutchWritable.class);   // 这里设置了Map输出的Value的类型,key类型还是上面的Text  

  •    job.setOutputValueClass(NutchWritable.class);  

  • }  



  •     IndexerMapRducer中的Map只是读入对,把value做NutchWritable进行了封装再输出,下面来看一下IndexerMapReduce中的Reduce方法做了些什么  

  • public void reduce(Text key, Iterator values,  

  •                     OutputCollector output, Reporter reporter)  

  •    throws IOException {  

  •    Inlinks inlinks = null;  

  •    CrawlDatum dbDatum = null;  

  •    CrawlDatum fetchDatum = null;  

  •    ParseData parseData = null;  

  •    ParseText parseText = null;  

  • // 这一块代码是判断相同key的value的类型,根据其类型来对  

  • // inlinks,dbDatum,fetchDatum,parseData,praseText对象进行赋值  

  •    while (values.hasNext()) {  

  •      final Writable value = values.next().get(); // unwrap  

  •      if (value instanceof Inlinks) {  

  •        inlinks = (Inlinks)value;  

  •      } else if (value instanceof CrawlDatum) {  

  •        final CrawlDatum datum = (CrawlDatum)value;  

  •        if (CrawlDatum.hasDbStatus(datum))  

  •          dbDatum = datum;  

  •        else if (CrawlDatum.hasFetchStatus(datum)) {  

  •          // don't index unmodified (empty) pages  

  •          if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED)  

  •            fetchDatum = datum;  

  •        } else if (CrawlDatum.STATUS_LINKED == datum.getStatus() ||  

  •                   CrawlDatum.STATUS_SIGNATURE == datum.getStatus() ||  

  •                   CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {  

  •          continue;  

  •        } else {  

  •          throw new RuntimeException("Unexpected status: "+datum.getStatus());  

  •        }  

  •      } else if (value instanceof ParseData) {  

  •        parseData = (ParseData)value;  

  •      } else if (value instanceof ParseText) {  

  •        parseText = (ParseText)value;  

  •      } else if (LOG.isWarnEnabled()) {  

  •        LOG.warn("Unrecognized type: "+value.getClass());  

  •      }  

  •    }  



  •    if (fetchDatum == null || dbDatum == null  

  •        || parseText == null || parseData == null) {  

  •      return;                                     // only have inlinks  

  •    }  



  •    if (!parseData.getStatus().isSuccess() ||  

  •        fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {  

  •      return;  

  •    }  



  • // 生成一个可以索引的文档对象,在Lucene中,Docuemnt就是一个抽象的文档对象,其有Fields组成,而Field又由Terms组成  

  •    NutchDocument doc = new NutchDocument();  

  •    final Metadata metadata = parseData.getContentMeta();  



  •    // add segment, used to map from merged index back to segment files  

  •    doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));  



  •    // add digest, used by dedup  

  •    doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));  



  •    final Parse parse = new ParseImpl(parseText, parseData);  

  •    try {  

  •      // extract information from dbDatum and pass it to  

  •      // fetchDatum so that indexing filters can use it  

  •      final Text url = (Text) dbDatum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);  

  •      if (url != null) {  

  •        fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);  

  •      }  

  •      // run indexing filters  

  •      doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);  

  •    } catch (final IndexingException e) {  

  •      if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }  

  •      return;  

  •    }  



  •    // skip documents discarded by indexing filters  

  •    if (doc == null) return;  



  •    float boost = 1.0f;  

  •    // run scoring filters  

  •    try {  

  •      boost = this.scfilters.indexerScore(key, doc, dbDatum,  

  •              fetchDatum, parse, inlinks, boost);  

  •    } catch (final ScoringFilterException e) {  

  •      if (LOG.isWarnEnabled()) {  

  •        LOG.warn("Error calculating score " + key + ": " + e);  

  •      }  

  •      return;  

  •    }  

  •    // apply boost to all indexed fields.  

  •    doc.setWeight(boost);  

  •    // store boost for use by explain and dedup  

  •    doc.add("boost", Float.toString(boost));  



  • // 收集输出结果,用下面的IndexerOutputFormat写到Solr中去  

  •    output.collect(key, doc);  

  • }  
  
  下面来看一下IndexerOutputFormat中的getRecordWriter是如何实现的
  


  • @Override  

  • public RecordWriter getRecordWriter(FileSystem ignored,  

  •     JobConf job, String name, Progressable progress) throws IOException {  


  •   // populate JobConf with field indexing options  

  •   IndexingFilters filters = new IndexingFilters(job);  


  • / 这里可以写到多个输出源中  

  •   final NutchIndexWriter[] writers =  

  •     NutchIndexWriterFactory.getNutchIndexWriters(job);  



  •   for (final NutchIndexWriter writer : writers) {  

  •     writer.open(job, name);  

  •   }  

  • / 这里使用了一个inner class来返回相应的RecordWriter,用于输出Reduce收集的对  

  •   return new RecordWriter() {  



  •     public void close(Reporter reporter) throws IOException {  

  •       for (final NutchIndexWriter writer : writers) {  

  •         writer.close();  

  •       }  

  •     }  



  •     public void write(Text key, NutchDocument doc) throws IOException {  

  •       for (final NutchIndexWriter writer : writers) {  

  •         writer.write(doc);  

  •       }  

  •     }  

  •   };  

  • }  
  
这里有多个NutchIndexWriter,目前只有一个子类,就是SolrWriter,下面分析一下其write方法做了些什么
  


  • public void write(NutchDocument doc) throws IOException {  

  •     final SolrInputDocument inputDoc = new SolrInputDocument();  

  •     // 生成Solr的InputDocuement对象  

  •     for(final Entry e : doc) {  

  •       for (final Object val : e.getValue().getValues()) {  

  •         // normalise the string representation for a Date  

  •         Object valval2 = val;  

  •         if (val instanceof Date){  

  •           val2 = DateUtil.getThreadLocalDateFormat().format(val);  

  •         }  

  •         inputDoc.addField(solrMapping.mapKey(e.getKey()), val2, e.getValue().getWeight());  

  •         String sCopy = solrMapping.mapCopyKey(e.getKey());  

  •         if (sCopy != e.getKey()) {  

  •             inputDoc.addField(sCopy, val);   

  •         }  

  •       }  

  •     }  

  •     inputDoc.setDocumentBoost(doc.getWeight());  

  •     inputDocs.add(inputDoc);   // 加入缓冲  

  •     if (inputDocs.size() >= commitSize) {   // 缓冲到达commitSize后,调用solr客户端的add方法写出到Solr服务端  

  •       try {  

  •         solr.add(inputDocs);  

  •       } catch (final SolrServerException e) {  

  •         throw makeIOException(e);  

  •       }  

  •       inputDocs.clear();  

  •     }  

  •   }  
  
3. 总结
  这里大概介绍了一下Nutch对于抓取内容的索引建立过程,也使用了一个MP任务,在Reduce端主要是把要索引的字段生成了一个NutchDocument对象,再通过SolrWriter写出到Solr的服务端,这里SolrWriter封装了Solr的客户端对象,在这里要把Nutch中的Document转换成Solr中的Document,因为这边的NutchDocument是一个可Writable的类型,它一定要是可序列化的,而SorlInputDocument是SolrInputFormat是不可以被序列化的。
  
  作者:http://blog.iyunv.com/amuseme_lu
  
  
  
  
  
  
  
  相关文章阅读及免费下载:
  
  
  
  Apache Nutch 1.3 学习笔记目录
  
  
  
  Apache Nutch 1.3 学习笔记一
  
  
  
  Apache Nutch 1.3 学习笔记二
  
  
  
  Apache Nutch 1.3 学习笔记三(Inject)
  
  
  
  Apache Nutch 1.3 学习笔记三(Inject CrawlDB Reader)
  
  
  
  Apache Nutch 1.3 学习笔记四(Generate)
  
  
  
  Apache Nutch 1.3 学习笔记四(SegmentReader分析)
  
  
  
  Apache Nutch 1.3 学习笔记五(FetchThread)
  
  
  
  Apache Nutch 1.3 学习笔记五(Fetcher流程)
  
  
  
  Apache Nutch 1.3 学习笔记六(ParseSegment)
  
  
  
  Apache Nutch 1.3 学习笔记七(CrawlDb - updatedb)
  
  
  
  Apache Nutch 1.3 学习笔记八(LinkDb)
  
  
  
  Apache Nutch 1.3 学习笔记九(SolrIndexer)
  
  
  
  Apache Nutch 1.3 学习笔记十(Ntuch 插件机制简单介绍)
  
  
  
  Apache Nutch 1.3 学习笔记十(插件扩展)
  
  
  
  Apache Nutch 1.3 学习笔记十(插件机制分析)
  
  
  
  Apache Nutch 1.3 学习笔记十一(页面评分机制 OPIC)
  
  
  
  Apache Nutch 1.3 学习笔记十一(页面评分机制 LinkRank 介绍)
  
  
  
  Apache Nutch 1.3 学习笔记十二(Nutch 2.0 的主要变化)
  
  
  
  更多《Apache Nutch文档》,尽在开卷有益360 http://www.docin.com/book_360
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-93179-1-1.html 上篇帖子: Apache 内存释放~ 下篇帖子: 项目管理工具Redmine + SubVersion + Apache + windows环境安装搭建(转)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表