设为首页 收藏本站
查看: 848|回复: 0

[经验分享] solr dataimport 数据导入源码分析(十四)

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-7-16 13:52:50 | 显示全部楼层 |阅读模式
  在solr数据导入(DataImportHandler)的源码中,DocBuilder类负责构建solr文档(具体构建过程不在本文分析范围内),最后由SolrWriter对象对SolrInputDocument doc对象执行添加、修改、删除、提交等操作。
  首先分析SolrWriter对象的创建过程。它是在DataImportHandler类(数据导入请求处理器)的void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)方法中创建的:



// Excerpt from DataImportHandler.handleRequestBody(req, rsp):
// look up the update processor chain selected by the request parameters
// (SolrPluginUtils.resolveUpdateChainParam) on the request's core.
UpdateRequestProcessorChain processorChain =
req.getCore().getUpdateProcessingChain(SolrPluginUtils.resolveUpdateChainParam(params, LOG));
// Instantiate the chain's processors for this request/response pair.
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
SolrResourceLoader loader = req.getCore().getResourceLoader();
// Wrap the processor in a SolrWriter (built by getSolrWriter).
SolrWriter sw = getSolrWriter(processor, loader, requestParams);
  创建SolrWriter对象的方法如下



/**
 * Builds the SolrWriter used by this handler for one import request.
 *
 * <p>The returned writer overrides {@code upload} so that, when {@code requestParams.debug}
 * is set, each document is also appended to {@code debugDocuments}. Any RuntimeException
 * raised while uploading is logged and reported as a failed upload ({@code false}) instead
 * of being propagated.
 *
 * @param processor     update processor chain that receives the documents
 * @param loader        resource loader of the core (not used by this implementation)
 * @param requestParams parameters of the current import request
 * @return a SolrWriter bound to {@code processor}
 */
private SolrWriter getSolrWriter(final UpdateRequestProcessor processor,
    final SolrResourceLoader loader, final DataImporter.RequestParams requestParams) {
  final SolrWriter writer = new SolrWriter(processor) {
    @Override
    public boolean upload(SolrInputDocument document) {
      boolean uploaded;
      try {
        if (requestParams.debug) {
          debugDocuments.add(document);
        }
        uploaded = super.upload(document);
      } catch (RuntimeException e) {
        LOG.error( "Exception while adding: " + document, e);
        uploaded = false;
      }
      return uploaded;
    }
  };
  return writer;
}
  我们浏览一下相关类的UML模型
DSC0000.png
  DIHWriter接口定义了各种操作SolrInputDocument doc对象的方法,SolrWriter类提供了具体实现



