torlee 发表于 2017-12-19 11:39:51

Solr Update插件自定义Update Chain按条件更新索引

  背景:基于call客,来电和跟进记录等多个数据来源的用户文档,需要在更新是判断首来源的时间。
  如对电话号码11xxxx来说,来电时间是今天,call客时间是昨天,而call客数据又可能因为网络原因晚上传上来,这样一来11xxxx这个用户document的来源时间需要更新成昨天。
  分析:solr的默认update没有办法匹配业务的灵活的更新逻辑。更新逻辑如下,当更新来源时间的时候,如果新的来源时间比之前的来源时间晚,则保持之前的来源时间。
  代码实现:
  

package custom.solr;  

import java.io.IOException;  

import org.apache.lucene.util.BytesRef;  

import org.apache.solr.common.SolrInputDocument;  

import org.apache.solr.core.SolrCore;  

import org.apache.solr.handler.component.RealTimeGetComponent;  

import org.apache.solr.request.SolrQueryRequest;  

import org.apache.solr.response.SolrQueryResponse;  

import org.apache.solr.search.SolrIndexSearcher;  

import org.apache.solr.update.AddUpdateCommand;  

import org.apache.solr.update.processor.UpdateRequestProcessor;  

import org.apache.solr.update.processor.UpdateRequestProcessorFactory;  

import org.apache.solr.util.RefCounted;  

  

public>
{  
@Override
  

public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next)  
{
  

return new ConditionalUpdateProcessor(req, rsp, next);  
}
  
}
  

  

class ConditionalUpdateProcessor extends UpdateRequestProcessor  
{
  

public static final String ORIGIN_TIMESTAMP = "origin_timestamp";  

public ConditionalUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next)  
{
  

super(next);  
core
= req.getCore();  
}
  

  

private final SolrCore core;  

  
@Override
  

public void processAdd(AddUpdateCommand cmd) throws IOException  
{
  
SolrInputDocument newDoc
= cmd.getSolrInputDocument();  
BytesRef indexedId
= cmd.getIndexedId();  
RefCounted
<SolrIndexSearcher> newestSearcher = core.getRealtimeSearcher();  
SolrIndexSearcher searcher;
  

long lookup;  
searcher
= (SolrIndexSearcher) newestSearcher.get();  
lookup
= searcher.lookupId(indexedId);  

//if not exists  
if (lookup < 0)
  
{super.processAdd(cmd);
  
}
  
      SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocument(core, indexedId);
  
Object newOriginTimestamp = newDoc.getFieldValue(ORIGIN_TIMESTAMP);
  
Object oldOriginTimestamp = oldDoc.getFieldValue(ORIGIN_TIMESTAMP);
  
if (newOriginTimestamp != null && oldOriginTimestamp != null)
  
{
  
if (Long.valueOf(oldOriginTimestamp.toString()) < Long.valueOf(newOriginTimestamp.toString()))
  
{
  
newDoc.setField(ORIGIN_TIMESTAMP, oldOriginTimestamp);
  
}
  
}
  
// pass it up the chain
  
super.processAdd(cmd);
  
}
  

  
}
  

  1.将该类编译后生成jar包放到 /var/lib/solr/plugins目录下,或者你任意指定一个目录。
  2.配置solrconfig.xml加载该jar包。(注意修改jar包或者solrconfig.xml之后要reload collection)
  

<lib dir="/var/lib/solr/plugins" />  

  3.配置solrconfig.xml的默认update用哪个chain名字。
  

<requestHandler name="/update">  
<!-- See below for information on defining
  
updateRequestProcessorChains that can be used by name
  
on each Update Request
  
-->
  
<lst name="defaults">
  
<str name="update.chain">condition</str>
  
</lst>
  
</requestHandler>
  

  以及solrconfig.xml chain的流程。
  

<updateRequestProcessorChain name="condition">  
<processor />
  
<processor />
  
<processor />
  
<processor />
  
</updateRequestProcessorChain>
  

  *关于为什么放在DistibutedUpdateProcessFactory之后。
  https://wiki.apache.org/solr/Atomic_Updates
  2017.01.12优化:
  如下场景时,上面代码会出现问题:
  老的数据没有立即commit,还保存在TLog中,此时RealTimeGetComponet.getInputDocument方法获取不到老数据,导致处理逻辑不符合期望,来源时间不正确。
  代码优化如下:
  

SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedId);  

if (oldDoc == null)  
{
  
oldDoc
= RealTimeGetComponent.getInputDocument(core, indexedId);  
}
  
页: [1]
查看完整版本: Solr Update插件自定义Update Chain按条件更新索引