Understanding Tomcat JDBC Pool in Depth

The source code analyzed in this article is version 11.0.0-M3.

Creating the Connection Pool

Tomcat JDBC Pool supports a wide range of configuration properties, all of which are exposed through the PoolConfiguration interface.

// Code that creates the connection pool (abridged)
// DataSourceProxy.java
private synchronized ConnectionPool pCreatePool() throws SQLException {
    if (pool != null) {
        return pool;
    }
    pool = new ConnectionPool(poolProperties);
    return pool;
}

The pool creation logic lives in ConnectionPool, whose constructor does the following:

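  • Check the configuration. This mainly validates the size relationships among properties such as maxActive, minIdle and maxIdle, and corrects inconsistent values: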
public void checkPoolConfiguration(PoolConfiguration properties) {
    //make sure the pool is properly configured
    if (properties.getMaxActive()<1) {
        properties.setMaxActive(PoolProperties.DEFAULT_MAX_ACTIVE);
    }
    if (properties.getMaxActive()<properties.getInitialSize()) {
        properties.setInitialSize(properties.getMaxActive());
    }
    if (properties.getMinIdle()>properties.getMaxActive()) {
        properties.setMinIdle(properties.getMaxActive());
    }
    if (properties.getMaxIdle()>properties.getMaxActive()) {
        properties.setMaxIdle(properties.getMaxActive());
    }
    if (properties.getMaxIdle()<properties.getMinIdle()) {
        properties.setMaxIdle(properties.getMinIdle());
    }
    if (properties.getMaxAge()>0 && properties.isPoolSweeperEnabled() &&
            properties.getTimeBetweenEvictionRunsMillis()>properties.getMaxAge()) {
        properties.setTimeBetweenEvictionRunsMillis((int)properties.getMaxAge());
    }
}
  • Create the resource queues. These track the connections currently in use (busy) and the idle connections (idle):
//make space for 10 extra in case we flow over a bit
busy = new LinkedBlockingQueue<>();
//busy = new FairBlockingQueue<PooledConnection>();
//make space for 10 extra in case we flow over a bit
if (properties.isFairQueue()) {
    idle = new FairBlockingQueue<>();
    //idle = new MultiLockFairBlockingQueue<PooledConnection>();
    //idle = new LinkedTransferQueue<PooledConnection>();
    //idle = new ArrayBlockingQueue<PooledConnection>(properties.getMaxActive(),false);
} else {
    idle = new LinkedBlockingQueue<>();
}
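  • Create the pool cleaner. The pool cleaner runs in a background thread, checks whether connections are still valid, and promptly releases connections that are no longer in use: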
//if the evictor thread is supposed to run, start it now
if (properties.isPoolSweeperEnabled()) {
    poolCleaner = new PoolCleaner(this, properties.getTimeBetweenEvictionRunsMillis());
    poolCleaner.start();
}
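  • Set up event interceptors. Tomcat JDBC Pool lets users register custom interceptors that are notified of the events occurring in the pool.
  • Initialize connections. When the pool is created, it opens initialSize connections (10 by default) and places them in the pool: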
PooledConnection[] initialPool = new PooledConnection[poolProperties.getInitialSize()];

for (int i = 0; i < initialPool.length; i++) {
    initialPool[i] = this.borrowConnection(0, null, null); //don't wait, should be no contention
}

//return the members as idle to the pool
for (int i = 0; i < initialPool.length; i++) {
    if (initialPool[i] != null) {
        try {this.returnConnection(initialPool[i]);}catch(Exception x){/*NOOP*/}
    } //end if
}
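
To make the configuration check in the first step concrete, here is a minimal, self-contained sketch that applies the same sequence of corrections to a plain settings object. PoolSettingsSketch and its fields are hypothetical stand-ins, not the real PoolProperties class, and the maxAge adjustment is left out for brevity.

```java
// Hypothetical stand-in for the pool settings; not the real PoolProperties class.
final class PoolSettingsSketch {
    // Assumed fallback value, mirroring PoolProperties.DEFAULT_MAX_ACTIVE.
    static final int DEFAULT_MAX_ACTIVE = 100;

    int maxActive;
    int initialSize;
    int minIdle;
    int maxIdle;

    PoolSettingsSketch(int maxActive, int initialSize, int minIdle, int maxIdle) {
        this.maxActive = maxActive;
        this.initialSize = initialSize;
        this.minIdle = minIdle;
        this.maxIdle = maxIdle;
    }

    // Applies the corrections in the same order as checkPoolConfiguration.
    void normalize() {
        if (maxActive < 1) {
            maxActive = DEFAULT_MAX_ACTIVE;
        }
        if (maxActive < initialSize) {
            initialSize = maxActive;
        }
        if (minIdle > maxActive) {
            minIdle = maxActive;
        }
        if (maxIdle > maxActive) {
            maxIdle = maxActive;
        }
        if (maxIdle < minIdle) {
            maxIdle = minIdle;
        }
    }

    public static void main(String[] args) {
        // Deliberately inconsistent input: initialSize and minIdle exceed maxActive.
        PoolSettingsSketch s = new PoolSettingsSketch(20, 50, 30, 5);
        s.normalize();
        // prints "20 20 20 20"
        System.out.println(s.maxActive + " " + s.initialSize + " " + s.minIdle + " " + s.maxIdle);
    }
}
```

Note that the order matters: maxIdle is first capped at maxActive and only then raised to minIdle, so the final values always satisfy minIdle <= maxIdle <= maxActive.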

Default Pool Properties

PoolProperties defines the default properties of the connection pool. The defaults are as follows:

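Property | Meaning | Default
initialSize | Number of connections opened when the pool starts | 10
maxActive | Maximum number of active connections | 100
maxIdle | Maximum number of idle connections the pool keeps | 100
minIdle | Minimum number of idle connections the pool keeps | 10
maxWait | Milliseconds to wait when no connection is available and maxActive has been reached, before an exception is thrown | 30000
validationQueryTimeout | Timeout in seconds applied to the validation query (a value of 0 or less means no timeout is set) | -1
validationInterval | Minimum interval in milliseconds between two validations of the same connection | 3000
testOnConnect | Whether to validate a connection when it is first established | false
testOnBorrow | Whether to validate a connection when it is borrowed from the pool | false
testOnReturn | Whether to validate a connection when it is returned to the pool | false
testWhileIdle | Whether to validate connections while they sit idle | false
timeBetweenEvictionRunsMillis | Interval in milliseconds between two runs of the idle connection check | 5000
minEvictableIdleTimeMillis | Minimum time in milliseconds a connection must sit idle before it is eligible for eviction | 60000
removeAbandonedTimeout | Seconds a connection may stay borrowed before it is considered abandoned | 60
removeAbandoned | Whether connections marked as abandoned are removed | false
logAbandoned | Whether to log the stack trace recorded for an abandoned connection | false
fairQueue | Whether getConnection calls are served in FIFO order | true
abandonWhenPercentageFull | Percentage of maxActive that must be in use before abandoned connections are removed (0 means always) | 0
maxAge | Maximum age in milliseconds of a connection before it is reconnected (0 disables the check) | 0
commitOnReturn | Whether to commit automatically when a connection is returned to the pool | false
rollbackOnReturn | Whether to roll back automatically when a connection is returned to the pool | false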

Acquiring a Connection

The pool first tries to take a connection from the idle queue:

PooledConnection con = idle.poll();

while (true) {
    if (con!=null) {
        //configure the connection and return it
        PooledConnection result = borrowConnection(now, con, username, password);
        borrowedCount.incrementAndGet();
        if (result!=null) {
            return result;
        }
    }

    // No idle connection: create a new one if the pool is below maxActive
    if (size.get() < getPoolProperties().getMaxActive()) {
        //atomic duplicate check
        if (size.addAndGet(1) > getPoolProperties().getMaxActive()) {
            //if we got here, two threads passed through the first if
            size.decrementAndGet();
        } else {
            //create a connection, we're below the limit
            return createConnection(now, con, username, password);
        }
    }

    // Otherwise wait for a connection to become available (shown later)

}
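
The "atomic duplicate check" in the loop above is an optimistic reservation: increment first, then roll back if the limit was overshot. The following minimal sketch isolates that pattern with an AtomicInteger. SlotReservationSketch is a hypothetical class written for illustration and is not part of Tomcat.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the increment-then-roll-back size check.
final class SlotReservationSketch {
    private final AtomicInteger size = new AtomicInteger(0);
    private final int maxActive;

    SlotReservationSketch(int maxActive) {
        this.maxActive = maxActive;
    }

    // Returns true if the caller may create a new connection.
    boolean tryReserve() {
        if (size.get() < maxActive) {
            // Two threads can both pass the check above, so re-check after incrementing.
            if (size.addAndGet(1) > maxActive) {
                size.decrementAndGet(); // overshoot: give the slot back
                return false;
            }
            return true;
        }
        return false;
    }

    int size() {
        return size.get();
    }

    public static void main(String[] args) throws InterruptedException {
        SlotReservationSketch pool = new SlotReservationSketch(4);
        AtomicInteger granted = new AtomicInteger();
        Thread[] threads = new Thread[16];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                if (pool.tryReserve()) {
                    granted.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // prints "granted=4, size=4"
        System.out.println("granted=" + granted.get() + ", size=" + pool.size());
    }
}
```

The counter may briefly exceed maxActive while a losing thread has incremented but not yet rolled back, which is harmless here because no connection is created in that window.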

The idle connection obtained this way may already be unusable, so it still has to be checked according to the configuration:

con.lock();
if (con.isReleased()) {
    return null;
}

boolean forceReconnect = con.shouldForceReconnect(username, password) || con.isMaxAgeExpired();
if (!con.isDiscarded() && !con.isInitialized()) {
    //here it states that the connection not discarded, but the connection is null
    //don't attempt a connect here. It will be done during the reconnect.
    forceReconnect = true;
}

if (!forceReconnect) {
    if ((!con.isDiscarded()) && con.validate(PooledConnection.VALIDATE_BORROW)) {
        //set the timestamp
        con.setTimestamp(now);
        if (getPoolProperties().isLogAbandoned()) {
            //set the stack trace for this pool
            con.setStackTrace(getThreadDump());
        }
        if (!busy.offer(con)) {
            log.debug("Connection doesn't fit into busy array, connection will not be traceable.");
        }
        return con;
    }
}

// If execution reaches this point, the connection is re-established

con.unlock();

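The code above shows that, when no reconnect is forced, a connection taken from the idle queue is handed straight to the caller once it passes the following checks:

  • The connection has not been released
  • The connection has not been discarded
  • The connection passes validate

The connection is then offered to the busy queue. A failed offer is only logged at debug level, so it does not prevent the connection from being returned.

If the connection has exceeded its maximum age, has never actually been connected, or does not pass validation, it has to be reconnected: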

con.reconnect();
con.validate(validationMode);
busy.offer(con);

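The validate Logic

When a connection is taken from the idle queue, validation runs only if the testOnBorrow property is set to true; otherwise it is skipped. The validation logic is as follows:

  • Skip validation if the connection was already validated within the last validationInterval milliseconds: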
long now = System.currentTimeMillis();
if (validateAction!=VALIDATE_INIT &&
    poolProperties.getValidationInterval() > 0 &&
    (now - this.lastValidated) <
    poolProperties.getValidationInterval()) {
    return true;
}
  • Send the configured validation SQL query. If it executes without throwing a SQLException, validation passes:
query = poolProperties.getValidationQuery();
stmt = connection.createStatement();
int validationQueryTimeout = poolProperties.getValidationQueryTimeout();
if (validationQueryTimeout > 0) {
    stmt.setQueryTimeout(validationQueryTimeout);
}

stmt.execute(query);
stmt.close();
this.lastValidated = now;
return true;
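
The effect of validationInterval is easiest to see with explicit timestamps. The sketch below is a simplified, hypothetical model of the two steps above: the probe parameter stands in for executing the validation query, and the VALIDATE_INIT special case is omitted.

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of interval-throttled validation; not the Tomcat implementation.
final class ValidationThrottleSketch {
    private final long validationIntervalMillis;
    private long lastValidated = 0L;
    private int queriesSent = 0;

    ValidationThrottleSketch(long validationIntervalMillis) {
        this.validationIntervalMillis = validationIntervalMillis;
    }

    // 'probe' stands in for running the validation query against the database.
    boolean validate(long now, BooleanSupplier probe) {
        if (validationIntervalMillis > 0
                && (now - lastValidated) < validationIntervalMillis) {
            return true; // validated recently: trust the previous result
        }
        queriesSent++;
        if (!probe.getAsBoolean()) {
            return false; // lastValidated is not updated on failure
        }
        lastValidated = now;
        return true;
    }

    int queriesSent() {
        return queriesSent;
    }

    public static void main(String[] args) {
        ValidationThrottleSketch v = new ValidationThrottleSketch(3000);
        System.out.println(v.validate(10_000, () -> true));  // true, query sent
        System.out.println(v.validate(11_000, () -> false)); // true, skipped within the interval
        System.out.println(v.validate(13_000, () -> false)); // false, interval elapsed and probe fails
        System.out.println(v.queriesSent());                 // 2
    }
}
```

The second call illustrates the trade-off: a connection that broke less than validationInterval milliseconds after its last successful check is still reported as valid.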

The isMaxAgeExpired Logic

The check for whether a connection has expired is:

public boolean isMaxAgeExpired() {
    if (getPoolProperties().getMaxAge()>0 ) {
        return (System.currentTimeMillis() - getLastConnected()) > getPoolProperties().getMaxAge();
    } else {
        return false;
    }
}

The reconnect Logic

Reconnecting first discards the existing connection:

public void reconnect() throws SQLException {
    this.disconnect(false);
    this.connect();
}

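private void disconnect(boolean finalize) {
    setDiscarded(true);
    if (connection != null) {
        try {
            connection.close();
        } catch (Exception ignore) {
            //a failed close must not stop the connection from being discarded
        }
    }
    connection = null;
    lastConnected = -1;
}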

It then establishes a real underlying connection through the database driver (connect):

driver = (java.sql.Driver)
    ClassLoaderUtil.loadClass(
        poolProperties.getDriverClassName(),
        PooledConnection.class.getClassLoader(),
        Thread.currentThread().getContextClassLoader()
    ).getConstructor().newInstance();
connection = driver.connect(driverURL, properties);
this.discarded = false;
this.lastConnected = System.currentTimeMillis();

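Waiting for an Available Connection

The caller waits for up to maxWait milliseconds. If no connection becomes available in that time, a PoolExhaustedException is thrown: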

//calculate wait time for this iteration
long maxWait = wait;
//if the passed in wait time is -1, means we should use the pool property value
if (wait==-1) {
    maxWait = (getPoolProperties().getMaxWait()<=0)?Long.MAX_VALUE:getPoolProperties().getMaxWait();
}

long timetowait = Math.max(0, maxWait - (System.currentTimeMillis() - now));
con = idle.poll(timetowait, TimeUnit.MILLISECONDS);

//we didn't get a connection, lets see if we timed out
if (con == null) {
    if ((System.currentTimeMillis() - now) >= maxWait) {
        throw new PoolExhaustedException("[" + Thread.currentThread().getName()+"] " +
            "Timeout: Pool empty. Unable to fetch a connection in " + (maxWait / 1000) +
            " seconds, none available[size:"+size.get() +"; busy:"+busy.size()+"; idle:"+idle.size()+"; lastwait:"+timetowait+"].");
    } else {
        //no timeout, lets try again
        continue;
    }
}
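
The same deadline-based wait can be reproduced with any BlockingQueue. The sketch below is a hypothetical helper, not Tomcat code: it recomputes the remaining time on every iteration and uses the JDK's TimeoutException as a stand-in for PoolExhaustedException.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper showing a bounded wait on an idle queue.
final class BoundedWaitSketch {
    // maxWaitMillis <= 0 means "wait indefinitely", as in the excerpt above.
    static <T> T take(BlockingQueue<T> idle, long maxWaitMillis)
            throws InterruptedException, TimeoutException {
        long start = System.currentTimeMillis();
        long maxWait = (maxWaitMillis <= 0) ? Long.MAX_VALUE : maxWaitMillis;
        while (true) {
            // Only wait for whatever is left of the overall budget.
            long timeToWait = Math.max(0, maxWait - (System.currentTimeMillis() - start));
            T item = idle.poll(timeToWait, TimeUnit.MILLISECONDS);
            if (item != null) {
                return item;
            }
            if ((System.currentTimeMillis() - start) >= maxWait) {
                throw new TimeoutException("Nothing available within " + maxWait + " ms");
            }
            // Woke up early without an item: loop and wait for the remainder.
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> idle = new LinkedBlockingQueue<>();
        idle.offer("connection-1");
        System.out.println(take(idle, 100)); // prints "connection-1"
        try {
            take(idle, 100); // queue is now empty
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

Measuring elapsed time from a fixed start, rather than restarting the timer on each iteration, keeps the total wait bounded even if the poll returns early several times.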

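Returning a Connection

Returning a connection essentially removes it from the busy queue. If the connection does not need to be closed, it is put back into the idle queue: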

if (busy.remove(con)) {
    if (!shouldClose(con,PooledConnection.VALIDATE_RETURN) && reconnectIfExpired(con)) {
        con.setTimestamp(System.currentTimeMillis());
        if (((idle.size()>=poolProperties.getMaxIdle()) && !poolProperties.isPoolSweeperEnabled()) || (!idle.offer(con))) {
            release(con);
        }
    } else {
        release(con);
    } //end if
} else {
    release(con);
}

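Whether a connection must be closed on return depends mainly on the validate function. Validation on return runs only when the testOnReturn property is enabled; if it fails, the connection is released immediately: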

if (action == PooledConnection.VALIDATE_RETURN &&
    poolProperties.isTestOnReturn()) {
    // the connection is kept only if validate succeeds
}

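The other thing to look at is whether the current transaction can be ended cleanly before the connection goes back to the idle queue, that is, whether rollback or commit should be called first: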

protected boolean terminateTransaction(PooledConnection con) {
    try {
        if (Boolean.FALSE.equals(con.getPoolProperties().getDefaultAutoCommit())) {
            if (this.getPoolProperties().getRollbackOnReturn()) {
                boolean autocommit = con.getConnection().getAutoCommit();
                if (!autocommit) {
                    con.getConnection().rollback();
                }
            } else if (this.getPoolProperties().getCommitOnReturn()) {
                boolean autocommit = con.getConnection().getAutoCommit();
                if (!autocommit) {
                    con.getConnection().commit();
                }
            }
        }
        return true;
    } catch (SQLException x) {
        log.warn("Unable to terminate transaction, connection will be closed.",x);
        return false;
    }
}
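
The branching in terminateTransaction can be summarized as a small decision function. The following sketch is a hypothetical restatement of that logic with plain parameters instead of a live connection. It shows in particular that rollbackOnReturn takes precedence over commitOnReturn, and that nothing happens unless defaultAutoCommit is explicitly false.

```java
// Hypothetical restatement of the return-time transaction decision.
final class ReturnActionSketch {
    enum Action { NONE, ROLLBACK, COMMIT }

    static Action onReturn(Boolean defaultAutoCommit,
                           boolean rollbackOnReturn,
                           boolean commitOnReturn,
                           boolean currentAutoCommit) {
        // A null or true defaultAutoCommit means the pool never touches the transaction.
        if (!Boolean.FALSE.equals(defaultAutoCommit)) {
            return Action.NONE;
        }
        if (rollbackOnReturn) {
            return currentAutoCommit ? Action.NONE : Action.ROLLBACK;
        }
        if (commitOnReturn) {
            return currentAutoCommit ? Action.NONE : Action.COMMIT;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        System.out.println(onReturn(false, true, true, false));  // ROLLBACK
        System.out.println(onReturn(false, false, true, false)); // COMMIT
        System.out.println(onReturn(null, true, false, false));  // NONE
        System.out.println(onReturn(false, true, true, true));   // NONE
    }
}
```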

Releasing a Connection

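public boolean release() {
    disconnect(true);
    //abridged: reports whether this call was the one that released the connection
    return released.compareAndSet(false, true);
}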

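The Pool Cleaner

The pool cleaner runs in the background and mainly does the following:

  • Evicts abandoned connections
  • Shrinks the idle queue
  • Health-checks idle connections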
protected static class PoolCleaner extends TimerTask {

    // run interval in ms, set from timeBetweenEvictionRunsMillis (5000 by default)
    protected long sleepTime;

    @Override
    public void run() {
        try {
            if (pool.getPoolProperties().isRemoveAbandoned()
                    || pool.getPoolProperties().getSuspectTimeout() > 0) {
                pool.checkAbandoned();
            }
            if (pool.getPoolProperties().getMinIdle() < pool.idle
                    .size()) {
                pool.checkIdle();
            }
            if (pool.getPoolProperties().isTestWhileIdle()) {
                pool.testAllIdle(false);
            } else if (pool.getPoolProperties().getMaxAge() > 0) {
                pool.testAllIdle(true);
            }
        } catch (Exception x) {
            log.error("", x);
        }
    }

}

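The checkAbandoned Logic

Connections in the busy queue that have been held longer than their abandon timeout are discarded, while those held longer than suspectTimeout are only flagged as suspect: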

Iterator<PooledConnection> locked = busy.iterator();
int sto = getPoolProperties().getSuspectTimeout();
while (locked.hasNext()) {
    PooledConnection con = locked.next();
    //the con has been returned to the pool or released
    //ignore it
    if (idle.contains(con) || con.isReleased()) {
        continue;
    }

    long time = con.getTimestamp();
    long now = System.currentTimeMillis();
    if (shouldAbandon() && (now - time) > con.getAbandonTimeout()) {
        busy.remove(con);
        abandon(con);
        setToNull = true;
    } else if (sto > 0 && (now - time) > (sto * 1000L)) {
        suspect(con);
    }
}

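The shouldAbandon() function is shown below. Its purpose is to abandon connections only once the number of connections in use is high enough, that is, when used / max * 100 >= abandonWhenPercentageFull (a value of 0 means the check always passes):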

protected boolean shouldAbandon() {
    if (!poolProperties.isRemoveAbandoned()) {
        return false;
    }
    if (poolProperties.getAbandonWhenPercentageFull()==0) {
        return true;
    }
    float used = busy.size();
    float max  = poolProperties.getMaxActive();
    float perc = poolProperties.getAbandonWhenPercentageFull();
    return (used/max*100f)>=perc;
}
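
A few concrete numbers make the threshold easier to read. The sketch below copies the arithmetic of shouldAbandon() into a hypothetical static method that takes the pool state as parameters.

```java
// Hypothetical, parameterized copy of the shouldAbandon() arithmetic.
final class AbandonThresholdSketch {
    static boolean shouldAbandon(boolean removeAbandoned, int busy,
                                 int maxActive, int abandonWhenPercentageFull) {
        if (!removeAbandoned) {
            return false;
        }
        if (abandonWhenPercentageFull == 0) {
            return true; // 0 disables the usage threshold
        }
        float used = busy;
        float max = maxActive;
        return (used / max * 100f) >= abandonWhenPercentageFull;
    }

    public static void main(String[] args) {
        System.out.println(shouldAbandon(true, 80, 100, 75));   // true: 80% in use
        System.out.println(shouldAbandon(true, 50, 100, 75));   // false: 50% in use
        System.out.println(shouldAbandon(true, 1, 100, 0));     // true: threshold disabled
        System.out.println(shouldAbandon(false, 100, 100, 0));  // false: removeAbandoned is off
    }
}
```

With a threshold of 75, long-running connections are left alone while the pool has spare capacity and are only reclaimed once at least three quarters of maxActive is in use.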

Abandoning a connection amounts to logging the stack trace recorded for it (when logAbandoned is enabled) and then releasing it:

String trace = con.getStackTrace();
if (getPoolProperties().isLogAbandoned()) {
    log.warn("Connection has been abandoned " + con + ":" + trace);
}

release(con);

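The checkIdle Logic

As long as the idle queue holds at least minIdle connections (or ignoreMinSize is set), idle connections are examined one by one and released when they qualify for eviction: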

long now = System.currentTimeMillis();
Iterator<PooledConnection> unlocked = idle.iterator();
while ( (ignoreMinSize || (idle.size()>=getPoolProperties().getMinIdle())) && unlocked.hasNext()) {
    PooledConnection con = unlocked.next();
    try {
        con.lock();
        //the con been taken out, we can't clean it up
        if (busy.contains(con)) {
            continue;
        }

        long time = con.getTimestamp();
        if (shouldReleaseIdle(now, con, time)) {
            release(con);
            idle.remove(con);
        }
    } finally {
        //always unlock, including when the loop continues early
        con.unlock();
    }
} //while

The releaseTime used here is simply the configured minEvictableIdleTimeMillis property:

protected boolean shouldReleaseIdle(long now, PooledConnection con, long time) {
    if (con.getConnectionVersion() < getPoolVersion()) {
        return true;
    } else {
        return (con.getReleaseTime()>0) && ((now - time) > con.getReleaseTime()) && (getSize()>getPoolProperties().getMinIdle());
    }
}

public long getReleaseTime() {
    return this.poolProperties.getMinEvictableIdleTimeMillis();
}
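
The eviction condition combines three independent checks, which the following hypothetical sketch spells out with plain numbers instead of a PooledConnection. The parameter names are illustrative only.

```java
// Hypothetical, parameterized copy of the shouldReleaseIdle condition.
final class IdleEvictionSketch {
    static boolean shouldReleaseIdle(long now, long lastUsed, long minEvictableIdleMillis,
                                     int poolSize, int minIdle,
                                     long connectionVersion, long poolVersion) {
        // Connections created before the pool was purged are always released.
        if (connectionVersion < poolVersion) {
            return true;
        }
        return minEvictableIdleMillis > 0
                && (now - lastUsed) > minEvictableIdleMillis
                && poolSize > minIdle;
    }

    public static void main(String[] args) {
        long now = 100_000;
        // Idle for 70 s with a 60 s limit and a pool above minIdle: released.
        System.out.println(shouldReleaseIdle(now, 30_000, 60_000, 15, 10, 1, 1)); // true
        // Idle for only 50 s: kept.
        System.out.println(shouldReleaseIdle(now, 50_000, 60_000, 15, 10, 1, 1)); // false
        // Idle long enough, but the pool is already down to minIdle: kept.
        System.out.println(shouldReleaseIdle(now, 30_000, 60_000, 10, 10, 1, 1)); // false
        // Stale version: released regardless of idle time.
        System.out.println(shouldReleaseIdle(now, 99_000, 60_000, 10, 10, 0, 1)); // true
    }
}
```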

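The testAllIdle Logic

testAllIdle walks the idle queue and checks whether each connection has expired. If the expiry handling (reconnectIfExpired) or validate fails, the connection is removed from the idle queue and released.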

Iterator<PooledConnection> unlocked = idle.iterator();
while (unlocked.hasNext()) {
    PooledConnection con = unlocked.next();
    try {
        con.lock();
        //the con been taken out, we can't clean it up
        if (busy.contains(con)) {
            continue;
        }

        boolean release;
        if (checkMaxAgeOnly) {
            release = !reconnectIfExpired(con);
        } else {
            release = !reconnectIfExpired(con) || !con.validate(PooledConnection.VALIDATE_IDLE);
        }
        if (release) {
            idle.remove(con);
            release(con);
        }
    } finally {
        //always unlock, including when the loop continues early
        con.unlock();
    }
}

Interceptors

Tomcat JDBC Pool also ships with a built-in slow query interceptor. Once enabled, it tracks in memory up to 1000 SQL statements that took longer than 1 second:

public class SlowQueryReport extends AbstractQueryReport  {

    /**
     * Maximum number of queries we will be storing
     */
    protected int  maxQueries= 1000; //don't store more than this amount of queries

    /**
     * Flag to enable disable logging of slow queries
     */
    protected boolean logSlow = true;

    /**
     * Flag to enable disable logging of failed queries
     */
    protected boolean logFailed = false;

    /**
     * the queries that are used for this interceptor.
     */
    protected volatile ConcurrentHashMap<String,QueryStats> queries = null;

}

Slow query statistics are collected by wrapping each Statement in a dynamic proxy:

public abstract class AbstractQueryReport extends AbstractCreateStatementInterceptor {

    /**
     * The threshold in milliseconds. If the query is faster than this, we don't measure it
     */
    protected long threshold = 1000; //don't report queries less than this

    @Override
    public Object createStatement(Object proxy, Method method, Object[] args, Object statement, long time) {
        // ...
        result = constructor.newInstance(new Object[] { new StatementProxy(statement,sql) });
        // ...
    }

    protected class StatementProxy implements InvocationHandler {

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            long start = (process)?System.currentTimeMillis():0;
            
            //execute the query
            result =  method.invoke(delegate,args);

            //measure the time
            long delta = (process)?(System.currentTimeMillis()-start):Long.MIN_VALUE;
            
            //see if we meet the requirements to measure
            if (delta>threshold) {
                try {
                    //report the slow query
                    reportSlowQuery(query, args, name, start, delta);
                }catch (Exception t) {
                    if (log.isWarnEnabled()) {
                      log.warn("Unable to process slow query",t);
                    }
                }
            }
        }

    }

}
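
The dynamic proxy technique used by StatementProxy can be tried out with nothing but the JDK. The sketch below is a minimal, hypothetical illustration: QueryRunner stands in for java.sql.Statement, and slow calls are appended to a list rather than reported through reportSlowQuery.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical illustration of timing calls through a JDK dynamic proxy.
final class SlowCallProxySketch {
    // Stand-in for java.sql.Statement.
    interface QueryRunner {
        String execute(String sql);
    }

    static final class TimingHandler implements InvocationHandler {
        private final Object delegate;
        private final long thresholdMillis;
        private final List<String> slowCalls;

        TimingHandler(Object delegate, long thresholdMillis, List<String> slowCalls) {
            this.delegate = delegate;
            this.thresholdMillis = thresholdMillis;
            this.slowCalls = slowCalls;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            long start = System.currentTimeMillis();
            try {
                return method.invoke(delegate, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the delegate's own exception
            } finally {
                long delta = System.currentTimeMillis() - start;
                if (delta > thresholdMillis) {
                    slowCalls.add(method.getName() + " took " + delta + " ms");
                }
            }
        }
    }

    static QueryRunner wrap(QueryRunner target, long thresholdMillis, List<String> slowCalls) {
        return (QueryRunner) Proxy.newProxyInstance(
                QueryRunner.class.getClassLoader(),
                new Class<?>[] { QueryRunner.class },
                new TimingHandler(target, thresholdMillis, slowCalls));
    }

    public static void main(String[] args) {
        List<String> slowCalls = new CopyOnWriteArrayList<>();
        QueryRunner slow = wrap(sql -> {
            try {
                Thread.sleep(30); // simulate a slow query
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "ok";
        }, 5, slowCalls);
        slow.execute("select 1");
        System.out.println(slowCalls.size()); // 1
    }
}
```

Because the handler sits between the caller and the delegate, the timing logic applies to every method of the interface without the delegate knowing about it, which is the same reason the interceptor can be added to a pool purely through configuration.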