Druid 设计

Druid 设计

连接池

阿里巴巴 Druid 的连接池是如何实现的?

从连接池获取连接 (分析 DruidDataSource.java),外层 for 无限循环,尝试调用 getConnectionInternal 获取可用连接,如果超时那么就会重试

int notFullTimeoutRetryCnt = 0;
for (;;) {
    DruidPooledConnection poolableConnection;
    try {
        poolableConnection = getConnectionInternal(maxWaitMillis);
    } catch (GetConnectionTimeoutException ex) {
        if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
            notFullTimeoutRetryCnt++;
            if (LOG.isWarnEnabled()) {
                LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
            }
            continue;
        }
        throw ex;
    }
}

getConnectionInternal 内部获取连接的主要代码:

  • 第一步:加锁
try {
    lock.lockInterruptibly();
} catch (InterruptedException e) {
    connectErrorCountUpdater.incrementAndGet(this);
    throw new SQLException("interrupt", e);
}
  • 第二步:是否直接新建一个连接

活跃连接未达到最大活跃连接,当前无正在创建的连接:

if (createScheduler != null
        && poolingCount == 0
        && activeCount < maxActive
        && creatingCountUpdater.get(this) == 0
        && createScheduler instanceof ScheduledThreadPoolExecutor) {
    ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
    if (executor.getQueue().size() > 0) {
        createDirect = true;
        continue;
    }
}

直接创建连接的代码如下,创建一个连接 activeCount 就加 1,创建成功后会把最外层的 for 循环 break 掉:

for (boolean createDirect = false;;) {
    if (createDirect) {
        createStartNanosUpdater.set(this, System.nanoTime());
        if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
            // 创建一个物理连接
            PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
            holder = new DruidConnectionHolder(this, pyConnInfo);
            holder.lastActiveTimeMillis = System.currentTimeMillis();

            creatingCountUpdater.decrementAndGet(this);
            directCreateCountUpdater.incrementAndGet(this);

            boolean discard = false;
            lock.lock();
            try {
                if (activeCount < maxActive) {
                    activeCount++;
                    holder.active = true;
                    if (activeCount > activePeak) {
                        activePeak = activeCount;
                        activePeakTime = System.currentTimeMillis();
                    }
                    break;
                } else {
                    discard = true;
                }
            } finally {
                lock.unlock();
            }

            if (discard) {
                JdbcUtils.close(pyConnInfo.getPhysicalConnection());
            }
        }
    }
}
  • 第三步:connections 作为连接池,获取 connections 的最后一个连接。
private volatile DruidConnectionHolder[] connections;

pollLast 方法体整个是包裹在一个 for 循环里:

private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
    for (;;) {
        // ...
    }
}

在方法体内部,如果 poolingCount 为 0,说明当前可用连接为 0,那么就创建线程:

if (poolingCount == 0) {
    emptySignal(); // send signal to CreateThread create connection
}

emptySignal() 的方法体如下:

protected ScheduledExecutorService                 createScheduler;
protected Condition                                empty;

// 信号量
if (createScheduler == null) {
    // 当前空了
    empty.signal();
    return;
}

if (createTaskCount >= maxCreateTaskCount) {
    return;
}

if (activeCount + poolingCount + createTaskCount >= maxActive) {
    return;
}

// 异步创建连接
submitCreateTask(false);

随后在 pollLast() 方法的下半部分的主要逻辑是:

// 不为空的条件,如果不满足
// 那么最多等待 estimate 纳秒
notEmpty.awaitNanos(estimate);

DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;

return last;

当上面的创建线程池的任务内部逻辑创建一个连接成功后,会 put 一个连接:

private boolean put(DruidConnectionHolder holder, long createTaskId) {
    try {
        connections[poolingCount] = holder;
        // 当前不为空了,发送信号给等待队列的第一个线程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
}

AtomicXXFieldUpdater

我们在上面代码可以看到很多地方更新某个类里面的 int 或者 long 变量都是使用的 AtomicXXFieldUpdater:

final static AtomicIntegerFieldUpdater<DruidAbstractDataSource> directCreateCountUpdater     = AtomicIntegerFieldUpdater.newUpdater(DruidAbstractDataSource.class, "directCreateCount");
final static AtomicLongFieldUpdater<DruidAbstractDataSource>    createCountUpdater           = AtomicLongFieldUpdater.newUpdater(DruidAbstractDataSource.class, "createCount");

那么为什么不采用 AtomicInteger 之类的类来控制并发操作呢?首先它的使用有一些约束:

  • 字段必须是 volatile 类型的
  • 对于父类的字段,子类是不能直接操作的
  • 只能是实例变量,不能是类变量
  • 只能修改 int/long 类型的字段,不能修改其包装类型(Integer/Long

一般 AtomicXXFieldUpdater 会声明为 static final 类型,即占用内存很小。其可以使我们可以在不修改用户代码(调用方)的情况下,就能实现并发安全性。