/**
*  Writes documents to SOLR.
*
* This API is experimental and may change in the future.
*
* @version $Id: SolrWriter.java 1303792 2012-03-22 14:11:16Z jdyer $
* @since solr 1.3
*/
public class SolrWriter extends DIHWriterBase implements DIHWriter {
private static final Logger log = LoggerFactory.getLogger(SolrWriter.class);
static final String LAST_INDEX_KEY = "last_index_time";
private final UpdateRequestProcessor processor;
public SolrWriter(UpdateRequestProcessor processor) {
this.processor = processor;
}
public void close() {
try {
processor.finish();
} catch (IOException e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Unable to call finish() on UpdateRequestProcessor", e);
}
}
public boolean upload(SolrInputDocument d) {
try {
AddUpdateCommand command = new AddUpdateCommand();
command.solrDoc = d;
command.allowDups = false;
command.overwritePending = true;
command.overwriteCommitted = true;
processor.processAdd(command);
} catch (Exception e) {
log.warn("Error creating document : " + d, e);
return false;
}
return true;
}
public void deleteDoc(Object id) {
try {
log.info("Deleting document: " + id);
DeleteUpdateCommand delCmd = new DeleteUpdateCommand();
delCmd.id = id.toString();
delCmd.fromPending = true;
delCmd.fromCommitted = true;
processor.processDelete(delCmd);
} catch (IOException e) {
log.error("Exception while deleteing: " + id, e);
}
}
public void deleteByQuery(String query) {
try {
log.info("Deleting documents from Solr with query: " + query);
DeleteUpdateCommand delCmd = new DeleteUpdateCommand();
delCmd.query = query;
delCmd.fromCommitted = true;
delCmd.fromPending = true;
processor.processDelete(delCmd);
} catch (IOException e) {
log.error("Exception while deleting by query: " + query, e);
}
}
public void commit(boolean optimize) {
try {
CommitUpdateCommand commit = new CommitUpdateCommand(optimize);
processor.processCommit(commit);
} catch (Throwable t) {
log.error("Exception while solr commit.", t);
}
}
public void rollback() {
try {
RollbackUpdateCommand rollback = new RollbackUpdateCommand();
processor.processRollback(rollback);
} catch (Throwable t) {
log.error("Exception while solr rollback.", t);
}
}
public void doDeleteAll() {
try {
DeleteUpdateCommand deleteCommand = new DeleteUpdateCommand();
deleteCommand.query = "*:*";
deleteCommand.fromCommitted = true;
deleteCommand.fromPending = true;
processor.processDelete(deleteCommand);
} catch (IOException e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Exception in full dump while deleting all documents.", e);
}
}
static String getResourceAsString(InputStream in) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
byte[] buf = new byte[1024];
int sz = 0;
try {
while ((sz = in.read(buf)) != -1) {
baos.write(buf, 0, sz);
}
} finally {
try {
in.close();
} catch (Exception e) {
}
}
return new String(baos.toByteArray(), "UTF-8");
}
static String getDocCount() {
if (DocBuilder.INSTANCE.get() != null) {
return ""
+ (DocBuilder.INSTANCE.get().importStatistics.docCount.get() + 1);
} else {
return null;
}
}
public void init(Context context) {
/* NO-OP */        
}   
}
  构造方法初始化成员变量UpdateRequestProcessor processor。各个方法针对不同的操作分别构建相应的UpdateCommand子类对象(AddUpdateCommand、DeleteUpdateCommand、CommitUpdateCommand、RollbackUpdateCommand),再调用processor对象的对应方法处理该命令对象(这些类已属于solr核心代码)。
  UpdateRequestProcessor类是处理这些更新命令的请求处理器。它是一个抽象类,默认实现只是把命令传递给处理器链中的下一个处理器(next),具体操作由继承类实现。



/**
* This is a good place for subclassed update handlers to process the document before it is
* indexed.  You may wish to add/remove fields or check if the requested user is allowed to
* update the given document...
*
* Perhaps you continue adding an error message (without indexing the document)...
* perhaps you throw an error and halt indexing (remove anything already indexed??)
*
* By default, this just passes the request to the next processor in the chain.
*
* @since solr 1.3
*/
public abstract class UpdateRequestProcessor {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected final UpdateRequestProcessor next;
public UpdateRequestProcessor( UpdateRequestProcessor next) {
this.next = next;
}
public void processAdd(AddUpdateCommand cmd) throws IOException {
if (next != null) next.processAdd(cmd);
}
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
if (next != null) next.processDelete(cmd);
}
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
if (next != null) next.processMergeIndexes(cmd);
}
public void processCommit(CommitUpdateCommand cmd) throws IOException
{
if (next != null) next.processCommit(cmd);
}
/**
* @since Solr 1.4
*/
public void processRollback(RollbackUpdateCommand cmd) throws IOException
{
if (next != null) next.processRollback(cmd);
}
public void finish() throws IOException {
if (next != null) next.finish();   
}
}
  真正执行SolrInputDocument doc对象增删改操作(以及提交、回滚、合并索引)的继承类是RunUpdateProcessor:



