[Experience Sharing] solr dataimport data import source code analysis (12)

  Last year I wrote a series of posts analyzing the solr dataimport source code. Because of a quirk of the blog platform, images I later uploaded with the same file names overwrote the originals, so the pictures in those articles no longer match the text. I am reorganizing the series here and adding a few updated notes along the way.
  The overall UML class diagram for solr dataimport is shown below (split into two images because my monitor is not wide enough to display it in one).
  [UML class diagrams: DSC0000.png, DSC0001.png]
  As the UML class diagrams show, the design uses the Decorator pattern and the Iterator pattern, among others.
  Let us start with the DataSource class. It is an abstract generic class that defines the methods for initializing a data source and fetching data of its generic type.



/**
 * Provides data from a source with a given query.
 *
 * Implementation of this abstract class must provide a default no-arg constructor.
 *
 * Refer to http://wiki.apache.org/solr/DataImportHandler
 * for more details.
 *
 * This API is experimental and may change in the future.
 *
 * @version $Id: DataSource.java 684025 2008-08-08 17:50:11Z shalin $
 * @since solr 1.3
 */
public abstract class DataSource<T> {
  /**
   * Initializes the DataSource with the Context and
   * initialization properties.
   *
   * This is invoked by the DataImporter after creating an
   * instance of this class.
   *
   * @param context
   * @param initProps
   */
  public abstract void init(Context context, Properties initProps);

  /**
   * Get records for the given query. The return type depends on the
   * implementation.
   *
   * @param query The query string. It can be a SQL for JdbcDataSource or a URL
   *              for HttpDataSource or a file location for FileDataSource or a custom
   *              format for your own custom DataSource.
   * @return Depends on the implementation. For instance JdbcDataSource returns
   *         an Iterator&lt;Map&lt;String, Object&gt;&gt;
   */
  public abstract T getData(String query);

  /**
   * Cleans up resources of this DataSource after use.
   */
  public abstract void close();
}
  As the class comment makes clear, there are several concrete implementations: JdbcDataSource for databases, HttpDataSource for URLs, FileDataSource for local files, and you can also write your own custom data source.
  Here I only analyze the database-oriented JdbcDataSource; the other data sources follow the same pattern and are easy to understand by analogy.
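  To illustrate the extension point mentioned above, here is a minimal sketch of a custom data source; the class name MyLineDataSource, its prefix property, and its line-splitting behavior are hypothetical and only demonstrate the DataSource&lt;T&gt; contract.

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

import org.apache.solr.handler.dataimport.Context;
import org.apache.solr.handler.dataimport.DataSource;

// Hypothetical custom data source: treats the "query" as a block of text and
// hands back an iterator over its lines, optionally prefixed.
public class MyLineDataSource extends DataSource<Iterator<String>> {
  private String prefix;

  @Override
  public void init(Context context, Properties initProps) {
    // read a custom property declared on the <dataSource> element
    prefix = initProps.getProperty("prefix", "");
  }

  @Override
  public Iterator<String> getData(String query) {
    // the query can be any custom format; here it is simply split into lines
    return Arrays.asList((prefix + query).split("\n")).iterator();
  }

  @Override
  public void close() {
    // no resources to release in this sketch
  }
}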
  The database-oriented JdbcDataSource parameterizes DataSource with Iterator&lt;Map&lt;String, Object&gt;&gt;, i.e. getData returns an iterator over the result rows. Its main fields are:



// factory that creates database connections
protected Callable<Connection> factory;
private long connLastUsed = 0;
// the currently cached database connection
private Connection conn;
// mapping from field name to SQL type
private Map<String, Integer> fieldNameVsType = new HashMap<String, Integer>();
// whether column values should be converted to the configured field types
private boolean convertType = false;
private int batchSize = FETCH_SIZE;
private int maxRows = 0;
  The init method parses the configuration properties and creates the connection factory, among other things:



@Override
public void init(Context context, Properties initProps) {
  Object o = initProps.get(CONVERT_TYPE);
  if (o != null)
    convertType = Boolean.parseBoolean(o.toString());
  // create the database connection factory
  factory = createConnectionFactory(context, initProps);
  // batch (fetch) size
  String bsz = initProps.getProperty("batchSize");
  if (bsz != null) {
    bsz = context.replaceTokens(bsz);
    try {
      batchSize = Integer.parseInt(bsz);
      if (batchSize == -1)
        batchSize = Integer.MIN_VALUE;
    } catch (NumberFormatException e) {
      LOG.warn("Invalid batch size: " + bsz);
    }
  }
  // build the mapping from field name to SQL type
  for (Map<String, String> map : context.getAllEntityFields()) {
    String n = map.get(DataImporter.COLUMN);
    String t = map.get(DataImporter.TYPE);
    if ("sint".equals(t) || "integer".equals(t))
      fieldNameVsType.put(n, Types.INTEGER);
    else if ("slong".equals(t) || "long".equals(t))
      fieldNameVsType.put(n, Types.BIGINT);
    else if ("float".equals(t) || "sfloat".equals(t))
      fieldNameVsType.put(n, Types.FLOAT);
    else if ("double".equals(t) || "sdouble".equals(t))
      fieldNameVsType.put(n, Types.DOUBLE);
    else if ("date".equals(t))
      fieldNameVsType.put(n, Types.DATE);
    else if ("boolean".equals(t))
      fieldNameVsType.put(n, Types.BOOLEAN);
    else if ("binary".equals(t))
      fieldNameVsType.put(n, Types.BLOB);
    else
      fieldNameVsType.put(n, Types.VARCHAR);
  }
}
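  Note how batchSize == -1 is translated into Integer.MIN_VALUE: for MySQL's Connector/J driver that special fetch size, combined with a forward-only, read-only statement (exactly what ResultSetIterator below creates), is the documented way to stream rows instead of buffering the whole result set in memory. A standalone sketch of that idiom follows; the JDBC URL, credentials and table are hypothetical, and the behavior of a negative fetch size is MySQL-specific.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Sketch of MySQL result streaming; URL, credentials and table are placeholders.
public class StreamingQueryDemo {
  public static void main(String[] args) throws Exception {
    try (Connection c = DriverManager.getConnection("jdbc:mysql://localhost:3306/demo", "user", "pass");
         Statement stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
      // MySQL-specific hint: stream rows one by one instead of loading them all
      stmt.setFetchSize(Integer.MIN_VALUE);
      try (ResultSet rs = stmt.executeQuery("SELECT id, name FROM items")) {
        while (rs.next()) {
          System.out.println(rs.getLong("id") + " -> " + rs.getString("name"));
        }
      }
    }
  }
}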
  The connection factory is created by the following method (I have added some comments):



protected Callable<Connection> createConnectionFactory(final Context context,
    final Properties initProps) {
  // final VariableResolver resolver = context.getVariableResolver();
  // replace ${...} placeholders with their resolved values
  resolveVariables(context, initProps);
  final String jndiName = initProps.getProperty(JNDI_NAME);
  final String url = initProps.getProperty(URL);
  final String driver = initProps.getProperty(DRIVER);
  if (url == null && jndiName == null)
    throw new DataImportHandlerException(SEVERE,
        "JDBC URL or JNDI name has to be specified");
  if (driver != null) {
    try {
      // register the JDBC driver
      DocBuilder.loadClass(driver, context.getSolrCore());
    } catch (ClassNotFoundException e) {
      wrapAndThrow(SEVERE, e, "Could not load driver: " + driver);
    }
  } else {
    if (jndiName == null) {
      throw new DataImportHandlerException(SEVERE, "One of driver or jndiName must be specified in the data source");
    }
  }
  String s = initProps.getProperty("maxRows");
  if (s != null) {
    maxRows = Integer.parseInt(s);
  }
  return factory = new Callable<Connection>() {
    public Connection call() throws Exception {
      LOG.info("Creating a connection for entity "
          + context.getEntityAttribute(DataImporter.NAME) + " with URL: "
          + url);
      long start = System.currentTimeMillis();
      Connection c = null;
      try {
        if (url != null) { // plain JDBC URL
          c = DriverManager.getConnection(url, initProps);
        } else if (jndiName != null) { // container-managed JNDI data source
          InitialContext ctx = new InitialContext();
          Object jndival = ctx.lookup(jndiName);
          if (jndival instanceof javax.sql.DataSource) {
            javax.sql.DataSource dataSource = (javax.sql.DataSource) jndival;
            String user = (String) initProps.get("user");
            String pass = (String) initProps.get("password");
            if (user == null || user.trim().equals("")) {
              c = dataSource.getConnection();
            } else {
              c = dataSource.getConnection(user, pass);
            }
          } else {
            throw new DataImportHandlerException(SEVERE,
                "the jndi name : '" + jndiName + "' is not a valid javax.sql.DataSource");
          }
        }
      } catch (SQLException e) {
        // DriverManager does not allow you to use a driver which is not loaded through
        // the class loader of the class which is trying to make the connection.
        // This is a workaround for cases where the user puts the driver jar in the
        // solr.home/lib or solr.home/core/lib directories.
        Driver d = (Driver) DocBuilder.loadClass(driver, context.getSolrCore()).newInstance();
        c = d.connect(url, initProps);
      }
      if (c != null) {
        if (Boolean.parseBoolean(initProps.getProperty("readOnly"))) {
          c.setReadOnly(true);
          // Add other sane defaults
          c.setAutoCommit(true);
          c.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
          c.setHoldability(ResultSet.CLOSE_CURSORS_AT_COMMIT);
        }
        if (!Boolean.parseBoolean(initProps.getProperty("autoCommit"))) {
          c.setAutoCommit(false);
        }
        String transactionIsolation = initProps.getProperty("transactionIsolation");
        if ("TRANSACTION_READ_UNCOMMITTED".equals(transactionIsolation)) {
          c.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
        } else if ("TRANSACTION_READ_COMMITTED".equals(transactionIsolation)) {
          c.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
        } else if ("TRANSACTION_REPEATABLE_READ".equals(transactionIsolation)) {
          c.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
        } else if ("TRANSACTION_SERIALIZABLE".equals(transactionIsolation)) {
          c.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
        } else if ("TRANSACTION_NONE".equals(transactionIsolation)) {
          c.setTransactionIsolation(Connection.TRANSACTION_NONE);
        }
        String holdability = initProps.getProperty("holdability");
        if ("CLOSE_CURSORS_AT_COMMIT".equals(holdability)) {
          c.setHoldability(ResultSet.CLOSE_CURSORS_AT_COMMIT);
        } else if ("HOLD_CURSORS_OVER_COMMIT".equals(holdability)) {
          c.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
        }
      }
      LOG.info("Time taken for getConnection(): "
          + (System.currentTimeMillis() - start));
      return c;
    }
  };
}
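  The factory supports two ways of obtaining a connection: a plain JDBC URL handed to DriverManager, or a container-managed javax.sql.DataSource looked up through JNDI. For readers less familiar with the JNDI path, here is a minimal standalone sketch of that lookup outside of Solr; the JNDI name "java:comp/env/jdbc/solrDS" is hypothetical and would be configured in the servlet container.

import java.sql.Connection;
import javax.naming.InitialContext;
import javax.sql.DataSource;

// Minimal JNDI lookup sketch; the JNDI name is a placeholder, not defined by Solr.
public class JndiConnectionDemo {
  public static Connection open() throws Exception {
    InitialContext ctx = new InitialContext();
    Object jndival = ctx.lookup("java:comp/env/jdbc/solrDS");
    if (!(jndival instanceof DataSource)) {
      throw new IllegalStateException("Not a javax.sql.DataSource: " + jndival);
    }
    // credentials, if any, come from the container-managed pool configuration
    return ((DataSource) jndival).getConnection();
  }
}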
  The following method returns the Iterator over the query results:



@Override
public Iterator<Map<String, Object>> getData(String query) {
  ResultSetIterator r = new ResultSetIterator(query);
  return r.getIterator();
}
  ResultSetIterator is an inner class; its constructor takes the query (for a database this is the SQL statement).
  Inside ResultSetIterator the SQL is executed over the current connection, the resulting ResultSet is wrapped, and an Iterator over the row data is returned.



private class ResultSetIterator {
  ResultSet resultSet;
  Statement stmt = null;
  List<String> colNames;
  Iterator<Map<String, Object>> rSetIterator;

  public ResultSetIterator(String query) {
    try {
      Connection c = getConnection();
      stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
      stmt.setFetchSize(batchSize);
      stmt.setMaxRows(maxRows);
      LOG.debug("Executing SQL: " + query);
      long start = System.currentTimeMillis();
      if (stmt.execute(query)) {
        resultSet = stmt.getResultSet();
      }
      LOG.trace("Time taken for sql :"
          + (System.currentTimeMillis() - start));
      colNames = readFieldNames(resultSet.getMetaData());
    } catch (Exception e) {
      wrapAndThrow(SEVERE, e, "Unable to execute query: " + query);
    }
    if (resultSet == null) {
      rSetIterator = new ArrayList<Map<String, Object>>().iterator();
      return;
    }
    rSetIterator = new Iterator<Map<String, Object>>() {
      public boolean hasNext() {
        return hasnext();
      }

      public Map<String, Object> next() {
        return getARow();
      }

      public void remove() {/* do nothing */
      }
    };
  }

  private Iterator<Map<String, Object>> getIterator() {
    return rSetIterator;
  }

  private Map<String, Object> getARow() {
    if (resultSet == null)
      return null;
    Map<String, Object> result = new HashMap<String, Object>();
    for (String colName : colNames) {
      try {
        if (!convertType) {
          // Use underlying database's type information
          result.put(colName, resultSet.getObject(colName));
          continue;
        }
        Integer type = fieldNameVsType.get(colName);
        if (type == null)
          type = Types.VARCHAR;
        switch (type) {
          case Types.INTEGER:
            result.put(colName, resultSet.getInt(colName));
            break;
          case Types.FLOAT:
            result.put(colName, resultSet.getFloat(colName));
            break;
          case Types.BIGINT:
            result.put(colName, resultSet.getLong(colName));
            break;
          case Types.DOUBLE:
            result.put(colName, resultSet.getDouble(colName));
            break;
          case Types.DATE:
            result.put(colName, resultSet.getDate(colName));
            break;
          case Types.BOOLEAN:
            result.put(colName, resultSet.getBoolean(colName));
            break;
          case Types.BLOB:
            result.put(colName, resultSet.getBytes(colName));
            break;
          default:
            result.put(colName, resultSet.getString(colName));
            break;
        }
      } catch (SQLException e) {
        logError("Error reading data ", e);
        wrapAndThrow(SEVERE, e, "Error reading data from database");
      }
    }
    return result;
  }

  private boolean hasnext() {
    if (resultSet == null)
      return false;
    try {
      if (resultSet.next()) {
        return true;
      } else {
        close();
        return false;
      }
    } catch (SQLException e) {
      close();
      wrapAndThrow(SEVERE, e);
      return false;
    }
  }

  private void close() {
    try {
      if (resultSet != null)
        resultSet.close();
      if (stmt != null)
        stmt.close();
    } catch (Exception e) {
      logError("Exception while closing result set", e);
    } finally {
      resultSet = null;
      stmt = null;
    }
  }
}
  The ResultSetIterator inner class is a direct application of the Iterator pattern.
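  To make the flow concrete, here is a hedged usage sketch of the iterator returned by getData. Building a fully initialized JdbcDataSource requires a Context and Properties that the DataImporter normally assembles, so that setup is assumed here, and the SQL statement is only an example.

import java.util.Iterator;
import java.util.Map;

import org.apache.solr.handler.dataimport.JdbcDataSource;

// Assumes "dataSource" was already created and initialized by the DataImporter.
public class GetDataDemo {
  public static void dump(JdbcDataSource dataSource) {
    Iterator<Map<String, Object>> rows = dataSource.getData("SELECT id, name FROM items");
    while (rows.hasNext()) {                 // hasNext() advances the underlying ResultSet
      Map<String, Object> row = rows.next(); // one map per row, keyed by column name
      System.out.println(row);
    }
  }
}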
  The remaining methods need little explanation; they obtain and close the database connection:



private Connection getConnection() throws Exception {
  long currTime = System.currentTimeMillis();
  if (currTime - connLastUsed > CONN_TIME_OUT) {
    synchronized (this) {
      Connection tmpConn = factory.call();
      closeConnection();
      connLastUsed = System.currentTimeMillis();
      return conn = tmpConn;
    }
  } else {
    connLastUsed = currTime;
    return conn;
  }
}

@Override
protected void finalize() throws Throwable {
  try {
    if (!isClosed) {
      LOG.error("JdbcDataSource was not closed prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!");
      close();
    }
  } finally {
    super.finalize();
  }
}

private boolean isClosed = false;

@Override
public void close() {
  try {
    closeConnection();
  } finally {
    isClosed = true;
  }
}

private void closeConnection() {
  try {
    if (conn != null) {
      conn.close();
    }
  } catch (Exception e) {
    LOG.error("Ignoring Error when closing connection", e);
  }
}
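  One detail worth spelling out: getConnection caches the last connection and only asks the factory for a new one when the cached connection has been idle longer than CONN_TIME_OUT (a constant defined elsewhere in the class); otherwise it just refreshes connLastUsed and reuses it. Below is a small standalone sketch of that reuse-or-recreate idiom; the 10-second timeout and the generic Callable are illustrative, not taken verbatim from the Solr source.

import java.util.concurrent.Callable;

// Generic sketch of the reuse-or-recreate pattern used by getConnection().
public class CachedResource<T> {
  private final Callable<T> factory;
  private final long timeoutMillis = 10000L; // illustrative timeout
  private long lastUsed = 0;
  private T cached;

  public CachedResource(Callable<T> factory) {
    this.factory = factory;
  }

  public synchronized T get() throws Exception {
    long now = System.currentTimeMillis();
    if (cached == null || now - lastUsed > timeoutMillis) {
      cached = factory.call(); // stale or missing: build a fresh instance
    }
    lastUsed = now;
    return cached;
  }
}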
  Finally, the DataImporter class is responsible for instantiating the JdbcDataSource:



DataSource getDataSourceInstance(DataConfig.Entity key, String name, Context ctx) {
  Properties p = dataSourceProps.get(name);
  if (p == null)
    p = config.dataSources.get(name);
  if (p == null)
    p = dataSourceProps.get(null); // for default data source
  if (p == null)
    p = config.dataSources.get(null);
  if (p == null)
    throw new DataImportHandlerException(SEVERE,
        "No dataSource :" + name + " available for entity :"
            + key.name);
  String type = p.getProperty(TYPE);
  DataSource dataSrc = null;
  if (type == null) {
    dataSrc = new JdbcDataSource();
  } else {
    try {
      dataSrc = (DataSource) DocBuilder.loadClass(type, getCore()).newInstance();
    } catch (Exception e) {
      wrapAndThrow(SEVERE, e, "Invalid type for data source: " + type);
    }
  }
  try {
    Properties copyProps = new Properties();
    copyProps.putAll(p);
    Map<String, Object> map = ctx.getRequestParameters();
    if (map.containsKey("rows")) {
      int rows = Integer.parseInt((String) map.get("rows"));
      if (map.containsKey("start")) {
        rows += Integer.parseInt((String) map.get("start"));
      }
      copyProps.setProperty("maxRows", String.valueOf(rows));
    }
    // initialize the data source
    dataSrc.init(ctx, copyProps);
  } catch (Exception e) {
    wrapAndThrow(SEVERE, e, "Failed to initialize DataSource: " + key.dataSource);
  }
  return dataSrc;
}
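  Note how the request parameters influence the data source: when the import request carries start and rows, maxRows is set to their sum, so the data source reads enough rows to cover both the skipped offset and the requested page. A worked example (the request and the parameter values are arbitrary):

// Hypothetical request: /dataimport?command=full-import&start=10&rows=50
public class MaxRowsDemo {
  public static void main(String[] args) {
    int start = 10;
    int rows = 50;
    int maxRows = rows + start; // 60: the source must read past the 10 skipped rows
    System.out.println("maxRows passed to the data source: " + maxRows);
  }
}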
  ---------------------------------------------------------------------------
  This solr dataimport source code analysis series is my own original work.
  When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯
  Original link: http://www.iyunv.com/chenying99/archive/2013/05/04/3059295.html
