Solr DataImport source code analysis (Part 13)
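This article analyzes the EntityProcessor-related classes, which we can call entity processors. Each kind of data source has its own entity processor, and these processors hide the differences between data sources. This article covers only the entity processor for database data sources; the others work in a similar way.
EntityProcessor is an abstract class. It defines the methods that fetch Map-typed records from the data source (for added, modified and deleted data):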
/**
 * An instance of entity processor serves an entity. It is reused throughout the
 * import process.
 *
 * Implementations of this abstract class must provide a public no-args constructor.
 *
 * Refer to http://wiki.apache.org/solr/DataImportHandler
 * for more details.
 *
 * This API is experimental and may change in the future.
 *
 * @version $Id: EntityProcessor.java 824359 2009-10-12 14:31:54Z ehatcher $
 * @since solr 1.3
 */
public abstract class EntityProcessor {

  /**
   * This method is called when it starts processing an entity. When it comes
   * back to the entity it is called again. So it can reset anything at that point.
   * For a rootmost entity this is called only once for an ingestion. For sub-entities, this
   * is called multiple times, once for each row from its parent entity.
   *
   * @param context The current context
   */
  public abstract void init(Context context);

  /**
   * This method helps streaming the data for each row. The implementation
   * would fetch as many rows as needed and gives one 'row' at a time. Only this
   * method is used during a full import.
   *
   * @return A 'row'. The 'key' for the map is the column name and the 'value'
   *         is the value of that column. If there are no more rows to be
   *         returned, return 'null'
   */
  public abstract Map nextRow();

  /**
   * This is used for delta-import. It gives the pks of the changed rows in this
   * entity.
   *
   * @return the pk vs value of all changed rows
   */
  public abstract Map nextModifiedRowKey();

  /**
   * This is used during delta-import. It gives the primary keys of the rows
   * that are deleted from this entity. If this entity is the root entity, the Solr
   * document is deleted. If this is a sub-entity, the Solr document is
   * considered as 'changed' and will be recreated.
   *
   * @return the pk vs value of all changed rows
   */
  public abstract Map nextDeletedRowKey();

  /**
   * This is used during delta-import. This gives the primary keys and their
   * values of all the rows changed in a parent entity due to changes in this
   * entity.
   *
   * @return the pk vs value of all changed rows in the parent entity
   */
  public abstract Map nextModifiedParentRowKey();

  /**
   * Invoked for each parent-row after the last row for this entity is processed. If this is the root-most
   * entity, it will be called only once in the import, at the very end.
   */
  public abstract void destroy();

  /**
   * Invoked after the transformers are invoked. EntityProcessors can add, remove or modify values
   * added by Transformers in this method.
   *
   * @param r The transformed row
   * @since solr 1.4
   */
  public void postTransform(Map r) {
  }

  /**
   * Invoked when the Entity processor is destroyed towards the end of import.
   *
   * @since solr 1.4
   */
  public void close() {
    //no-op
  }
}
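To make the lifecycle described in these Javadoc comments concrete, here is a minimal sketch that does not use Solr at all. MiniEntityProcessor, CountingProcessor and LifecycleDemo are hypothetical names; the sketch only assumes the contract stated above: init() runs once for the root entity and once per parent row for a sub-entity, nextRow() hands out one row at a time and returns null when exhausted, and destroy() runs after the entity's last row. The real DocBuilder loop does considerably more (delta handling, transformers, error policies).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Solr-free model of the EntityProcessor lifecycle (illustration only).
final class LifecycleDemo {

  // Simplified stand-in for the abstract EntityProcessor contract.
  abstract static class MiniEntityProcessor {
    abstract void init(Map<String, Object> parentRow); // once per root import / per parent row
    abstract Map<String, Object> nextRow();            // one row per call, null when exhausted
    abstract void destroy();                           // after this entity's last row
  }

  // Toy processor that emits rowsPerInit rows after every init() call.
  static final class CountingProcessor extends MiniEntityProcessor {
    private final int rowsPerInit;
    private int emitted;
    int initCalls;
    int destroyCalls;

    CountingProcessor(int rowsPerInit) {
      this.rowsPerInit = rowsPerInit;
    }

    @Override
    void init(Map<String, Object> parentRow) {
      initCalls++;
      emitted = 0; // reset per-entity state, as the init() Javadoc permits
    }

    @Override
    Map<String, Object> nextRow() {
      if (emitted >= rowsPerInit) {
        return null; // "no more rows"
      }
      Map<String, Object> row = new HashMap<>();
      row.put("n", emitted++);
      return row;
    }

    @Override
    void destroy() {
      destroyCalls++;
    }
  }

  // Drives one root entity with one sub-entity, following the Javadoc contract.
  static List<String> run(MiniEntityProcessor root, MiniEntityProcessor child) {
    List<String> seen = new ArrayList<>();
    root.init(null);
    Map<String, Object> parent;
    while ((parent = root.nextRow()) != null) {
      child.init(parent); // sub-entity: init once per parent row
      Map<String, Object> row;
      while ((row = child.nextRow()) != null) {
        seen.add(parent.get("n") + "/" + row.get("n"));
      }
      child.destroy(); // sub-entity: destroy once per parent row
    }
    root.destroy(); // root entity: destroy once, at the very end
    return seen;
  }

  public static void main(String[] args) {
    CountingProcessor root = new CountingProcessor(2);
    CountingProcessor child = new CountingProcessor(3);
    System.out.println(run(root, child));
    System.out.println("child init calls: " + child.initCalls
        + ", root destroy calls: " + root.destroyCalls);
  }
}
```

With two root rows and three child rows per parent, main prints [0/0, 0/1, 0/2, 1/0, 1/1, 1/2] followed by "child init calls: 2, root destroy calls: 1", which matches "once per parent row" for the sub-entity and "only once" for the root.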
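EntityProcessorBase, which extends EntityProcessor, is the base class of all concrete entity processors and defines their shared methods. The most important one is Map getNext(), which takes the next Map-typed record from the row iterator Iterator rowIterator. The DIHCacheSupport cacheSupport object handles caching; when it is set, getNext() delegates to it instead: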
protected Map getNext() {
  if (cacheSupport == null) {
    try {
      if (rowIterator == null)
        return null;
      if (rowIterator.hasNext())
        return rowIterator.next();
      query = null;
      rowIterator = null;
      return null;
    } catch (Exception e) {
      SolrException.log(log, "getNext() failed for query '" + query + "'", e);
      query = null;
      rowIterator = null;
      wrapAndThrow(DataImportHandlerException.WARN, e);
      return null;
    }
  } else {
    return cacheSupport.getCacheData(context, query, rowIterator);
  }
}
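The cacheSupport == null branch has a consequence that is easy to miss: once the iterator is exhausted, getNext() clears both rowIterator and query, so a later call to SqlEntityProcessor.nextRow() (covered next) finds rowIterator == null and runs its query again. The sketch below models just that interplay without Solr. MiniProcessor, GetNextDemo and the placeholder query string are hypothetical; a List stands in for the database, and the exception handling and caching paths are left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, Solr-free model of EntityProcessorBase.getNext() (no cacheSupport,
// no exception handling) plus the lazy query start in SqlEntityProcessor.nextRow().
final class GetNextDemo {

  static final class MiniProcessor {
    private final List<Map<String, Object>> table; // stands in for the database
    Iterator<Map<String, Object>> rowIterator;
    String query;
    int queryCount; // how many times the "query" was executed

    MiniProcessor(List<Map<String, Object>> table) {
      this.table = table;
    }

    Map<String, Object> nextRow() {
      if (rowIterator == null) { // same check as SqlEntityProcessor.nextRow()
        queryCount++;
        rowIterator = table.iterator();
        query = "select * from t"; // placeholder query text
      }
      return getNext();
    }

    Map<String, Object> getNext() {
      if (rowIterator == null) {
        return null;
      }
      if (rowIterator.hasNext()) {
        return rowIterator.next();
      }
      // Exhausted: forget the iterator and query, so the next nextRow() re-queries.
      query = null;
      rowIterator = null;
      return null;
    }
  }

  static Map<String, Object> row(Object id) {
    Map<String, Object> m = new HashMap<>();
    m.put("id", id);
    return m;
  }

  public static void main(String[] args) {
    MiniProcessor p = new MiniProcessor(Arrays.asList(row(1), row(2)));
    System.out.println(p.nextRow()); // {id=1}
    System.out.println(p.nextRow()); // {id=2}
    System.out.println(p.nextRow()); // null: iterator exhausted and cleared
    System.out.println(p.nextRow()); // {id=1} again, from a second query
    System.out.println("queries run: " + p.queryCount); // queries run: 2
  }
}
```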
SqlEntityProcessor is the entity processor for database data sources:
/**
 * An {@link EntityProcessor} instance which provides support for reading from
 * databases. It is used in conjunction with {@link JdbcDataSource}. This is the default
 * {@link EntityProcessor} if none is specified explicitly in data-config.xml
 *
 * Refer to http://wiki.apache.org/solr/DataImportHandler
 * for more details.
 *
 * This API is experimental and may change in the future.
 *
 * @version $Id: SqlEntityProcessor.java 1065312 2011-01-30 16:08:25Z rmuir $
 * @since solr 1.3
 */
public class SqlEntityProcessor extends EntityProcessorBase {
  private static final Logger LOG = LoggerFactory.getLogger(SqlEntityProcessor.class);

  // the data source
  protected DataSource dataSource;

  // initializes the data source
  @Override
  @SuppressWarnings("unchecked")
  public void init(Context context) {
    super.init(context);
    dataSource = context.getDataSource();
  }

  // initializes the row iterator (obtained from the data source by running the query)
  protected void initQuery(String q) {
    try {
      DataImporter.QUERY_COUNT.get().incrementAndGet();
      rowIterator = dataSource.getData(q);
      this.query = q;
    } catch (DataImportHandlerException e) {
      throw e;
    } catch (Exception e) {
      LOG.error("The query failed '" + q + "'", e);
      throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
    }
  }

  @Override
  public Map nextRow() {
    if (rowIterator == null) {
      String q = getQuery();
      initQuery(context.replaceTokens(q));
    }
    return getNext();
  }

  @Override
  public Map nextModifiedRowKey() {
    if (rowIterator == null) {
      String deltaQuery = context.getEntityAttribute(DELTA_QUERY);
      if (deltaQuery == null)
        return null;
      initQuery(context.replaceTokens(deltaQuery));
    }
    return getNext();
  }

  @Override
  public Map nextDeletedRowKey() {
    if (rowIterator == null) {
      String deletedPkQuery = context.getEntityAttribute(DEL_PK_QUERY);
      if (deletedPkQuery == null)
        return null;
      initQuery(context.replaceTokens(deletedPkQuery));
    }
    return getNext();
  }

  @Override
  public Map nextModifiedParentRowKey() {
    if (rowIterator == null) {
      String parentDeltaQuery = context.getEntityAttribute(PARENT_DELTA_QUERY);
      if (parentDeltaQuery == null)
        return null;
      LOG.info("Running parentDeltaQuery for Entity: "
          + context.getEntityAttribute("name"));
      initQuery(context.replaceTokens(parentDeltaQuery));
    }
    return getNext();
  }

  public String getQuery() {
    String queryString = context.getEntityAttribute(QUERY);
    if (Context.FULL_DUMP.equals(context.currentProcess())) {
      return queryString;
    }
    if (Context.DELTA_DUMP.equals(context.currentProcess())) {
      String deltaImportQuery = context.getEntityAttribute(DELTA_IMPORT_QUERY);
      if (deltaImportQuery != null) return deltaImportQuery;
    }
    LOG.warn("'deltaImportQuery' attribute is not specified for entity : " + entityName);
    return getDeltaImportQuery(queryString);
  }

  public String getDeltaImportQuery(String queryString) {
    StringBuilder sb = new StringBuilder(queryString);
    if (SELECT_WHERE_PATTERN.matcher(queryString).find()) {
      sb.append(" and ");
    } else {
      sb.append(" where ");
    }
    boolean first = true;
    String[] primaryKeys = context.getEntityAttribute("pk").split(",");
    for (String primaryKey : primaryKeys) {
      if (!first) {
        sb.append(" and ");
      }
      first = false;
      Object val = context.resolve("dataimporter.delta." + primaryKey);
      if (val == null) {
        Matcher m = DOT_PATTERN.matcher(primaryKey);
        if (m.find()) {
          val = context.resolve("dataimporter.delta." + m.group(1));
        }
      }
      sb.append(primaryKey).append(" = ");
      if (val instanceof Number) {
        sb.append(val.toString());
      } else {
        sb.append("'").append(val.toString()).append("'");
      }
    }
    return sb.toString();
  }

  private static Pattern SELECT_WHERE_PATTERN = Pattern.compile(
      "^\\s*(select\\b.*?\\b)(where).*", Pattern.CASE_INSENSITIVE);

  public static final String QUERY = "query";
  public static final String DELTA_QUERY = "deltaQuery";
  public static final String DELTA_IMPORT_QUERY = "deltaImportQuery";
  public static final String PARENT_DELTA_QUERY = "parentDeltaQuery";
  public static final String DEL_PK_QUERY = "deletedPkQuery";
  public static final Pattern DOT_PATTERN = Pattern.compile(".*?\\.(.*)$");
}
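getDeltaImportQuery() is the fallback that getQuery() uses when no deltaImportQuery attribute applies: it appends a WHERE (or AND, if the query already has a WHERE) clause that pins every column listed in pk to the value resolved from dataimporter.delta.<pk>. The sketch below reproduces only that string building outside Solr. The Map deltaValues is a hypothetical stand-in for context.resolve(...), and DeltaQueryDemo is an illustrative name; the two regular expressions are copied from the class above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Solr-free copy of the string building in SqlEntityProcessor.getDeltaImportQuery().
// "deltaValues" is a hypothetical stand-in for context.resolve("dataimporter.delta." + pk).
final class DeltaQueryDemo {
  private static final Pattern SELECT_WHERE_PATTERN = Pattern.compile(
      "^\\s*(select\\b.*?\\b)(where).*", Pattern.CASE_INSENSITIVE);
  private static final Pattern DOT_PATTERN = Pattern.compile(".*?\\.(.*)$");

  static String deltaImportQuery(String queryString, String pkAttribute,
                                 Map<String, Object> deltaValues) {
    StringBuilder sb = new StringBuilder(queryString);
    // Extend an existing WHERE clause, or start a new one.
    sb.append(SELECT_WHERE_PATTERN.matcher(queryString).find() ? " and " : " where ");
    boolean first = true;
    for (String primaryKey : pkAttribute.split(",")) {
      if (!first) {
        sb.append(" and ");
      }
      first = false;
      Object val = deltaValues.get(primaryKey);
      if (val == null) {
        // A qualified key such as "i.id" falls back to the bare column name "id".
        Matcher m = DOT_PATTERN.matcher(primaryKey);
        if (m.find()) {
          val = deltaValues.get(m.group(1));
        }
      }
      sb.append(primaryKey).append(" = ");
      // Numbers are appended as-is; anything else is single-quoted without escaping.
      // As in the original, a missing value makes val.toString() throw NullPointerException.
      if (val instanceof Number) {
        sb.append(val.toString());
      } else {
        sb.append("'").append(val.toString()).append("'");
      }
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, Object> delta = new HashMap<>();
    delta.put("id", 7);
    delta.put("lang", "en");
    // select * from item where id = 7
    System.out.println(deltaImportQuery("select * from item", "id", delta));
    // select * from item where active = 1 and lang = 'en'
    System.out.println(deltaImportQuery("select * from item where active = 1", "lang", delta));
    // select * from item i where i.id = 7 and lang = 'en'
    System.out.println(deltaImportQuery("select * from item i", "i.id,lang", delta));
  }
}
```

Two properties of the original carry over to this copy: string values are wrapped in quotes without any escaping, and the pk list is split on "," without trimming, so "i.id, lang" would produce a key with a leading space. Configuring an explicit deltaImportQuery avoids relying on this generated SQL.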
Next we analyze the EntityProcessorWrapper class. It extends the abstract class EntityProcessor and decorates a concrete entity processor (the Decorator pattern).
Its important members are:
// the decorated entity processor
EntityProcessor delegate;
private DocBuilder docBuilder;
String onError;
Context context;
protected VariableResolverImpl resolver;
String entityName;
protected List transformers;
protected List rowcache;
The constructor initializes the decorated member object, together with the DocBuilder:
public EntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder) {
  this.delegate = delegate;
  this.docBuilder = docBuilder;
}
The init method calls the decorated object's init method (for SqlEntityProcessor, that is where the data source is obtained):
@Override
public void init(Context context) {
  rowcache = null;
  this.context = context;
  resolver = (VariableResolverImpl) context.getVariableResolver();
  //context has to be set correctly. keep the copy of the old one so that it can be restored in destroy
  if (entityName == null) {
    onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
    if (onError == null) {
      onError = ABORT;
    }
    entityName = context.getEntityAttribute(DataConfig.NAME);
  }
  delegate.init(context);
}
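All other methods likewise forward to the corresponding methods of the decorated concrete entity processor, adding features such as data transformation on top; they are not analyzed further here.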
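The division of labour described above (a wrapper that keeps the EntityProcessor interface, forwards calls to the wrapped processor, and transforms rows on the way out) is the classic Decorator pattern. Below is a minimal sketch with hypothetical names (Processor, ListProcessor, Wrapper, UPPER_TITLE). It assumes transformers can be modeled as plain UnaryOperator functions, which is not Solr's Transformer API; the map keys "onError" and "name" and the literal "abort" stand in for ON_ERROR, DataConfig.NAME and ABORT from the init() method above. The real wrapper's rowcache and multi-row transformer output are left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, Solr-free model of the Decorator relationship between
// EntityProcessorWrapper and the entity processor it wraps.
final class WrapperDemo {

  // Stand-in for the EntityProcessor interface (only two methods kept).
  interface Processor {
    void init(Map<String, String> entityAttributes);
    Map<String, Object> nextRow();
  }

  // Concrete processor: serves rows from a fixed list.
  static final class ListProcessor implements Processor {
    private final List<Map<String, Object>> rows;
    private Iterator<Map<String, Object>> it;

    ListProcessor(List<Map<String, Object>> rows) {
      this.rows = rows;
    }

    public void init(Map<String, String> entityAttributes) {
      it = rows.iterator(); // restart on every init()
    }

    public Map<String, Object> nextRow() {
      return it.hasNext() ? it.next() : null;
    }
  }

  // Decorator: same interface, forwards to the delegate, transforms each row.
  static final class Wrapper implements Processor {
    private final Processor delegate;
    private final List<UnaryOperator<Map<String, Object>>> transformers;
    String onError;
    String entityName;

    Wrapper(Processor delegate, List<UnaryOperator<Map<String, Object>>> transformers) {
      this.delegate = delegate;
      this.transformers = transformers;
    }

    public void init(Map<String, String> entityAttributes) {
      if (entityName == null) { // entity settings are read on the first init() only
        onError = entityAttributes.get("onError");
        if (onError == null) {
          onError = "abort"; // assumed default, mirroring ABORT in the original
        }
        entityName = entityAttributes.get("name");
      }
      delegate.init(entityAttributes); // always forwarded
    }

    public Map<String, Object> nextRow() {
      Map<String, Object> row = delegate.nextRow();
      if (row == null) {
        return null;
      }
      for (UnaryOperator<Map<String, Object>> t : transformers) {
        row = t.apply(row);
      }
      return row;
    }
  }

  // Example transformer: returns a copy with "title" upper-cased.
  static final UnaryOperator<Map<String, Object>> UPPER_TITLE = r -> {
    Map<String, Object> copy = new HashMap<>(r);
    copy.put("title", String.valueOf(r.get("title")).toUpperCase());
    return copy;
  };

  public static void main(String[] args) {
    Map<String, Object> doc = new HashMap<>();
    doc.put("title", "solr");
    Wrapper w = new Wrapper(new ListProcessor(Arrays.asList(doc)), Arrays.asList(UPPER_TITLE));
    Map<String, String> attrs = new HashMap<>();
    attrs.put("name", "item");
    w.init(attrs);
    System.out.println(w.nextRow() + " " + w.onError + " " + w.entityName);
  }
}
```

Because Wrapper implements the same interface as the processor it holds, callers cannot tell a wrapped processor from a bare one, which is what lets DocBuilder drive every entity through the wrapper.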
---------------------------------------------------------------------------
This series, solr dataimport source code analysis, is my original work.
When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯
Article link: http://www.iyunv.com/chenying99/archive/2013/05/04/3059397.html