设为首页 收藏本站
查看: 1380|回复: 0

[经验分享] solr dataimport 数据导入源码分析(十三)

[复制链接]

尚未签到

发表于 2015-7-17 07:18:04 | 显示全部楼层 |阅读模式
  本文接下来分析EntityProcessor相关类,我们可以称之为实体处理器,针对不同的数据源有不同的实体处理器,屏蔽了不同数据源的差异
  本文只介绍针对数据库数据源的实体处理器,其他实体处理器类似
  EntityProcessor类为抽象类,定义了获取数据源的Map类型数据的方法(针对添加 修改 删除的数据)



/**
*
* An instance of entity processor serves an entity. It is reused throughout the
* import process.
*
*
*
* Implementations of this abstract class must provide a public no-args constructor.
*
*
*
* Refer to http://wiki.apache.org/solr/DataImportHandler
* for more details.
*
*
* This API is experimental and may change in the future.
*
* @version $Id: EntityProcessor.java 824359 2009-10-12 14:31:54Z ehatcher $
* @since solr 1.3
*/
public abstract class EntityProcessor {
/**
* This method is called when it starts processing an entity. When it comes
* back to the entity it is called again. So it can reset anything at that point.
* For a rootmost entity this is called only once for an ingestion. For sub-entities , this
* is called multiple once for each row from its parent entity
*
* @param context The current context
*/
public abstract void init(Context context);
/**
* This method helps streaming the data for each row . The implementation
* would fetch as many rows as needed and gives one 'row' at a time. Only this
* method is used during a full import
*
* @return A 'row'.  The 'key' for the map is the column name and the 'value'
*         is the value of that column. If there are no more rows to be
*         returned, return 'null'
*/
public abstract Map nextRow();
/**
* This is used for delta-import. It gives the pks of the changed rows in this
* entity
*
* @return the pk vs value of all changed rows
*/
public abstract Map nextModifiedRowKey();
/**
* This is used during delta-import. It gives the primary keys of the rows
* that are deleted from this entity. If this entity is the root entity, solr
* document is deleted. If this is a sub-entity, the Solr document is
* considered as 'changed' and will be recreated
*
* @return the pk vs value of all changed rows
*/
public abstract Map nextDeletedRowKey();
/**
* This is used during delta-import. This gives the primary keys and their
* values of all the rows changed in a parent entity due to changes in this
* entity.
*
* @return the pk vs value of all changed rows in the parent entity
*/
public abstract Map nextModifiedParentRowKey();
/**
* Invoked for each parent-row after the last row for this entity is processed. If this is the root-most
* entity, it will be called only once in the import, at the very end.
*
*/
public abstract void destroy();
/**
* Invoked after the transformers are invoked. EntityProcessors can add, remove or modify values
* added by Transformers in this method.
*
* @param r The transformed row
* @since solr 1.4
*/
public void postTransform(Map r) {
}
/**
* Invoked when the Entity processor is destroyed towards the end of import.
*
* @since solr 1.4
*/
public void close() {
//no-op
  }
}
  继承类EntityProcessorBase是所有具体实体处理器的基类,定义了公用方法,其中最重要的是Map getNext(),从数据迭代器Iterator rowIterator获取Map类型数据记录(其中DIHCacheSupport cacheSupport对象用于缓存)



protected Map getNext() {
if(cacheSupport==null) {
try {
if (rowIterator == null)
return null;
if (rowIterator.hasNext())
return rowIterator.next();
query = null;
rowIterator = null;
return null;
} catch (Exception e) {
SolrException.log(log, "getNext() failed for query '" + query + "'", e);
query = null;
rowIterator = null;
wrapAndThrow(DataImportHandlerException.WARN, e);
return null;
}
} else  {
return cacheSupport.getCacheData(context, query, rowIterator);
}      
}
  SqlEntityProcessor类为数据库数据源的实体处理器