/**
 * Update processor that applies each command to the core's {@code UpdateHandler} and then
 * forwards the command to the rest of the chain.
 */
class RunUpdateProcessor extends UpdateRequestProcessor {
  /** Request whose schema is used to build Lucene documents. */
  private final SolrQueryRequest req;
  /** Update handler of the request's core; performs the actual index changes. */
  private final UpdateHandler updateHandler;

  public RunUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next) {
    super(next);
    this.req = req;
    this.updateHandler = req.getCore().getUpdateHandler();
  }

  /** Converts the input document using the request schema, adds it, then forwards. */
  @Override
  public void processAdd(AddUpdateCommand cmd) throws IOException {
    cmd.doc = DocumentBuilder.toDocument(cmd.getSolrInputDocument(), req.getSchema());
    updateHandler.addDoc(cmd);
    super.processAdd(cmd);
  }

  /** Deletes by query when no id is given, otherwise by id; then forwards. */
  @Override
  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
    final boolean byQuery = cmd.id == null;
    if (byQuery) {
      updateHandler.deleteByQuery(cmd);
    } else {
      updateHandler.delete(cmd);
    }
    super.processDelete(cmd);
  }

  /** Merges the given indexes into the core's index, then forwards. */
  @Override
  public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
    updateHandler.mergeIndexes(cmd);
    super.processMergeIndexes(cmd);
  }

  /** Commits through the update handler, then forwards. */
  @Override
  public void processCommit(CommitUpdateCommand cmd) throws IOException {
    updateHandler.commit(cmd);
    super.processCommit(cmd);
  }

  /**
   * Rolls back through the update handler, then forwards.
   *
   * @since Solr 1.4
   */
  @Override
  public void processRollback(RollbackUpdateCommand cmd) throws IOException {
    updateHandler.rollback(cmd);
    super.processRollback(cmd);
  }
}
  RunUpdateProcessor调用成员UpdateHandler updateHandler对象的相应方法。UpdateHandler类是抽象类,具体操作由继承类DirectUpdateHandler2实现,
  最终通过其成员IndexWriter writer对象操作lucene的Document:



// iwCommit protects internal data and open/close of the IndexWriter and
// is a mutex. Any use of the index writer should be protected by iwAccess,
// which admits multiple simultaneous acquisitions.  iwAccess is
// mutually-exclusive with the iwCommit lock.
// Both are halves of a single fair ReentrantReadWriteLock created in the
// constructor: iwAccess is its read lock, iwCommit its write lock.
protected final Lock iwAccess, iwCommit;
// Main index writer. It is null until openWriter() creates it, and
// closeWriter()/rollbackWriter() set it back to null.
protected IndexWriter writer;
/**
 * Creates the update handler for {@code core}.
 *
 * <p>Sets up the iwAccess/iwCommit lock pair and a CommitTracker configured from the core's
 * update-handler auto-commit settings (max docs, max time).
 */
public DirectUpdateHandler2(SolrCore core) throws IOException {
  super(core);
  // Fairness matters: without it, commit requests (write lock) can be starved
  // while add/update traffic (read lock) is heavy -- see SOLR-2342.
  final ReadWriteLock fairLock = new ReentrantReadWriteLock(true);
  iwCommit = fairLock.writeLock();
  iwAccess = fairLock.readLock();
  commitTracker = new CommitTracker(
      "commitTracker",
      core,
      core.getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxDocs,
      core.getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxTime,
      true,
      false);
}
/**
 * Removes all documents. It closes the current writer and opens a fresh one with
 * {@code removeAllExisting = true}.
 *
 * <p>Must only be called while the iwCommit lock is held.
 */
private void deleteAll() throws IOException {
  SolrCore.log.info(core.getLogId() + "REMOVING ALL DOCUMENTS FROM INDEX");
  closeWriter();
  final boolean removeAllExisting = true;
  writer = createMainIndexWriter("DirectUpdateHandler2", removeAllExisting);
}
/**
 * Creates the main IndexWriter if none is open; otherwise does nothing.
 *
 * <p>Callers must serialize access to {@code writer}. Most call it while holding iwCommit;
 * addDoc calls it while holding iwAccess and the handler's own monitor
 * ({@code synchronized (this)}).
 */
