|
去年本人写过一系列 solr dataimport 数据导入源码分析,由于博客园系统存在一些问题,后来上传的相同名称的图片覆盖了原来的图片,导致原来文章里面的图片与其内容不符合,我这里重新整理一下,也同时更新一些心得
solr dataimport 数据导入的全局UML类图如下(分两张,我的显示器不够大,显示器显示不完)
从上面的UML类图可以看出,这里面采用了装饰模式以及迭代子(Iterator)模式等
我们首先来分析DataSource类,该类为抽象泛型类,定义了初始化数据源和获取泛型数据的方法
/**
*
* Provides data from a source with a given query.
*
*
*
* Implementation of this abstract class must provide a default no-arg constructor
*
*
*
* Refer to http://wiki.apache.org/solr/DataImportHandler
* for more details.
*
*
* This API is experimental and may change in the future.
*
* @version $Id: DataSource.java 684025 2008-08-08 17:50:11Z shalin $
* @since solr 1.3
*/
public abstract class DataSource {
/**
* Initializes the DataSource with the Context and
* initialization properties.
*
* This is invoked by the DataImporter after creating an
* instance of this class.
*
* @param context
* @param initProps
*/
public abstract void init(Context context, Properties initProps);
/**
* Get records for the given query.The return type depends on the
* implementation .
*
* @param query The query string. It can be a SQL for JdbcDataSource or a URL
* for HttpDataSource or a file location for FileDataSource or a custom
* format for your own custom DataSource.
* @return Depends on the implementation. For instance JdbcDataSource returns
* an Iterator
*/
public abstract T getData(String query);
/**
* Cleans up resources of this DataSource after use.
*/
public abstract void close();
}
该抽象类的注释很清楚,有不同的数据源继承类,包括针对数据库的JdbcDataSource数据源,针对URL的HttpDataSource数据源,针对本地文件的FileDataSource数据源,甚至还可以自定义数据源
我这里只分析数据库的JdbcDataSource数据源,其他数据源类似,比较容易理解了
针对数据库的JdbcDataSource数据源,其getData方法返回的是Iterator<Map<String, Object>>类型的数据迭代器(每个Map对应查询结果中的一行)
// Factory that creates a new JDBC connection each time it is called.
protected Callable<Connection> factory;
// Time (ms since epoch) at which the connection was last handed out; starts at 0.
private long connLastUsed = 0;
// The JDBC connection currently held by this data source.
private Connection conn;
// Maps a field (column) name to the java.sql.Types constant configured for it.
private Map<String, Integer> fieldNameVsType = new HashMap<String, Integer>();
// Whether column values are read using the configured field type instead of getObject().
private boolean convertType = false;
// Fetch size applied to every statement via Statement.setFetchSize().
private int batchSize = FETCH_SIZE;
// Row limit applied to every statement via Statement.setMaxRows(); JDBC treats 0 as "no limit".
private int maxRows = 0;
在初始化方法里面初始化数据库连接工厂等
/**
 * Initializes this data source: reads the type-conversion flag, builds the
 * connection factory, reads the batch size and records the configured type of
 * every entity field.
 *
 * @param context   supplies token replacement and the list of entity fields
 * @param initProps data source properties (CONVERT_TYPE, "batchSize", plus
 *                  whatever {@code createConnectionFactory} reads)
 */