/**
*
* An {@link EntityProcessor} instance which provides support for reading from
* databases. It is used in conjunction with {@link JdbcDataSource}. This is the default
* {@link EntityProcessor} if none is specified explicitly in data-config.xml
*
*
*
* Refer to http://wiki.apache.org/solr/DataImportHandler
* for more details.
*
*
* This API is experimental and may change in the future.
*
* @version $Id: SqlEntityProcessor.java 1065312 2011-01-30 16:08:25Z rmuir $
* @since solr 1.3
*/
public class SqlEntityProcessor extends EntityProcessorBase {
private static final Logger LOG = LoggerFactory.getLogger(SqlEntityProcessor.class);
//数据源
protected DataSource dataSource;
//初始化数据源
  @Override
@SuppressWarnings("unchecked")
public void init(Context context) {
super.init(context);
dataSource = context.getDataSource();
}
//初始化数据迭代器(根据查询语句从数据源获取)
protected void initQuery(String q) {
try {
DataImporter.QUERY_COUNT.get().incrementAndGet();
rowIterator = dataSource.getData(q);
this.query = q;
} catch (DataImportHandlerException e) {
throw e;
} catch (Exception e) {
LOG.error( "The query failed '" + q + "'", e);
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
}
}
@Override
public Map nextRow() {   
if (rowIterator == null) {
String q = getQuery();
initQuery(context.replaceTokens(q));
}
return getNext();
}
@Override
public Map nextModifiedRowKey() {
if (rowIterator == null) {
String deltaQuery = context.getEntityAttribute(DELTA_QUERY);
if (deltaQuery == null)
return null;
initQuery(context.replaceTokens(deltaQuery));
}
return getNext();
}
@Override
public Map nextDeletedRowKey() {
if (rowIterator == null) {
String deletedPkQuery = context.getEntityAttribute(DEL_PK_QUERY);
if (deletedPkQuery == null)
return null;
initQuery(context.replaceTokens(deletedPkQuery));
}
return getNext();
}
@Override
public Map nextModifiedParentRowKey() {
if (rowIterator == null) {
String parentDeltaQuery = context.getEntityAttribute(PARENT_DELTA_QUERY);
if (parentDeltaQuery == null)
return null;
LOG.info("Running parentDeltaQuery for Entity: "
+ context.getEntityAttribute("name"));
initQuery(context.replaceTokens(parentDeltaQuery));
}
return getNext();
}
public String getQuery() {
String queryString = context.getEntityAttribute(QUERY);
if (Context.FULL_DUMP.equals(context.currentProcess())) {
return queryString;
}
if (Context.DELTA_DUMP.equals(context.currentProcess())) {
String deltaImportQuery = context.getEntityAttribute(DELTA_IMPORT_QUERY);
if(deltaImportQuery != null) return deltaImportQuery;
}
LOG.warn("'deltaImportQuery' attribute is not specified for entity : "+ entityName);
return getDeltaImportQuery(queryString);
}
public String getDeltaImportQuery(String queryString) {   
StringBuilder sb = new StringBuilder(queryString);
if (SELECT_WHERE_PATTERN.matcher(queryString).find()) {
sb.append(" and ");
} else {
sb.append(" where ");
}
boolean first = true;
String[] primaryKeys = context.getEntityAttribute("pk").split(",");
for (String primaryKey : primaryKeys) {
if (!first) {
sb.append(" and ");
}
first = false;
Object val = context.resolve("dataimporter.delta." + primaryKey);
if (val == null) {
Matcher m = DOT_PATTERN.matcher(primaryKey);
if (m.find()) {
val = context.resolve("dataimporter.delta." + m.group(1));
}
}
sb.append(primaryKey).append(" = ");
if (val instanceof Number) {
sb.append(val.toString());
} else {
sb.append("'").append(val.toString()).append("'");
}
}
return sb.toString();
}
private static Pattern SELECT_WHERE_PATTERN = Pattern.compile(
"^\\s*(select\\b.*?\\b)(where).*", Pattern.CASE_INSENSITIVE);
public static final String QUERY = "query";
public static final String DELTA_QUERY = "deltaQuery";
public static final String DELTA_IMPORT_QUERY = "deltaImportQuery";
public static final String PARENT_DELTA_QUERY = "parentDeltaQuery";
public static final String DEL_PK_QUERY = "deletedPkQuery";
public static final Pattern DOT_PATTERN = Pattern.compile(".*?\\.(.*)$");
}
  我们接下来分析EntityProcessorWrapper类,该类继承自抽象类EntityProcessor,用于装饰具体的实体处理器(装饰模式)
  其重要成员如下



//被装饰的实体处理器
EntityProcessor delegate;
private DocBuilder docBuilder;
String onError;
Context context;
protected VariableResolverImpl resolver;
String entityName;
protected List transformers;
protected List rowcache;
  在它的构造方法里面,初始化被装饰的成员对象



public EntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder) {
this.delegate = delegate;
this.docBuilder = docBuilder;
}
  初始化方法里面调用被装饰对象的初始化方法(获取数据源)



@Override
public void init(Context context) {
rowcache = null;
this.context = context;
resolver = (VariableResolverImpl) context.getVariableResolver();
//context has to be set correctly . keep the copy of the old one so that it can be restored in destroy
if (entityName == null) {
onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
if (onError == null) {
onError = ABORT;
}
entityName = context.getEntityAttribute(DataConfig.NAME);
}
delegate.init(context);
}
  其他相关方法均为调用被装饰的具体实体处理器的相应方法,另外添加了数据转换等功能,本文不再具体分析


  ---------------------------------------------------------------------------
  本系列solr dataimport 数据导入源码分析系本人原创
  转载请注明出处 博客园 刺猬的温驯
  本文链接 http://www.iyunv.com/chenying99/archive/2013/05/04/3059397.html

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-87397-1-1.html 上篇帖子: Solr初始化源码分析-Solr初始化与启动 下篇帖子: solr查询语法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表