protected void openWriter() throws IOException {
  if (writer != null) {
    return;
  }
  writer = createMainIndexWriter("DirectUpdateHandler2", false);
}
/**
 * Resets the pending-document count and closes the current IndexWriter, if any.
 * {@code writer} is set to null even if {@code close()} throws.
 *
 * <p>Must only be called while the iwCommit lock is held.
 */
protected void closeWriter() throws IOException {
  try {
    numDocsPending.set(0);
    if (writer == null) {
      return;
    }
    writer.close();
  } finally {
    // If an exception kept the index write lock from being released,
    // this would be the place to try deleting it; for now the reference is just dropped.
    writer = null;
  }
}
/**
 * Resets the pending-document count and rolls back the current IndexWriter, if any.
 * {@code writer} is set to null even if {@code rollback()} throws.
 *
 * <p>Must only be called while the iwCommit lock is held.
 */
protected void rollbackWriter() throws IOException {
  try {
    numDocsPending.set(0);
    if (writer == null) {
      return;
    }
    writer.rollback();
  } finally {
    writer = null;
  }
}
/**
 * Adds the document carried by {@code cmd} to the index.
 *
 * <p>If the schema has no unique key field ({@code idField == null}), duplicates are allowed
 * and the document is simply appended. Otherwise, when either overwrite flag is set, the
 * document replaces whatever matches its id term (or {@code cmd.updateTerm}, if given).
 *
 * <p>Runs under the shared iwAccess (read) lock, so several adds may proceed in parallel.
 * Commits, deletes and rollbacks take the exclusive iwCommit lock instead.
 *
 * @return 1 on success; on an exception the error counters are incremented and the
 *     exception propagates
 */
@Override
public int addDoc(AddUpdateCommand cmd) throws IOException {
addCommands.incrementAndGet();
addCommandsCumulative.incrementAndGet();
// Stays -1 unless the add completes; the finally block uses it to count errors.
int rc=-1;
// if there is no ID field, use allowDups
if( idField == null ) {
cmd.allowDups = true;
cmd.overwriteCommitted = false;
cmd.overwritePending = false;
}
iwAccess.lock();
try {
// We can't use iwCommit to protect internal data here, since it would
// block other addDoc calls.  Hence, we synchronize to protect internal
// state.  This is safe as all other state-changing operations are
// protected with iwCommit (which iwAccess excludes from this block).
synchronized (this) {
// adding document -- prep writer
        openWriter();
commitTracker.addedDocument( cmd.commitWithin );
} // end synchronized block
// this is the only unsynchronized code in the iwAccess block, which
// should account for most of the time
Term updateTerm = null;
if (cmd.overwriteCommitted || cmd.overwritePending) {
if (cmd.indexedId == null) {
cmd.indexedId = getIndexedId(cmd.doc);
}
// Local idTerm shadows the field of the same name: it is the concrete term for this doc's id.
Term idTerm = this.idTerm.createTerm(cmd.indexedId);
boolean del = false;
if (cmd.updateTerm == null) {
updateTerm = idTerm;
} else {
del = true;
updateTerm = cmd.updateTerm;
}
writer.updateDocument(updateTerm, cmd.getLuceneDocument(schema));
if(del) { // ensure id remains unique
// Delete documents that carry this id but do NOT match updateTerm.
BooleanQuery bq = new BooleanQuery();
bq.add(new BooleanClause(new TermQuery(updateTerm), Occur.MUST_NOT));
bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
writer.deleteDocuments(bq);
}
} else {
// allow duplicates
        writer.addDocument(cmd.getLuceneDocument(schema));
}
rc = 1;
} finally {
iwAccess.unlock();
// Update statistics whether or not the add succeeded.
if (rc!=1) {
numErrors.incrementAndGet();
numErrorsCumulative.incrementAndGet();
} else {
numDocsPending.incrementAndGet();
}
}
return rc;
}