@Override
public void init(Context context, Properties initProps) {
  Object o = initProps.get(CONVERT_TYPE);
  if (o != null) {
    convertType = Boolean.parseBoolean(o.toString());
  }

  // Connection factory
  factory = createConnectionFactory(context, initProps);

  // Batch (fetch) size
  String bsz = initProps.getProperty("batchSize");
  if (bsz != null) {
    bsz = context.replaceTokens(bsz);
    try {
      batchSize = Integer.parseInt(bsz);
      // NOTE(review): -1 is mapped to Integer.MIN_VALUE, presumably as a
      // row-streaming hint for drivers such as MySQL Connector/J -- confirm.
      if (batchSize == -1) {
        batchSize = Integer.MIN_VALUE;
      }
    } catch (NumberFormatException e) {
      // Keep the previous batch size when the configured value is not a number.
      LOG.warn("Invalid batch size: " + bsz);
    }
  }

  // Build the field-name -> java.sql.Types mapping; unknown types fall back to VARCHAR.
  for (Map<String, String> map : context.getAllEntityFields()) {
    String n = map.get(DataImporter.COLUMN);
    String t = map.get(DataImporter.TYPE);
    if ("sint".equals(t) || "integer".equals(t)) {
      fieldNameVsType.put(n, Types.INTEGER);
    } else if ("slong".equals(t) || "long".equals(t)) {
      fieldNameVsType.put(n, Types.BIGINT);
    } else if ("float".equals(t) || "sfloat".equals(t)) {
      fieldNameVsType.put(n, Types.FLOAT);
    } else if ("double".equals(t) || "sdouble".equals(t)) {
      fieldNameVsType.put(n, Types.DOUBLE);
    } else if ("date".equals(t)) {
      fieldNameVsType.put(n, Types.DATE);
    } else if ("boolean".equals(t)) {
      fieldNameVsType.put(n, Types.BOOLEAN);
    } else if ("binary".equals(t)) {
      fieldNameVsType.put(n, Types.BLOB);
    } else {
      fieldNameVsType.put(n, Types.VARCHAR);
    }
  }
}
数据库连接工厂创建方法如下(本人添加了部分注释)
/**
 * Builds the factory used to open JDBC connections for this data source.
 *
 * <p>Either a JDBC URL or a JNDI name must be configured. When a driver class
 * is configured it is loaded eagerly; when it is not, a JNDI name is required.
 * The optional "maxRows" property is parsed here and stored in {@code maxRows}.
 *
 * @param context   used for variable resolution, class loading and logging the entity name
 * @param initProps data source properties (URL, DRIVER, JNDI_NAME, "user", "password",
 *                  "maxRows", "readOnly", "autoCommit", "transactionIsolation", "holdability")
 * @return a factory whose {@code call()} opens and configures a new connection;
 *         the same instance is also stored in the {@code factory} field
 * @throws DataImportHandlerException if neither URL nor JNDI name is given, or if
 *         neither driver nor JNDI name is given
 * @throws NumberFormatException if "maxRows" is present but not an integer
 */
