晓山青青 发表于 2015-7-17 11:34:05

solr dataimport 数据导入源码分析(三)

  在介绍 DocBuilder 类之前,我们先来解读数据导入所使用的实体处理器 EntityProcessor;默认的实体处理器为 SqlEntityProcessor。
  EntityProcessor 为抽象类,具体方法由子类实现:

  package org.apache.solr.handler.dataimport;


import java.util.Map;
public abstract class EntityProcessor {
public abstract void init(Context context);
public abstract Map nextRow();
public abstract Map nextModifiedRowKey();
public abstract Map nextDeletedRowKey();
public abstract Map nextModifiedParentRowKey();
public abstract void destroy();
public void postTransform(Map r) {
}
public void close() {
    //no-op
}
EntityProcessorBase 继承自 EntityProcessor,封装了各实体处理器的公用逻辑;其中比较重要的是 getNext() 方法,它用于遍历数据迭代器(rowIterator),供子类调用。相关代码片段如下:

protected Iterator rowIterator;
protected DIHCacheSupport cacheSupport = null;   protected Map getNext() {
    if(cacheSupport==null) {
      try {
      if (rowIterator == null)
          return null;
      if (rowIterator.hasNext())
          return rowIterator.next();
      query = null;
      rowIterator = null;
      return null;
      } catch (Exception e) {
      SolrException.log(log, "getNext() failed for query '" + query + "'", e);
      query = null;
      rowIterator = null;
      wrapAndThrow(DataImportHandlerException.WARN, e);
      return null;
      }
    } else{
      return cacheSupport.getCacheData(context, query, rowIterator);
    }      
}
  真正的数据处理器为 SqlEntityProcessor,简要代码如下:

  package org.apache.solr.handler.dataimport;


import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SqlEntityProcessor extends EntityProcessorBase {

protected DataSource dataSource;

@Override
@SuppressWarnings("unchecked")
public void init(Context context) {
    super.init(context);
    dataSource = context.getDataSource();
}

protected void initQuery(String q) {
    try {
      DataImporter.QUERY_COUNT.get().incrementAndGet();
      rowIterator = dataSource.getData(q);
      this.query = q;
    } catch (DataImportHandlerException e) {
      throw e;
    } catch (Exception e) {
      LOG.error( "The query failed '" + q + "'", e);
      throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
    }
}

@Override
public Map nextRow() {   
    if (rowIterator == null) {
      String q = getQuery();
      initQuery(context.replaceTokens(q));
    }
    return getNext();
}

@Override
public Map nextModifiedRowKey() {
    if (rowIterator == null) {
      String deltaQuery = context.getEntityAttribute(DELTA_QUERY);
      if (deltaQuery == null)
      return null;
      initQuery(context.replaceTokens(deltaQuery));
    }
    return getNext();
}

@Override
public Map nextDeletedRowKey() {
    if (rowIterator == null) {
      String deletedPkQuery = context.getEntityAttribute(DEL_PK_QUERY);
      if (deletedPkQuery == null)
      return null;
      initQuery(context.replaceTokens(deletedPkQuery));
    }
    return getNext();
}

@Override
public Map nextModifiedParentRowKey() {
    if (rowIterator == null) {
      String parentDeltaQuery = context.getEntityAttribute(PARENT_DELTA_QUERY);
      if (parentDeltaQuery == null)
      return null;
      LOG.info("Running parentDeltaQuery for Entity: "
            + context.getEntityAttribute("name"));
      initQuery(context.replaceTokens(parentDeltaQuery));
    }
    return getNext();
}
}
页: [1]
查看完整版本: solr dataimport 数据导入源码分析(三)