// could return the number of docs deleted, but is that always possible to know???
  @Override
public void delete(DeleteUpdateCommand cmd) throws IOException {
deleteByIdCommands.incrementAndGet();
deleteByIdCommandsCumulative.incrementAndGet();
if (!cmd.fromPending && !cmd.fromCommitted) {
numErrors.incrementAndGet();
numErrorsCumulative.incrementAndGet();
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"meaningless command: " + cmd);
}
if (!cmd.fromPending || !cmd.fromCommitted) {
numErrors.incrementAndGet();
numErrorsCumulative.incrementAndGet();
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"operation not supported" + cmd);
}
iwCommit.lock();
try {
openWriter();
commitTracker.deletedDocument( cmd.commitWithin );
writer.deleteDocuments(idTerm.createTerm(idFieldType.toInternal(cmd.id)));
} finally {
iwCommit.unlock();
}
commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
}
// why not return number of docs deleted?
// Depending on implementation, we may not be able to immediately determine the num...
   @Override
public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException {
deleteByQueryCommands.incrementAndGet();
deleteByQueryCommandsCumulative.incrementAndGet();
if (!cmd.fromPending && !cmd.fromCommitted) {
numErrors.incrementAndGet();
numErrorsCumulative.incrementAndGet();
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"meaningless command: " + cmd);
}
if (!cmd.fromPending || !cmd.fromCommitted) {
numErrors.incrementAndGet();
numErrorsCumulative.incrementAndGet();
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"operation not supported" + cmd);
}
boolean madeIt=false;
boolean delAll=false;
try {
Query q = QueryParsing.parseQuery(cmd.query, schema);
delAll = MatchAllDocsQuery.class == q.getClass();
iwCommit.lock();
try {
commitTracker.deletedDocument(cmd.commitWithin);
if (delAll) {
deleteAll();
} else {
openWriter();
writer.deleteDocuments(q);
}
} finally {
iwCommit.unlock();
}
madeIt=true;
commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
} finally {
if (!madeIt) {
numErrors.incrementAndGet();
numErrorsCumulative.incrementAndGet();
}
}
}
/**
 * Adds the indexes in {@code cmd.readers} to the main index under the iwCommit lock.
 *
 * <p>When something was merged and the commit tracker has a positive time upper bound, a
 * commit is scheduled within that bound.
 *
 * @return 1 if at least one reader was merged, 0 if {@code cmd.readers} was null or empty
 */