protected Callable<Connection> createConnectionFactory(final Context context,
    final Properties initProps) {
  // Resolve template variables in the properties before reading them.
  resolveVariables(context, initProps);
  final String jndiName = initProps.getProperty(JNDI_NAME);
  final String url = initProps.getProperty(URL);
  final String driver = initProps.getProperty(DRIVER);

  if (url == null && jndiName == null) {
    throw new DataImportHandlerException(SEVERE,
        "JDBC URL or JNDI name has to be specified");
  }

  if (driver != null) {
    try {
      // Load (and thereby register) the JDBC driver class.
      DocBuilder.loadClass(driver, context.getSolrCore());
    } catch (ClassNotFoundException e) {
      wrapAndThrow(SEVERE, e, "Could not load driver: " + driver);
    }
  } else {
    if (jndiName == null) {
      throw new DataImportHandlerException(SEVERE, "One of driver or jndiName must be specified in the data source");
    }
  }

  String s = initProps.getProperty("maxRows");
  if (s != null) {
    maxRows = Integer.parseInt(s);
  }

  return factory = new Callable<Connection>() {
    public Connection call() throws Exception {
      LOG.info("Creating a connection for entity "
          + context.getEntityAttribute(DataImporter.NAME) + " with URL: "
          + url);
      long start = System.currentTimeMillis();
      Connection c = null;
      try {
        if (url != null) { // JDBC URL
          c = DriverManager.getConnection(url, initProps);
        } else if (jndiName != null) { // JNDI lookup
          InitialContext ctx = new InitialContext();
          Object jndival = ctx.lookup(jndiName);
          if (jndival instanceof javax.sql.DataSource) {
            javax.sql.DataSource dataSource = (javax.sql.DataSource) jndival;
            String user = (String) initProps.get("user");
            String pass = (String) initProps.get("password");
            if (user == null || user.trim().equals("")) {
              c = dataSource.getConnection();
            } else {
              c = dataSource.getConnection(user, pass);
            }
          } else {
            throw new DataImportHandlerException(SEVERE,
                "the jndi name : '" + jndiName + "' is not a valid javax.sql.DataSource");
          }
        }
      } catch (SQLException e) {
        // The fallback below needs both a driver class and a URL. Without them
        // (e.g. a JNDI-only configuration) it could only fail with an unrelated
        // error, so surface the real SQLException instead.
        if (driver == null || url == null) {
          throw e;
        }
        // DriverManager does not allow you to use a driver which is not loaded through
        // the class loader of the class which is trying to make the connection.
        // This is a workaround for cases where the user puts the driver jar in the
        // solr.home/lib or solr.home/core/lib directories.
        Driver d = (Driver) DocBuilder.loadClass(driver, context.getSolrCore()).newInstance();
        c = d.connect(url, initProps);
      }
      if (c != null) {
        if (Boolean.parseBoolean(initProps.getProperty("readOnly"))) {
          c.setReadOnly(true);
          // Add other sane defaults
          c.setAutoCommit(true);
          c.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
          c.setHoldability(ResultSet.CLOSE_CURSORS_AT_COMMIT);
        }
        // Auto-commit is switched off unless "autoCommit" is explicitly "true";
        // this also applies after the readOnly defaults above.
        if (!Boolean.parseBoolean(initProps.getProperty("autoCommit"))) {
          c.setAutoCommit(false);
        }
        String transactionIsolation = initProps.getProperty("transactionIsolation");
        if ("TRANSACTION_READ_UNCOMMITTED".equals(transactionIsolation)) {
          c.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
        } else if ("TRANSACTION_READ_COMMITTED".equals(transactionIsolation)) {
          c.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
        } else if ("TRANSACTION_REPEATABLE_READ".equals(transactionIsolation)) {
          c.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
        } else if ("TRANSACTION_SERIALIZABLE".equals(transactionIsolation)) {
          c.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
        } else if ("TRANSACTION_NONE".equals(transactionIsolation)) {
          c.setTransactionIsolation(Connection.TRANSACTION_NONE);
        }
        String holdability = initProps.getProperty("holdability");
        if ("CLOSE_CURSORS_AT_COMMIT".equals(holdability)) {
          c.setHoldability(ResultSet.CLOSE_CURSORS_AT_COMMIT);
        } else if ("HOLD_CURSORS_OVER_COMMIT".equals(holdability)) {
          c.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
        }
      }
      LOG.info("Time taken for getConnection(): "
          + (System.currentTimeMillis() - start));
      return c;
    }
  };
}
下面方法为获取Iterator数据迭代器
/**
 * Runs the given query and exposes its rows as an iterator.
 *
 * @param query the SQL statement to execute
 * @return the iterator produced by a new {@code ResultSetIterator} for this query
 */
@Override
public Iterator getData(String query) {
  return new ResultSetIterator(query).getIterator();
}
ResultSetIterator为内部类,构造方法传入数据查询参数(数据库为SQL语句)
在ResultSetIterator内部类中,根据数据库连接完成对查询语句的执行,然后封装ResultSet结果集,返回Iterator数据迭代器
/**
 * Executes one SQL query on the enclosing data source's connection and exposes
 * the resulting rows as an {@code Iterator} of column-name to value maps.
 *
 * <p>The underlying result set and statement are closed when the rows are
 * exhausted, when reading fails in {@code hasnext()}, or when the query itself
 * fails or yields no result set.
 */
private class ResultSetIterator {
  ResultSet resultSet;
  Statement stmt = null;
  List<String> colNames;
  Iterator<Map<String, Object>> rSetIterator;

