Tomcat JDBC pool源码部析
最近跟随Tomcat7.0开发了一个JDBC 连接池。 Svn: http://svn.apache.org/repos/asf/tomcat/trunk/modules/jdbc-poolhttp://tomcat.apache.org/tomcat-7.0-doc/jdbc-pool.html上大篇幅的介绍,包括基本的使用指南。本篇从源码的角度,分析该连接池的实现思路。应用使用JDBC连接也,主要关注以下三个方面:1.获取连接2.归还连接3.空闲连接关闭。一. 获取连接ConnectionPool提供本个接口用于获取连接:public Future<Connection> getConnectionAsync() throws SQLException {
try {
PooledConnection pc = borrowConnection(0, null, null);
if (pc!=null) {
return new ConnectionFuture(pc);
}
}catch (SQLException x) {
if (x.getMessage().indexOf("NoWait")<0) {
throw x;
}
}
/we can only retrieve a future if the underlying queue supports it.
if (idle instanceof FairBlockingQueue<?>) {
Future<PooledConnection> pcf = ((FairBlockingQueue<PooledConnection>)idle).pollAsync();
return new ConnectionFuture(pcf);
} else if (idle instanceof MultiLockFairBlockingQueue<?>) {
Future<PooledConnection> pcf = ((MultiLockFairBlockingQueue<PooledConnection>)idle).pollAsync();
return new ConnectionFuture(pcf);
} else {
throw new SQLException("Connection pool is misconfigured, doesn't support async retrieval. Set the 'fair' property to 'true'");
}
public Connection getConnection() throws SQLException {
//check out a connection
PooledConnection con = borrowConnection(-1,null,null);
return setupConnection(con);
}
public Connection getConnection(String username, String password) throws SQLException {
// check out a connection
PooledConnection con = borrowConnection(-1, username, password);
return setupConnection(con);
}
第一个方法:getConnectionAsync用于获取一个连接的Feature.它用于支持以异步的方式获取连接。后两个方法不同之处就是传递了所需连接的用户名与密码。我们这里得点分析第三个方法.
PooledConnection con = borrowConnection(-1, username, password);
borrowConnection方法从空闲队列中获取一个连接,或新建一个连接。看一下源码:
/**
* Thread safe way to retrieve a connection from the pool
* @param wait - time to wait, overrides the maxWait from the properties,
* set to -1 if you wish to use maxWait, 0 if you wish no wait time.
* @return PooledConnection
* @throws SQLException
*/
private PooledConnection borrowConnection(int wait, String username, String password) throws SQLException {
//如果连接被关闭则直接抛出异常
if (isClosed()) {
throw new SQLException("Connection pool closed.");
} //end if
//get the current time stamp
long now = System.currentTimeMillis();
//see if there is one available immediately
/*从空闲队列中获取一个连接。 其实idle里存在连接对象有的可能并没有
绑定物理连接。这也是Tomcat jdbc pool的一个特别,连接在将要被使用时,
才会初始化*/
PooledConnection con = idle.poll();
while (true) {
if (con!=null) {
//configure the connection and return it
/*这里又出现一个borrowConnection的重载方法。该方法对从空闲队列中取到的连接对象进行配置和验证,稍后评述*/
PooledConnection result = borrowConnection(now, con, username, password);
//null should never be returned, but was in a previous impl.
// null should never be returned这句注释不对,根据后面的代码
// 来看,null是有可能发生。
if (result!=null) return result;
}
//if we get here, see if we need to create one
//this is not 100% accurate since it doesn't use a shared
//atomic variable - a connection can become idle while we are creating
//a new connection
/*从上面的英文注释我们很明白,当执行到这里时,唯一的可能是idle队列没能取到连接对象。
如果条件允许,我们将创建新的连接.在这里作者用了一个特别的算法,也是tomcat代码中常用的,
我们姑且称他占位法(我一般这么叫)。这个算法的特点就是先在计数器Size中占一个位置
(Size是原子变量。能够解决并发问题)。即size+1.然后检查size有没有超标。如果超标
则减去刚才新加的1。否则创建一个新的连接。不过这里我注意的是,如果创建新连接时失败,
size也必须减1。其实与大学时的用书抢位子异曲同工。*/
if (size.get() < getPoolProperties().getMaxActive()) {
//atomic duplicate check
if (size.addAndGet(1) > getPoolProperties().getMaxActive()) {
//if we got here, two threads passed through the first if
size.decrementAndGet();
} else {
//create a connection, we're below the limit
//后面再描述这个方法。
return createConnection(now, con, username, password);
}
} //end if
//到这里则表示连接池已满,不能创建新的连接,我们只能等待其他线程释放的连接
//calculate wait time for this iteration
long maxWait = wait;
//if the passed in wait time is -1,
//means we should use the pool property value
if (wait==-1) {
maxWait = (getPoolProperties().getMaxWait()<=0)?Long.MAX_VALUE:getPoolProperties().getMaxWait();
}
//我们需要计算本次最多能容忍的等待。为什么要计算呢。因为我们可能中间被假//唤醒但却没能拿到连接。
long timetowait = Math.max(0, maxWait - (System.currentTimeMillis() - now));
waitcount.incrementAndGet();
try {
//retrieve an existing connection
con = idle.poll(timetowait, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
if (getPoolProperties().getPropagateInterruptState()) {
Thread.currentThread().interrupt();
} else {
Thread.interrupted();
}
SQLException sx = new SQLException("Pool wait interrupted.");
sx.initCause(ex);
throw sx;
} finally {
waitcount.decrementAndGet();
}
//no wait, return one if we have one
if (maxWait==0 && con == null) {
throw new PoolExhaustedException("[" + Thread.currentThread().getName()+"] " +
"NoWait: Pool empty. Unable to fetch a connection, none available["+busy.size()+" in use].");
}
//we didn't get a connection, let’s see if we timed out
if (con == null) {
…
if ((System.currentTimeMillis() - now) >= maxWait) {
throw new PoolExhaustedException("[" + Thread.currentThread().getName()+"] " +
"Timeout: Pool empty. Unable to fetch a connection in " + (maxWait / 1000) +
" seconds, none available.");
} else {
//no timeout, lets try again
//如果没有超时,我们继续去获取连接。
continue;
}
}
} //while
}
waitTime表示连接请求者容忍等待的最大时间,超时没有获取到连接则抛出PoolExhaustedException异常。OK。
下面我们看中间遇到的borrowConnection的重载方法:
protected PooledConnection borrowConnection(long now, PooledConnection con, String username, String password)
和
protected PooledConnection createConnection(long now, PooledConnection notUsed, String username, String password)
首先看第一个:
/**
* Validates and configures a previously idle connection
* @param now - timestamp
* @param con - the connection to validate and configure
* @return con
* @throws SQLException if a validation error happens
*/
protected PooledConnection borrowConnection(long now, PooledConnection con, String username, String password) throws SQLException {
//we have a connection, lets set it up
//flag to see if we need to nullify
boolean setToNull = false;
try {
//为当前连接加锁
con.lock();
//验证当前连接用用户名与密码是否符合需求
boolean usercheck = con.checkUser(username, password);
if (con.isReleased()) {
return null;
}
//对于没标记为丢弃的连接且没有初始化的连接进行初始化。
if (!con.isDiscarded() && !con.isInitialized()) {
//attempt to connect
try {
con.connect();
} catch (Exception x) {
release(con);
setToNull = true;
if (x instanceof SQLException) {
throw (SQLException)x;
} else {
SQLException ex= new SQLException(x.getMessage());
ex.initCause(x);
throw ex;
}
}
}
if (usercheck) {
if ((!con.isDiscarded()) && con.validate(PooledConnection.VALIDATE_BORROW)) {
//set the timestamp
con.setTimestamp(now);
//这里添加LogAbandoned的功能是为了在检测到连接泄露时,
//获取占用该连接的线程栈
if (getPoolProperties().isLogAbandoned()) {
//set the stack trace for this pool
con.setStackTrace(getThreadDump());
}
//放入busy队列。如果不成功,则该连接将无法被追踪。(这种情况不会出现)
if (!busy.offer(con)) {
log.debug("Connection doesn't fit into busy array, connection will not be traceable.");
}
return con;
}
}
//if we reached here, that means the connection
//is either has another principal, is discarded or validation failed.
//we will make one more attempt
//in order to guarantee that the thread that just acquired
//the connection shouldn't have to poll again.
//这里英语描述的很清楚了。如果连接的用户名不符,被丢弃或验证失败,
//我们可以重连该连接,以满足需求,而不是再去获取其他的。
try {
con.reconnect();
if (con.validate(PooledConnection.VALIDATE_INIT)) {
//set the timestamp
con.setTimestamp(now);
if (getPoolProperties().isLogAbandoned()) {
//set the stack trace for this pool
con.setStackTrace(getThreadDump());
}
if (!busy.offer(con)) {
log.debug("Connection doesn't fit into busy array, connection will not be traceable.");
}
return con;
} else {
//validation failed.
release(con);
setToNull = true;
throw new SQLException("Failed to validate a newly established connection.");
}
} catch (Exception x) {
release(con);
setToNull = true;
if (x instanceof SQLException) {
throw (SQLException)x;
} else {
SQLException ex= new SQLException(x.getMessage());
ex.initCause(x);
throw ex;
}
}
} finally {
con.unlock();
if (setToNull) {
con = null;
}
}
}
小手一抖,钱钱到手! 有事秘书干,没事干秘书! 长大了娶唐僧做老公,能玩就玩一玩,不能玩就把他吃掉。 不知该说些什么。。。。。。就是谢谢 你的丑和你的脸没有关系。。。。。。 如果回帖是一种美德,那我早就成为圣人了!
页:
[1]