baidus 发表于 2013-4-7 08:55:00

Tomcat JDBC pool源码部析

最近跟随Tomcat7.0开发了一个JDBC 连接池。 Svn: http://svn.apache.org/repos/asf/tomcat/trunk/modules/jdbc-poolhttp://tomcat.apache.org/tomcat-7.0-doc/jdbc-pool.html上大篇幅的介绍,包括基本的使用指南。本篇从源码的角度,分析该连接池的实现思路。应用使用JDBC连接也,主要关注以下三个方面:1.获取连接2.归还连接3.空闲连接关闭。一. 获取连接ConnectionPool提供本个接口用于获取连接:
public Future<Connection> getConnectionAsync() throws SQLException {

   try {

         PooledConnection pc = borrowConnection(0, null, null);

         if (pc!=null) {

             return new ConnectionFuture(pc);

         }

   }catch (SQLException x) {

         if (x.getMessage().indexOf("NoWait")<0) {

             throw x;

         }

   }

/we can only retrieve a future if the underlying queue supports it.

   if (idle instanceof FairBlockingQueue<?>) {

         Future<PooledConnection> pcf = ((FairBlockingQueue<PooledConnection>)idle).pollAsync();

         return new ConnectionFuture(pcf);

   } else if (idle instanceof MultiLockFairBlockingQueue<?>) {

             Future<PooledConnection> pcf = ((MultiLockFairBlockingQueue<PooledConnection>)idle).pollAsync();

             return new ConnectionFuture(pcf);

   } else {

         throw new SQLException("Connection pool is misconfigured, doesn't support async retrieval. Set the 'fair' property to 'true'");

   }



public Connection getConnection() throws SQLException {

   //check out a connection

   PooledConnection con = borrowConnection(-1,null,null);

   return setupConnection(con);

}

public Connection getConnection(String username, String password)    throws SQLException {

   // check out a connection

    PooledConnection con = borrowConnection(-1, username,      password);

   return setupConnection(con);

}
第一个方法:getConnectionAsync用于获取一个连接的Feature.它用于支持以异步的方式获取连接。后两个方法不同之处就是传递了所需连接的用户名与密码。我们这里得点分析第三个方法.
PooledConnection con = borrowConnection(-1, username, password);
borrowConnection方法从空闲队列中获取一个连接,或新建一个连接。看一下源码:
/**

* Thread safe way to retrieve a connection from the pool

* @param wait - time to wait, overrides the maxWait from the properties,

* set to -1 if you wish to use maxWait, 0 if you wish no wait time.

   * @return PooledConnection

   * @throws SQLException

   */

private PooledConnection borrowConnection(int wait, String username, String password) throws SQLException {

    //如果连接被关闭则直接抛出异常

      if (isClosed()) {

            throw new SQLException("Connection pool closed.");

      } //end if



      //get the current time stamp

      long now = System.currentTimeMillis();

      //see if there is one available immediately

       /*从空闲队列中获取一个连接。 其实idle里存在连接对象有的可能并没有

绑定物理连接。这也是Tomcat jdbc pool的一个特别,连接在将要被使用时,

才会初始化*/

PooledConnection con = idle.poll();

      while (true) {

            if (con!=null) {

                //configure the connection and return it

                /*这里又出现一个borrowConnection的重载方法。该方法对从空闲队列中取到的连接对象进行配置和验证,稍后评述*/

                PooledConnection result = borrowConnection(now, con, username, password);

                //null should never be returned, but was in a previous impl.

                // null should never be returned这句注释不对,根据后面的代码

                // 来看,null是有可能发生。

                if (result!=null) return result;

            }

            //if we get here, see if we need to create one

            //this is not 100% accurate since it doesn't use a shared

            //atomic variable - a connection can become idle while we are creating

            //a new connection

            /*从上面的英文注释我们很明白,当执行到这里时,唯一的可能是idle队列没能取到连接对象。
             如果条件允许,我们将创建新的连接.在这里作者用了一个特别的算法,也是tomcat代码中常用的,
             我们姑且称他占位法(我一般这么叫)。这个算法的特点就是先在计数器Size中占一个位置
             (Size是原子变量。能够解决并发问题)。即size+1.然后检查size有没有超标。如果超标
            则减去刚才新加的1。否则创建一个新的连接。不过这里我注意的是,如果创建新连接时失败,
               size也必须减1。其实与大学时的用书抢位子异曲同工。*/

            if (size.get() < getPoolProperties().getMaxActive()) {

                //atomic duplicate check

                if (size.addAndGet(1) > getPoolProperties().getMaxActive()) {

                  //if we got here, two threads passed through the first if

                  size.decrementAndGet();

                } else {

                  //create a connection, we're below the limit

                  //后面再描述这个方法。

                  return createConnection(now, con, username, password);

                }

            } //end if

            //到这里则表示连接池已满,不能创建新的连接,我们只能等待其他线程释放的连接

            //calculate wait time for this iteration

            long maxWait = wait;

         //if the passed in wait time is -1,

         //means we should use the pool property value

            if (wait==-1) {

                maxWait = (getPoolProperties().getMaxWait()<=0)?Long.MAX_VALUE:getPoolProperties().getMaxWait();

            }

         //我们需要计算本次最多能容忍的等待。为什么要计算呢。因为我们可能中间被假//唤醒但却没能拿到连接。

            long timetowait = Math.max(0, maxWait - (System.currentTimeMillis() - now));

            waitcount.incrementAndGet();

            try {

                //retrieve an existing connection

                con = idle.poll(timetowait, TimeUnit.MILLISECONDS);

            } catch (InterruptedException ex) {

                if (getPoolProperties().getPropagateInterruptState()) {

                  Thread.currentThread().interrupt();

                } else {

                  Thread.interrupted();

                }

                SQLException sx = new SQLException("Pool wait interrupted.");

                sx.initCause(ex);

                throw sx;

            } finally {

                waitcount.decrementAndGet();

            }

            //no wait, return one if we have one

            if (maxWait==0 && con == null) {

                throw new PoolExhaustedException("[" + Thread.currentThread().getName()+"] " +

                "NoWait: Pool empty. Unable to fetch a connection, none available["+busy.size()+" in use].");

            }

            //we didn't get a connection, let’s see if we timed out

            if (con == null) {

                …

            if ((System.currentTimeMillis() - now) >= maxWait) {

                  throw new PoolExhaustedException("[" + Thread.currentThread().getName()+"] " +

                        "Timeout: Pool empty. Unable to fetch a connection in " + (maxWait / 1000) +

                        " seconds, none available.");

                } else {

                  //no timeout, lets try again

                  //如果没有超时,我们继续去获取连接。

                  continue;

                }

            }

      } //while

    }
waitTime表示连接请求者容忍等待的最大时间,超时没有获取到连接则抛出PoolExhaustedException异常。OK。
下面我们看中间遇到的borrowConnection的重载方法:
protected PooledConnection borrowConnection(long now, PooledConnection con, String username, String password)

protected PooledConnection createConnection(long now, PooledConnection notUsed, String username, String password)
首先看第一个:
/**
   * Validates and configures a previously idle connection

   * @param now - timestamp

   * @param con - the connection to validate and configure

   * @return con

   * @throws SQLException if a validation error happens

   */

protected PooledConnection borrowConnection(long now, PooledConnection con, String username, String password) throws SQLException {

      //we have a connection, lets set it up

      //flag to see if we need to nullify

      boolean setToNull = false;

      try {

            //为当前连接加锁

            con.lock();

         //验证当前连接用用户名与密码是否符合需求

            boolean usercheck = con.checkUser(username, password);

            if (con.isReleased()) {

                return null;

            }

            //对于没标记为丢弃的连接且没有初始化的连接进行初始化。

            if (!con.isDiscarded() && !con.isInitialized()) {

                //attempt to connect

                try {

                  con.connect();

                } catch (Exception x) {

                  release(con);

                  setToNull = true;

                  if (x instanceof SQLException) {

                        throw (SQLException)x;

                  } else {

                        SQLException ex= new SQLException(x.getMessage());

                        ex.initCause(x);

                        throw ex;

                  }

                }

            }

            

            if (usercheck) {

                if ((!con.isDiscarded()) && con.validate(PooledConnection.VALIDATE_BORROW)) {

                  //set the timestamp

                  con.setTimestamp(now);

                  //这里添加LogAbandoned的功能是为了在检测到连接泄露时,

                  //获取占用该连接的线程栈

                  if (getPoolProperties().isLogAbandoned()) {

                        //set the stack trace for this pool

                        con.setStackTrace(getThreadDump());

                  }

                   //放入busy队列。如果不成功,则该连接将无法被追踪。(这种情况不会出现)

                  if (!busy.offer(con)) {

                        log.debug("Connection doesn't fit into busy array, connection will not be traceable.");

                  }

                  return con;

                }

            }

            //if we reached here, that means the connection

            //is either has another principal, is discarded or validation failed.

            //we will make one more attempt

            //in order to guarantee that the thread that just acquired

            //the connection shouldn't have to poll again.

            //这里英语描述的很清楚了。如果连接的用户名不符,被丢弃或验证失败,

            //我们可以重连该连接,以满足需求,而不是再去获取其他的。

            try {

                con.reconnect();

                if (con.validate(PooledConnection.VALIDATE_INIT)) {

                  //set the timestamp

                  con.setTimestamp(now);

                  if (getPoolProperties().isLogAbandoned()) {

                        //set the stack trace for this pool

                        con.setStackTrace(getThreadDump());

                  }

                  if (!busy.offer(con)) {

                        log.debug("Connection doesn't fit into busy array, connection will not be traceable.");

                  }

                  return con;

                } else {

                  //validation failed.

                  release(con);

                  setToNull = true;

                  throw new SQLException("Failed to validate a newly established connection.");

                }

            } catch (Exception x) {

                release(con);

                setToNull = true;

                if (x instanceof SQLException) {

                  throw (SQLException)x;

                } else {

                  SQLException ex= new SQLException(x.getMessage());

                  ex.initCause(x);

                  throw ex;

                }

            }

      } finally {

            con.unlock();

            if (setToNull) {

                con = null;

            }

      }

    }

水莹儿 发表于 2013-4-7 09:10:20

小手一抖,钱钱到手!

falldog 发表于 2013-5-16 20:09:20

有事秘书干,没事干秘书!

xuanhao 发表于 2013-5-18 01:47:19

长大了娶唐僧做老公,能玩就玩一玩,不能玩就把他吃掉。

QQ叫紫珊 发表于 2013-5-19 05:33:35

不知该说些什么。。。。。。就是谢谢

haloi 发表于 2013-5-20 15:34:15

你的丑和你的脸没有关系。。。。。。

olga 发表于 2013-5-22 01:36:40

如果回帖是一种美德,那我早就成为圣人了!
页: [1]
查看完整版本: Tomcat JDBC pool源码部析