  /**
   * Executes the query and prepares the row iterator.
   *
   * @param query the SQL statement to execute
   */
  public ResultSetIterator(String query) {
    try {
      Connection c = getConnection();
      stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
      stmt.setFetchSize(batchSize);
      stmt.setMaxRows(maxRows);
      LOG.debug("Executing SQL: " + query);
      long start = System.currentTimeMillis();
      if (stmt.execute(query)) {
        resultSet = stmt.getResultSet();
      }
      LOG.trace("Time taken for sql :"
          + (System.currentTimeMillis() - start));
      // execute() returns false for statements that produce no result set;
      // only read metadata when there actually is one.
      if (resultSet != null) {
        colNames = readFieldNames(resultSet.getMetaData());
      }
    } catch (Exception e) {
      // Release the statement/result set before reporting the failure.
      close();
      wrapAndThrow(SEVERE, e, "Unable to execute query: " + query);
    }
    if (resultSet == null) {
      // Nothing to iterate: release the statement and hand out an empty iterator.
      close();
      rSetIterator = new ArrayList<Map<String, Object>>().iterator();
      return;
    }
    rSetIterator = new Iterator<Map<String, Object>>() {
      public boolean hasNext() {
        return hasnext();
      }

      public Map<String, Object> next() {
        return getARow();
      }

      public void remove() {/* do nothing */
      }
    };
  }

  /** Returns the iterator prepared by the constructor. */
  private Iterator<Map<String, Object>> getIterator() {
    return rSetIterator;
  }

  /**
   * Reads the current row of the result set into a map keyed by column name.
   *
   * @return the row, or {@code null} when the result set has already been closed
   */
  private Map<String, Object> getARow() {
    if (resultSet == null) {
      return null;
    }
    Map<String, Object> result = new HashMap<String, Object>();
    for (String colName : colNames) {
      try {
        if (!convertType) {
          // Use underlying database's type information
          result.put(colName, resultSet.getObject(colName));
          continue;
        }
        // Columns without a configured type are read as strings.
        Integer type = fieldNameVsType.get(colName);
        if (type == null) {
          type = Types.VARCHAR;
        }
        switch (type) {
          case Types.INTEGER:
            result.put(colName, resultSet.getInt(colName));
            break;
          case Types.FLOAT:
            result.put(colName, resultSet.getFloat(colName));
            break;
          case Types.BIGINT:
            result.put(colName, resultSet.getLong(colName));
            break;
          case Types.DOUBLE:
            result.put(colName, resultSet.getDouble(colName));
            break;
          case Types.DATE:
            result.put(colName, resultSet.getDate(colName));
            break;
          case Types.BOOLEAN:
            result.put(colName, resultSet.getBoolean(colName));
            break;
          case Types.BLOB:
            result.put(colName, resultSet.getBytes(colName));
            break;
          default:
            result.put(colName, resultSet.getString(colName));
            break;
        }
      } catch (SQLException e) {
        logError("Error reading data ", e);
        wrapAndThrow(SEVERE, e, "Error reading data from database");
      }
    }
    return result;
  }

  /**
   * Advances the result set by one row.
   *
   * @return {@code true} if a row is available; {@code false} when the result
   *         set is exhausted or already closed (resources are released then)
   */
  private boolean hasnext() {
    if (resultSet == null) {
      return false;
    }
    try {
      if (resultSet.next()) {
        return true;
      } else {
        close();
        return false;
      }
    } catch (SQLException e) {
      close();
      wrapAndThrow(SEVERE, e);
      return false;
    }
  }

  /**
   * Closes the result set and the statement. Each is closed independently so a
   * failure closing the result set does not leave the statement open; failures
   * are logged and not rethrown.
   */
  private void close() {
    try {
      if (resultSet != null) {
        resultSet.close();
      }
    } catch (Exception e) {
      logError("Exception while closing result set", e);
    } finally {
      resultSet = null;
    }
    try {
      if (stmt != null) {
        stmt.close();
      }
    } catch (Exception e) {
      logError("Exception while closing statement", e);
    } finally {
      stmt = null;
    }
  }
}
ResultSetIterator内部类是Iterator设计模式的体现
下列方法不多解释,是获取数据连接和关闭数据连接等
/**
 * Returns the connection to use for the next statement.
 *
 * <p>If more than CONN_TIME_OUT milliseconds have passed since the connection
 * was last handed out, a new connection is created by the factory, the old one
 * is closed and the new one is kept. Otherwise the current connection is reused.
 *
 * @return the connection now held in {@code conn}
 * @throws Exception whatever the connection factory throws
 */
