并发 - 线程池

线程池

好处

说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

设计哲学

将任务的提交与执行解耦开,从而无须太大的困难就能为某种类型的任务指定和修改执行策略。

用法

任务无须返回值,调用这个方法:

public void execute(Runnable command) {}

需要返回值的任务,调用 submit

Future<Object> future = executor.submit(hasReturnValuetask);

try {
    Object s = future.get();
} catch (InterruptedException e) {
    // ...
} catch (ExecutionException e) {
    // ...
}

构造器参数

public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler) {
    // ...
}

execute() 方法运行原理

(1) 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务。部分代码片段如下:

if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}

addWorker 获取锁,创建线程,并运行任务,伪代码如下:

w = new Worker(firstTask);
final Thread t = w.thread;

if (t != null) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        workers.add(w);
    } finally {
        mainLock.unlock();
    }

    t.start();
}

其中 new Worker(firstTask) 的内部,创建了新的线程:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

(2)如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue

workQueue.offer(command)

(3)如果无法将任务加入 BlockingQueue(队列已满),则再一次执行 addWorker 尝试创建新的线程:

if (isRunning(c) && workQueue.offer(command)) {
    // 没有命中这个 if 条件
} else if (!addWorker(command, false)) {
    // ...
}

注意,这一次传递给 addWorker 的第二个参数是 core = false,即要求创建的是非核心线程,而 addWorker 内部也会根据 core 的值来循环检查是否大于 corePoolSize 还是 maximumPoolSize

int wc = workerCountOf(c);
if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;

(4)如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution() 方法。

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

线程池大小

计算密集型任务

在拥有 N 个 CPU 的处理器上,线程池大小设置为 N + 1,通常实现最优利用率。

多出来的线程,确保线程偶尔暂停的时候,也不会浪费 CPU 时钟周期。

I/O密集型任务

这种任务不会一直运行,因此线程池规模应该更大。需要估算任务的等待时间计算时间的比值,或许通过一些分析/监控工具获得。

线程池最优大小:

线程池数量 = CPU 数量 * CPU 利用率 * (1 + 等待时间/计算时间)

拒绝策略

  • AbortPolicy:拒绝执行任务,并抛出 RejectedExecutionException 异常
  • DiscardPolicy:丢弃任务
  • DiscardOldestPolicy:移除掉排在队列中时间最久未处理的任务,然后尝试重新执行新任务
  • CallerRunsPolicy:直接由调用代码处所在的线程运行任务,而非在线程池中运行,即直接运行 r.run()

线程池关闭

shutdown()

不再接受新的任务,但是已经添加进队列的任务可以继续运行,也就是说它会等待正在执行的任务和等待队列中没有执行的任务全部执行完毕。当然这个过程是异步的,你调用这个方法,不会阻塞在这个方法上。

public void shutdown() {
    interruptIdleWorkers();
}

shutdownNow()

不再接受新的任务,同时对所有的 Worker 都会尝试 interrupt,同时已经添加进队列的任务也不再等待执行,同时返回未执行的任务到一个新的队列中。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;

    interruptWorkers();
    tasks = drainQueue();

    return tasks;
}

优雅关闭

生产环境中,多数都会调用 shutdown() 方法,即关闭之后,让已经 submit 的任务有机会继续执行一段时间。同时为了避免因任务太多长时间运行不完的情况,又会通过 awaitTermination 附加一个超时时间,这个时间过后,线程池强制关闭。

示例代码:

this.scheduledExecutorService.shutdown();
try {
    this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}

Executors 创建线程弊端

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors 返回的线程池对象的弊端如下:

1) FixedThreadPoolSingleThreadPool

允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

2) CachedThreadPool

允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。