[经验分享] solr dataimport 数据导入源码分析(十三)



  EntityProcessor类是一个抽象类,定义了从数据源逐行获取Map类型数据的方法(分别针对新增、修改和删除的数据)

* An instance of entity processor serves an entity. It is reused throughout the
* import process.
* Implementations of this abstract class must provide a public no-args constructor.
* Refer to http://wiki.apache.org/solr/DataImportHandler
* for more details.
* This API is experimental and may change in the future.
* @version $Id: EntityProcessor.java 824359 2009-10-12 14:31:54Z ehatcher $
* @since solr 1.3
public abstract class EntityProcessor {
* This method is called when it starts processing an entity. When it comes
* back to the entity it is called again. So it can reset anything at that point.
* For a rootmost entity this is called only once for an ingestion. For sub-entities , this
* is called multiple once for each row from its parent entity
* @param context The current context
public abstract void init(Context context);
* This method helps streaming the data for each row . The implementation
* would fetch as many rows as needed and gives one 'row' at a time. Only this
* method is used during a full import
* @return A 'row'.  The 'key' for the map is the column name and the 'value'
*         is the value of that column. If there are no more rows to be
*         returned, return 'null'
public abstract Map nextRow();
* This is used for delta-import. It gives the pks of the changed rows in this
* entity
* @return the pk vs value of all changed rows
public abstract Map nextModifiedRowKey();
* This is used during delta-import. It gives the primary keys of the rows
* that are deleted from this entity. If this entity is the root entity, solr
* document is deleted. If this is a sub-entity, the Solr document is
* considered as 'changed' and will be recreated
* @return the pk vs value of all changed rows
public abstract Map nextDeletedRowKey();
* This is used during delta-import. This gives the primary keys and their
* values of all the rows changed in a parent entity due to changes in this
* entity.
* @return the pk vs value of all changed rows in the parent entity
public abstract Map nextModifiedParentRowKey();
* Invoked for each parent-row after the last row for this entity is processed. If this is the root-most
* entity, it will be called only once in the import, at the very end.
public abstract void destroy();
* Invoked after the transformers are invoked. EntityProcessors can add, remove or modify values
* added by Transformers in this method.
* @param r The transformed row
* @since solr 1.4
public void postTransform(Map r) {
* Invoked when the Entity processor is destroyed towards the end of import.
* @since solr 1.4
public void close() {
  子类EntityProcessorBase是所有具体实体处理器的基类,定义了公用方法,其中最重要的是Map getNext():它从数据迭代器Iterator rowIterator中逐条获取Map类型的数据记录(如果设置了DIHCacheSupport cacheSupport对象,则改由缓存返回数据)

protected Map getNext() {
if(cacheSupport==null) {
try {
if (rowIterator == null)
return null;
if (rowIterator.hasNext())
return rowIterator.next();
query = null;
rowIterator = null;
return null;
} catch (Exception e) {
SolrException.log(log, "getNext() failed for query '" + query + "'", e);
query = null;
rowIterator = null;
wrapAndThrow(DataImportHandlerException.WARN, e);
return null;
} else  {
return cacheSupport.getCacheData(context, query, rowIterator);

* An {@link EntityProcessor} instance which provides support for reading from
* databases. It is used in conjunction with {@link JdbcDataSource}. This is the default
* {@link EntityProcessor} if none is specified explicitly in data-config.xml
* Refer to http://wiki.apache.org/solr/DataImportHandler
* for more details.
* This API is experimental and may change in the future.
* @version $Id: SqlEntityProcessor.java 1065312 2011-01-30 16:08:25Z rmuir $
* @since solr 1.3
public class SqlEntityProcessor extends EntityProcessorBase {
private static final Logger LOG = LoggerFactory.getLogger(SqlEntityProcessor.class);
protected DataSource dataSource;
public void init(Context context) {
dataSource = context.getDataSource();
protected void initQuery(String q) {
try {
rowIterator = dataSource.getData(q);
this.query = q;
} catch (DataImportHandlerException e) {
throw e;
} catch (Exception e) {
LOG.error( "The query failed '" + q + "'", e);
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
public Map nextRow() {   
if (rowIterator == null) {
String q = getQuery();
return getNext();
public Map nextModifiedRowKey() {
if (rowIterator == null) {
String deltaQuery = context.getEntityAttribute(DELTA_QUERY);
if (deltaQuery == null)
return null;
return getNext();
public Map nextDeletedRowKey() {
if (rowIterator == null) {
String deletedPkQuery = context.getEntityAttribute(DEL_PK_QUERY);
if (deletedPkQuery == null)
return null;
return getNext();
public Map nextModifiedParentRowKey() {
if (rowIterator == null) {
String parentDeltaQuery = context.getEntityAttribute(PARENT_DELTA_QUERY);
if (parentDeltaQuery == null)
return null;
LOG.info("Running parentDeltaQuery for Entity: "
+ context.getEntityAttribute("name"));
return getNext();
public String getQuery() {
String queryString = context.getEntityAttribute(QUERY);
if (Context.FULL_DUMP.equals(context.currentProcess())) {
return queryString;
if (Context.DELTA_DUMP.equals(context.currentProcess())) {
String deltaImportQuery = context.getEntityAttribute(DELTA_IMPORT_QUERY);
if(deltaImportQuery != null) return deltaImportQuery;
LOG.warn("'deltaImportQuery' attribute is not specified for entity : "+ entityName);
return getDeltaImportQuery(queryString);
public String getDeltaImportQuery(String queryString) {   
StringBuilder sb = new StringBuilder(queryString);
if (SELECT_WHERE_PATTERN.matcher(queryString).find()) {
sb.append(" and ");
} else {
sb.append(" where ");
boolean first = true;
String[] primaryKeys = context.getEntityAttribute("pk").split(",");
for (String primaryKey : primaryKeys) {
if (!first) {
sb.append(" and ");
first = false;
Object val = context.resolve("dataimporter.delta." + primaryKey);
if (val == null) {
Matcher m = DOT_PATTERN.matcher(primaryKey);
if (m.find()) {
val = context.resolve("dataimporter.delta." + m.group(1));
sb.append(primaryKey).append(" = ");
if (val instanceof Number) {
} else {
return sb.toString();
private static Pattern SELECT_WHERE_PATTERN = Pattern.compile(
"^\\s*(select\\b.*?\\b)(where).*", Pattern.CASE_INSENSITIVE);
public static final String QUERY = "query";
public static final String DELTA_QUERY = "deltaQuery";
public static final String DELTA_IMPORT_QUERY = "deltaImportQuery";
public static final String PARENT_DELTA_QUERY = "parentDeltaQuery";
public static final String DEL_PK_QUERY = "deletedPkQuery";
public static final Pattern DOT_PATTERN = Pattern.compile(".*?\\.(.*)$");

EntityProcessor delegate;
private DocBuilder docBuilder;
String onError;
Context context;
protected VariableResolverImpl resolver;
String entityName;
protected List transformers;
protected List rowcache;

/**
 * Creates a wrapper around the given entity processor.
 *
 * @param delegate   the entity processor being wrapped
 * @param docBuilder the document builder stored alongside the delegate
 */
public EntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder) {
  this.delegate = delegate;
  this.docBuilder = docBuilder;
}

public void init(Context context) {
rowcache = null;
this.context = context;
resolver = (VariableResolverImpl) context.getVariableResolver();
//context has to be set correctly . keep the copy of the old one so that it can be restored in destroy
if (entityName == null) {
onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
if (onError == null) {
onError = ABORT;
entityName = context.getEntityAttribute(DataConfig.NAME);

  本系列solr dataimport 数据导入源码分析系本人原创
  转载请注明出处 博客园 刺猬的温驯
  本文链接 http://www.iyunv.com/chenying99/archive/2013/05/04/3059397.html

本贴地址:https://www.yunweiku.com/thread-87397-1-1.html 上篇帖子: Solr初始化源码分析-Solr初始化与启动 下篇帖子: solr查询语法