private Connection getConnection() throws Exception {
  final long now = System.currentTimeMillis();
  if (now - connLastUsed <= CONN_TIME_OUT) {
    // Still fresh: just record the use and hand back the current connection.
    connLastUsed = now;
    return conn;
  }
  synchronized (this) {
    // Open the replacement first, then drop the stale connection.
    Connection fresh = factory.call();
    closeConnection();
    connLastUsed = System.currentTimeMillis();
    conn = fresh;
    return conn;
  }
}
/**
 * Safety net: if this data source is collected without having been closed,
 * logs an error and closes it. Always delegates to {@code super.finalize()}.
 */
@Override
protected void finalize() throws Throwable {
  try {
    if (isClosed) {
      return; // the finally block still runs super.finalize()
    }
    LOG.error("JdbcDataSource was not closed prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!");
    close();
  } finally {
    super.finalize();
  }
}
// Set once close() has run; checked by finalize() to detect a missing close().
private boolean isClosed;

/**
 * Closes the held connection and marks this data source as closed. The closed
 * flag is set even if closing the connection fails.
 */
@Override
public void close() {
  try {
    closeConnection();
  } finally {
    isClosed = true;
  }
}
/**
 * Closes the current connection if there is one. Any failure is logged and
 * otherwise ignored.
 */
private void closeConnection() {
  if (conn == null) {
    return;
  }
  try {
    conn.close();
  } catch (Exception e) {
    LOG.error("Ignoring Error when closing connection", e);
  }
}
最后,在DataImporter类中完成JdbcDataSource数据源的实例化
/**
 * Creates and initializes the data source to use for an entity.
 *
 * <p>Properties are looked up in this order: {@code dataSourceProps} by name,
 * {@code config.dataSources} by name, then the same two maps under the
 * {@code null} key (the default data source). Without a TYPE property a
 * {@code JdbcDataSource} is created; otherwise the named class is loaded and
 * instantiated. When the request has a "rows" parameter, "maxRows" is set on a
 * copy of the properties to rows (plus "start" if present) before init.
 *
 * @param key  the entity the data source is created for (used in error messages)
 * @param name the data source name to look up
 * @param ctx  the context passed to the data source's init, also supplying request parameters
 * @return the initialized data source
 */
DataSource getDataSourceInstance(DataConfig.Entity key, String name, Context ctx) {
  Properties props = dataSourceProps.get(name);
  if (props == null) {
    props = config.dataSources.get(name);
  }
  if (props == null) {
    props = dataSourceProps.get(null); // for default data source
  }
  if (props == null) {
    props = config.dataSources.get(null);
  }
  if (props == null) {
    throw new DataImportHandlerException(SEVERE,
        "No dataSource :" + name + " available for entity :"
            + key.name);
  }

  final String type = props.getProperty(TYPE);
  DataSource dataSrc = null;
  if (type != null) {
    try {
      dataSrc = (DataSource) DocBuilder.loadClass(type, getCore()).newInstance();
    } catch (Exception e) {
      wrapAndThrow(SEVERE, e, "Invalid type for data source: " + type);
    }
  } else {
    dataSrc = new JdbcDataSource();
  }

  try {
    // Work on a copy so the shared configuration is not modified per request.
    Properties copyProps = new Properties();
    copyProps.putAll(props);
    Map params = ctx.getRequestParameters();
    if (params.containsKey("rows")) {
      int limit = Integer.parseInt((String) params.get("rows"));
      if (params.containsKey("start")) {
        limit += Integer.parseInt((String) params.get("start"));
      }
      copyProps.setProperty("maxRows", String.valueOf(limit));
    }
    // Initialize the data source
    dataSrc.init(ctx, copyProps);
  } catch (Exception e) {
    wrapAndThrow(SEVERE, e, "Failed to initialize DataSource: " + key.dataSource);
  }
  return dataSrc;
}
---------------------------------------------------------------------------
本系列solr dataimport 数据导入源码分析系本人原创
转载请注明出处 博客园 刺猬的温驯
本文链接 http://www.iyunv.com/chenying99/archive/2013/05/04/3059295.html |
|