深入理解 Tomcat JDBC Pool
本文分析的源码版本基于
11.0.0-M3
。
创建连接池
Tomcat JDBC Pool 连接池可以配置多种多样的属性,相关特性以接口的方式封装在了 PoolConfiguration
中。
// 创建连接池的代码
// DataSourceProxy.java
private synchronized ConnectionPool pCreatePool() throws SQLException {
pool = new ConnectionPool(poolProperties);
}
创建连接池的相关代码就位于 ConnectionPool
中,其在构造器中做了如下事情:
- 检查相关配置。通常对一些
maxActive
、minIdle
之间的大小逻辑进行校验等:
public void checkPoolConfiguration(PoolConfiguration properties) {
//make sure the pool is properly configured
if (properties.getMaxActive()<1) {
properties.setMaxActive(PoolProperties.DEFAULT_MAX_ACTIVE);
}
if (properties.getMaxActive()<properties.getInitialSize()) {
properties.setInitialSize(properties.getMaxActive());
}
if (properties.getMinIdle()>properties.getMaxActive()) {
properties.setMinIdle(properties.getMaxActive());
}
if (properties.getMaxIdle()>properties.getMaxActive()) {
properties.setMaxIdle(properties.getMaxActive());
}
if (properties.getMaxIdle()<properties.getMinIdle()) {
properties.setMaxIdle(properties.getMinIdle());
}
if (properties.getMaxAge()>0 && properties.isPoolSweeperEnabled() &&
properties.getTimeBetweenEvictionRunsMillis()>properties.getMaxAge()) {
properties.setTimeBetweenEvictionRunsMillis((int)properties.getMaxAge());
}
}
- 创建资源池。资源池负责管理正在使用的连接、空闲的连接等:
//make space for 10 extra in case we flow over a bit
busy = new LinkedBlockingQueue<>();
//busy = new FairBlockingQueue<PooledConnection>();
//make space for 10 extra in case we flow over a bit
if (properties.isFairQueue()) {
idle = new FairBlockingQueue<>();
//idle = new MultiLockFairBlockingQueue<PooledConnection>();
//idle = new LinkedTransferQueue<PooledConnection>();
//idle = new ArrayBlockingQueue<PooledConnection>(properties.getMaxActive(),false);
} else {
idle = new LinkedBlockingQueue<>();
}
- 创建资源池清理器。资源池清理器会启动后台线程,来检测连接的有效性,并即时地释放不再使用的连接等:
//if the evictor thread is supposed to run, start it now
if (properties.isPoolSweeperEnabled()) {
poolCleaner = new PoolCleaner(this, properties.getTimeBetweenEvictionRunsMillis());
poolCleaner.start();
}
- 事件拦截器。Tomcat JDBC 连接池允许用户自定义拦截器,以便接受连接池发生的一系列的事件等通知。
- 初始化连接。连接池创建的时候,会根据用户配置的属性来默认创建
initialSize = 10
个连接放到池中:
PooledConnection[] initialPool = new PooledConnection[poolProperties.getInitialSize()];
for (int i = 0; i < initialPool.length; i++) {
initialPool[i] = this.borrowConnection(0, null, null); //don't wait, should be no contention
}
//return the members as idle to the pool
for (int i = 0; i < initialPool.length; i++) {
if (initialPool[i] != null) {
try {this.returnConnection(initialPool[i]);}catch(Exception x){/*NOOP*/}
} //end if
}
连接池默认属性
PoolProperties
定义了连接池的默认属性,相关属性的默认值如下:
属性 | 含义 | 默认值 |
---|---|---|
initialSize |
初始连接数 | 10 |
maxActive |
最大活跃连接数 | 100 |
maxIdle |
最大空闲连接数(空闲连接池最多允许保留多少个资源) | 100 |
minIdle |
最小空闲连接数(空闲连接池最少保留多少个资源) | 10 |
maxWait |
当没有可用连接以及达到 maxActive 时,应该等待多少毫秒之后才抛出异常 |
30000 |
validationQueryTimeout |
校验连接的超时时间,超过多少秒才认为这条连接不可用了(-1 意味着不进行这个校验) |
-1 |
validationInterval |
多少毫秒执行一次检查 | 3000 |
testOnConnect |
建立连接的时候是否执行一次校验 | false |
testOnBorrow |
从池中获取连接的时候是否执行一次校验 | false |
testOnReturn |
连接还给连接池后是否执行一次校验 | false |
testWhileIdle |
连接未被使用的时候是否执行校验 | false |
timeBetweenEvictionRunsMillis |
多少毫秒运行一次空闲连接检查 | 5000 |
minEvictableIdleTimeMillis |
一条连接多少毫秒没有使用后才被认为是空闲连接 | 60000 |
removeAbandonedTimeout |
一条连接多少毫秒才被人为是需要 abandoned | 60 |
removeAbandoned |
连接标记为 abandoned 之后是否应该移除 |
false |
logAbandoned |
是否打印 abandoned 连接的异常栈 |
false |
fairQueue |
getConnection 这个调用是否应该以 FIFO 的顺序进行响应 |
true |
abandonWhenPercentageFull |
使用率达到多高才考虑移除 abandoned 连接 |
0 |
maxAge |
连接空闲之后,超过多少毫秒才自动连接一次 | 0 |
commitOnReturn |
连接还到连接池后,是否应该自动 commit 一次 |
false |
rollbackOnReturn |
连接还到连接池后,是否应该自动 rollback 一次 |
false |
获取连接
先尝试从空闲队列获取一个连接:
PooledConnection con = idle.poll();
while (true) {
if (con!=null) {
//configure the connection and return it
PooledConnection result = borrowConnection(now, con, username, password);
borrowedCount.incrementAndGet();
if (result!=null) {
return result;
}
}
// 直接创建连接
if (size.get() < getPoolProperties().getMaxActive()) {
//atomic duplicate check
if (size.addAndGet(1) > getPoolProperties().getMaxActive()) {
//if we got here, two threads passed through the first if
size.decrementAndGet();
} else {
//create a connection, we're below the limit
return createConnection(now, con, username, password);
}
}
// 等待可用连接
}
获取到的这个空闲连接其实已经有可能处于不可用的状态了,所以还得根据配置对其进行一些有效性检查:
con.lock();
if (con.isReleased()) {
return null;
}
boolean forceReconnect = con.shouldForceReconnect(username, password) || con.isMaxAgeExpired();
if (!con.isDiscarded() && !con.isInitialized()) {
//here it states that the connection not discarded, but the connection is null
//don't attempt a connect here. It will be done during the reconnect.
forceReconnect = true;
}
if (!forceReconnect) {
if ((!con.isDiscarded()) && con.validate(PooledConnection.VALIDATE_BORROW)) {
//set the timestamp
con.setTimestamp(now);
if (getPoolProperties().isLogAbandoned()) {
//set the stack trace for this pool
con.setStackTrace(getThreadDump());
}
if (!busy.offer(con)) {
log.debug("Connection doesn't fit into busy array, connection will not be traceable.");
}
return con;
}
}
// 逻辑走到这里,那么再重新进行一次连接
con.unlock();
由上述代码可以看出,从 idle
队列中取出的连接,只要满足如下四个条件就该连接是有效的,可以返回给客户端使用:
- 连接没有被释放
- 连接没有被丢弃
- 连接
validate
可以通过 - 连接可以放入到
busy
队列里
如果发现连接已经达到了最大的有效期或者发现还从未真正连接过,那么就需要重新连接:
con.reconnect();
con.validate(validationMode);
busy.offer(con);
validate
逻辑
当从 idle
取连接的时候,只有 testOnBorrow
属性设置为 true
才会执行校验,否则不会执行。校验逻辑如下:
- 上次校验周期内不进行校验:
long now = System.currentTimeMillis();
if (validateAction!=VALIDATE_INIT &&
poolProperties.getValidationInterval() > 0 &&
(now - this.lastValidated) <
poolProperties.getValidationInterval()) {
return true;
}
- 发送相关用于校验的 SQL 查询语句,能正常查询返回,没有报
SQLException
就认为校验通过:
query = poolProperties.getValidationQuery();
stmt = connection.createStatement();
int validationQueryTimeout = poolProperties.getValidationQueryTimeout();
if (validationQueryTimeout > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
stmt.execute(query);
stmt.close();
this.lastValidated = now;
return true;
isMaxAgeExpired
逻辑
连接是否已经过期的判断逻辑:
public boolean isMaxAgeExpired() {
if (getPoolProperties().getMaxAge()>0 ) {
return (System.currentTimeMillis() - getLastConnected()) > getPoolProperties().getMaxAge();
} else {
return false;
}
}
reconnect
逻辑
重新连接逻辑首先将连接丢弃掉:
public void reconnect() throws SQLException {
this.disconnect(false);
this.connect();
}
private void disconnect(boolean finalize) {
setDiscarded(true);
if (connection != null) {
connection.close();
}
connection = null;
lastConnected = -1;
}
然后再通过数据库驱动执行一次真正的底层连接 connect
:
driver = (java.sql.Driver)
ClassLoaderUtil.loadClass(
poolProperties.getDriverClassName(),
PooledConnection.class.getClassLoader(),
Thread.currentThread().getContextClassLoader()
).getConstructor().newInstance();
connection = driver.connect(driverURL, properties);
this.discarded = false;
this.lastConnected = System.currentTimeMillis();
等待可用连接
尝试等待 maxWait
毫秒的时间,如果等待不到,那么抛出 PoolExhaustedException
异常:
//calculate wait time for this iteration
long maxWait = wait;
//if the passed in wait time is -1, means we should use the pool property value
if (wait==-1) {
maxWait = (getPoolProperties().getMaxWait()<=0)?Long.MAX_VALUE:getPoolProperties().getMaxWait();
}
long timetowait = Math.max(0, maxWait - (System.currentTimeMillis() - now));
con = idle.poll(timetowait, TimeUnit.MILLISECONDS);
//we didn't get a connection, lets see if we timed out
if (con == null) {
if ((System.currentTimeMillis() - now) >= maxWait) {
throw new PoolExhaustedException("[" + Thread.currentThread().getName()+"] " +
"Timeout: Pool empty. Unable to fetch a connection in " + (maxWait / 1000) +
" seconds, none available[size:"+size.get() +"; busy:"+busy.size()+"; idle:"+idle.size()+"; lastwait:"+timetowait+"].");
} else {
//no timeout, lets try again
continue;
}
}
归还连接
归还连接其实就是从 busy
队列中移除,当不需要关闭连接的时候,那么会重新放入到 idle
空闲队列中:
if (busy.remove(con)) {
if (!shouldClose(con,PooledConnection.VALIDATE_RETURN) && reconnectIfExpired(con)) {
con.setTimestamp(System.currentTimeMillis());
if (((idle.size()>=poolProperties.getMaxIdle()) && !poolProperties.isPoolSweeperEnabled()) || (!idle.offer(con))) {
release(con);
}
} else {
release(con);
} //end if
} else {
release(con);
}
判断连接归还的时候需要需要关闭,其主要看的还是 validate
函数,当 testOnReturn
属性打开的时候才会在归还的时候执行校验,校验不通过的时候,会直接释放掉连接:
if (action == PooledConnection.VALIDATE_RETURN &&
poolProperties.isTestOnReturn()) {
// validate 返回成功
}
另外一个看的点是在归还连接到 idle
队列之前,是否能够正常结束当前事务,还之前是否应该主动调用 rollback
或者 commit
:
protected boolean terminateTransaction(PooledConnection con) {
try {
if (Boolean.FALSE.equals(con.getPoolProperties().getDefaultAutoCommit())) {
if (this.getPoolProperties().getRollbackOnReturn()) {
boolean autocommit = con.getConnection().getAutoCommit();
if (!autocommit) {
con.getConnection().rollback();
}
} else if (this.getPoolProperties().getCommitOnReturn()) {
boolean autocommit = con.getConnection().getAutoCommit();
if (!autocommit) {
con.getConnection().commit();
}
}
}
return true;
} catch (SQLException x) {
log.warn("Unable to terminate transaction, connection will be closed.",x);
return false;
}
}
释放连接
public boolean release() {
disconnect(true);
}
连接池清理器
后台运行的连接池清理器主要是干了如下几件事情:
- 淘汰
abandoned
连接 - 缩容
idle
连接池 - 探活
idle
连接
protected static class PoolCleaner extends TimerTask {
// 默认 30 秒
protected long sleepTime;
@Override
public void run() {
try {
if (pool.getPoolProperties().isRemoveAbandoned()
|| pool.getPoolProperties().getSuspectTimeout() > 0) {
pool.checkAbandoned();
}
if (pool.getPoolProperties().getMinIdle() < pool.idle
.size()) {
pool.checkIdle();
}
if (pool.getPoolProperties().isTestWhileIdle()) {
pool.testAllIdle(false);
} else if (pool.getPoolProperties().getMaxAge() > 0) {
pool.testAllIdle(true);
}
} catch (Exception x) {
log.error("", x);
}
}
}
checkAbandoned
逻辑
主动丢弃 busy
队列中运行时间大于 abandonTimeout
时间的连接:
Iterator<PooledConnection> locked = busy.iterator();
int sto = getPoolProperties().getSuspectTimeout();
while (locked.hasNext()) {
PooledConnection con = locked.next();
//the con has been returned to the pool or released
//ignore it
if (idle.contains(con) || con.isReleased()) {
continue;
}
long time = con.getTimestamp();
long now = System.currentTimeMillis();
if (shouldAbandon() && (now - time) > con.getAbandonTimeout()) {
busy.remove(con);
abandon(con);
setToNull = true;
} else if (sto > 0 && (now - time) > (sto * 1000L)) {
suspect(con);
}
}
其中 shouldAbandon()
函数如下,其意在阐明当当前正在使用的连接达到较高数量的时候,应该将其主动丢弃。较高数量是指 used / max >= abandonWhenPercentageFull
:
protected boolean shouldAbandon() {
if (!poolProperties.isRemoveAbandoned()) {
return false;
}
if (poolProperties.getAbandonWhenPercentageFull()==0) {
return true;
}
float used = busy.size();
float max = poolProperties.getMaxActive();
float perc = poolProperties.getAbandonWhenPercentageFull();
return (used/max*100f)>=perc;
}
而 abandon
连接其实是打印该连接的异常信息栈,并 release
连接:
String trace = con.getStackTrace();
if (getPoolProperties().isLogAbandoned()) {
log.warn("Connection has been abandoned " + con + ":" + trace);
}
release(con);
checkIdle
逻辑
只要当前 idle
连接池比最小 idle
数量大,那么就逐个释放/淘汰连接:
long now = System.currentTimeMillis();
Iterator<PooledConnection> unlocked = idle.iterator();
while ( (ignoreMinSize || (idle.size()>=getPoolProperties().getMinIdle())) && unlocked.hasNext()) {
PooledConnection con = unlocked.next();
con.lock();
//the con been taken out, we can't clean it up
if (busy.contains(con)) {
continue;
}
long time = con.getTimestamp();
if (shouldReleaseIdle(now, con, time)) {
release(con);
idle.remove(con);
} else {
//do nothing
} //end if
con.unlock();
} //while
其中 releaseTime
正是配置的 minEvictableIdleTimeMillis
属性:
protected boolean shouldReleaseIdle(long now, PooledConnection con, long time) {
if (con.getConnectionVersion() < getPoolVersion()) {
return true;
} else {
return (con.getReleaseTime()>0) && ((now - time) > con.getReleaseTime()) && (getSize()>getPoolProperties().getMinIdle());
}
}
public long getReleaseTime() {
return this.poolProperties.getMinEvictableIdleTimeMillis();
}
testAllIdle
逻辑
testAllIdle
负责对 idle
线程池检查是否已经过期,如果过期或者 validate
失败则从 idle
连接池中移除掉该连接。
Iterator<PooledConnection> unlocked = idle.iterator();
while (unlocked.hasNext()) {
PooledConnection con = unlocked.next();
con.lock();
//the con been taken out, we can't clean it up
if (busy.contains(con)) {
continue;
}
boolean release;
if (checkMaxAgeOnly) {
release = !reconnectIfExpired(con);
} else {
release = !reconnectIfExpired(con) || !con.validate(PooledConnection.VALIDATE_IDLE);
}
if (release) {
idle.remove(con);
release(con);
}
con.unlock();
}
拦截器
另外 Tomcat JDBC Pool 内置了慢查询拦截器,开启后其会在内存中主动跟踪 1000 条耗时大于 1 秒以上的慢查询 SQL 语句:
public class SlowQueryReport extends AbstractQueryReport {
/**
* Maximum number of queries we will be storing
*/
protected int maxQueries= 1000; //don't store more than this amount of queries
/**
* Flag to enable disable logging of slow queries
*/
protected boolean logSlow = true;
/**
* Flag to enable disable logging of failed queries
*/
protected boolean logFailed = false;
/**
* the queries that are used for this interceptor.
*/
protected volatile ConcurrentHashMap<String,QueryStats> queries = null;
}
慢查询语句的统计是通过动态代理 Statement
实现的:
public abstract class AbstractQueryReport extends AbstractCreateStatementInterceptor {
/**
* The threshold in milliseconds. If the query is faster than this, we don't measure it
*/
protected long threshold = 1000; //don't report queries less than this
@Override
public Object createStatement(Object proxy, Method method, Object[] args, Object statement, long time) {
// ...
result = constructor.newInstance(new Object[] { new StatementProxy(statement,sql) });
// ...
}
protected class StatementProxy implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
long start = (process)?System.currentTimeMillis():0;
//execute the query
result = method.invoke(delegate,args);
//measure the time
long delta = (process)?(System.currentTimeMillis()-start):Long.MIN_VALUE;
//measure the time
long delta = (process)?(System.currentTimeMillis()-start):Long.MIN_VALUE;
//see if we meet the requirements to measure
if (delta>threshold) {
try {
//report the slow query
reportSlowQuery(query, args, name, start, delta);
}catch (Exception t) {
if (log.isWarnEnabled()) {
log.warn("Unable to process slow query",t);
}
}
}
}
}
}