Druid 设计
连接池
阿里巴巴 Druid 的连接池是如何实现的?
从连接池获取连接 (分析 DruidDataSource.java
),外层 for
无限循环,尝试调用 getConnectionInternal
获取可用连接,如果超时那么就会重试:
int notFullTimeoutRetryCnt = 0;
for (;;) {
DruidPooledConnection poolableConnection;
try {
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled()) {
LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
}
continue;
}
throw ex;
}
}
getConnectionInternal
内部获取连接的主要代码:
- 第一步:加锁
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
- 第二步:是否直接新建一个连接
活跃连接未达到最大活跃连接,当前无正在创建的连接:
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
}
}
直接创建连接的代码如下,创建一个连接 activeCount
就加 1,创建成功后会把最外层的 for
循环 break
掉:
for (boolean createDirect = false;;) {
if (createDirect) {
createStartNanosUpdater.set(this, System.nanoTime());
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
// 创建一个物理连接
PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
holder.lastActiveTimeMillis = System.currentTimeMillis();
creatingCountUpdater.decrementAndGet(this);
directCreateCountUpdater.incrementAndGet(this);
boolean discard = false;
lock.lock();
try {
if (activeCount < maxActive) {
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
break;
} else {
discard = true;
}
} finally {
lock.unlock();
}
if (discard) {
JdbcUtils.close(pyConnInfo.getPhysicalConnection());
}
}
}
}
- 第三步:
connections
作为连接池,获取connections
的最后一个连接。
private volatile DruidConnectionHolder[] connections;
pollLast
方法体整个是包裹在一个 for
循环里:
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
for (;;) {
// ...
}
}
在方法体内部,如果 poolingCount
为 0,说明当前可用连接为 0,那么就创建线程:
if (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
}
emptySignal()
的方法体如下:
protected ScheduledExecutorService createScheduler;
protected Condition empty;
// 信号量
if (createScheduler == null) {
// 当前空了
empty.signal();
return;
}
if (createTaskCount >= maxCreateTaskCount) {
return;
}
if (activeCount + poolingCount + createTaskCount >= maxActive) {
return;
}
// 异步创建连接
submitCreateTask(false);
随后在 pollLast()
方法的下半部分的主要逻辑是:
// 不为空的条件,如果不满足
// 那么最多等待 estimate 纳秒
notEmpty.awaitNanos(estimate);
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
return last;
当上面的创建线程池的任务内部逻辑创建一个连接成功后,会 put
一个连接:
private boolean put(DruidConnectionHolder holder, long createTaskId) {
try {
connections[poolingCount] = holder;
// 当前不为空了,发送信号给等待队列的第一个线程
notEmpty.signal();
} finally {
lock.unlock();
}
}
AtomicXXFieldUpdater
我们在上面代码可以看到很多地方更新某个类里面的 int
或者 long
变量都是使用的 AtomicXXFieldUpdater
:
final static AtomicIntegerFieldUpdater<DruidAbstractDataSource> directCreateCountUpdater = AtomicIntegerFieldUpdater.newUpdater(DruidAbstractDataSource.class, "directCreateCount");
final static AtomicLongFieldUpdater<DruidAbstractDataSource> createCountUpdater = AtomicLongFieldUpdater.newUpdater(DruidAbstractDataSource.class, "createCount");
那么为什么不采用 AtomicInteger
之类的类来控制并发操作呢?首先它的使用有一些约束:
- 字段必须是
volatile
类型的 - 对于父类的字段,子类是不能直接操作的
- 只能是实例变量,不能是类变量
- 只能修改
int
/long
类型的字段,不能修改其包装类型(Integer
/Long
)
一般 AtomicXXFieldUpdater
会声明为 static final
类型,即占用内存很小。其可以使我们可以在不修改用户代码(调用方)的情况下,就能实现并发安全性。