@Override
public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
  mergeIndexesCommands.incrementAndGet();
  final int rc;
  iwCommit.lock();
  try {
    log.info("start " + cmd);
    final IndexReader[] sources = cmd.readers;
    final boolean hasSources = sources != null && sources.length > 0;
    if (hasSources) {
      openWriter();
      writer.addIndexes(sources);
    }
    rc = hasSources ? 1 : 0;
    log.info("end_mergeIndexes");
  } finally {
    iwCommit.unlock();
  }
  final boolean merged = rc == 1;
  if (merged && commitTracker.getTimeUpperBound() > 0) {
    commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
  }
  return rc;
}
public void forceOpenWriter() throws IOException  {
iwCommit.lock();
try {
openWriter();
} finally {
iwCommit.unlock();
}
}
@Override
public void commit(CommitUpdateCommand cmd) throws IOException {
if (cmd.optimize) {
optimizeCommands.incrementAndGet();
} else {
commitCommands.incrementAndGet();
if (cmd.expungeDeletes) expungeDeleteCommands.incrementAndGet();
}
Future[] waitSearcher = null;
if (cmd.waitSearcher) {
waitSearcher = new Future[1];
}
boolean error=true;
iwCommit.lock();
try {
log.info("start "+cmd);
if (cmd.optimize) {
openWriter();
writer.forceMerge(cmd.maxOptimizeSegments);
} else if (cmd.expungeDeletes) {
openWriter();
writer.forceMergeDeletes();
}
closeWriter();
callPostCommitCallbacks();
if (cmd.optimize) {
callPostOptimizeCallbacks();
}
// open a new searcher in the sync block to avoid opening it
// after a deleteByQuery changed the index, or in between deletes
// and adds of another commit being done.
core.getSearcher(true,false,waitSearcher);
// reset commit tracking
      commitTracker.didCommit();
log.info("end_commit_flush");
error=false;
}
finally {
iwCommit.unlock();
addCommands.set(0);
deleteByIdCommands.set(0);
deleteByQueryCommands.set(0);
numErrors.set(error ? 1 : 0);
}
// if we are supposed to wait for the searcher to be registered, then we should do it
// outside of the synchronized block so that other update operations can proceed.
if (waitSearcher!=null && waitSearcher[0] != null) {
try {
waitSearcher[0].get();
} catch (InterruptedException e) {
SolrException.log(log,e);
} catch (ExecutionException e) {
SolrException.log(log,e);
}
}
}
/**
* @since Solr 1.4
*/
@Override
public void rollback(RollbackUpdateCommand cmd) throws IOException {
rollbackCommands.incrementAndGet();
boolean error=true;
iwCommit.lock();
try {
log.info("start "+cmd);
rollbackWriter();
//callPostRollbackCallbacks();
// reset commit tracking
      commitTracker.didRollback();
log.info("end_rollback");
error=false;
}
finally {
iwCommit.unlock();
addCommandsCumulative.set(
addCommandsCumulative.get() - addCommands.getAndSet( 0 ) );
deleteByIdCommandsCumulative.set(
deleteByIdCommandsCumulative.get() - deleteByIdCommands.getAndSet( 0 ) );
deleteByQueryCommandsCumulative.set(
deleteByQueryCommandsCumulative.get() - deleteByQueryCommands.getAndSet( 0 ) );
numErrors.set(error ? 1 : 0);
}
}

/**
 * Shuts the handler down. Under the exclusive iwCommit lock it closes the commit tracker
 * and then the IndexWriter. Progress is logged before and after.
 */
@Override
public void close() throws IOException {
  log.info("closing " + this);
  final Lock commitLock = iwCommit;
  commitLock.lock();
  try {
    commitTracker.close();
    closeWriter();
  } finally {
    commitLock.unlock();
  }
  log.info("closed " + this);
}
  IndexWriter writer对象的创建方法如下(SolrIndexWriter类为lucene的IndexWriter类的继承类)



/**
 * Builds a new SolrIndexWriter over the directory returned by {@code core.getNewIndexDir()}.
 *
 * <p>The writer also receives the core's directory factory, the schema, the core's index
 * configuration and its deletion policy.
 *
 * @param name              writer name passed to SolrIndexWriter
 * @param removeAllExisting passed to SolrIndexWriter (deleteAll() passes true)
 * @return the newly created writer
 */
protected SolrIndexWriter createMainIndexWriter(String name, boolean removeAllExisting) throws IOException {
  return new SolrIndexWriter(
      name,
      core.getNewIndexDir(),
      core.getDirectoryFactory(),
      removeAllExisting,
      schema,
      core.getSolrConfig().indexConfig,
      core.getDeletionPolicy());
}
  ---------------------------------------------------------------------------
  本系列solr dataimport 数据导入源码分析系本人原创
  转载请注明出处 博客园 刺猬的温驯
  本文链接 http://www.iyunv.com/chenying99/archive/2013/05/04/3059443.html

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-87350-1-1.html 上篇帖子: SOLR安装与配置 下篇帖子: solr导入数据库数据
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表