并发 - 线程池

线程池

简介

好处

说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

设计哲学

将任务的提交与执行解耦开,从而无须太大的困难就能为某种类型的任务指定和修改执行策略。

用法

任务无须返回值,调用这个方法:

public void execute(Runnable command) {}

需要返回值的任务,调用 submit

Future<Object> future = executor.submit(hasReturnValuetask);

try {
    Object s = future.get();
} catch (InterruptedException e) {
    // ...
} catch (ExecutionException e) {
    // ...
}

构造器参数

public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler) {
    // ...
}

execute() 方法运行原理

(1) 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务。部分代码片段如下:

if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}

addWorker 获取锁,创建线程,并运行任务,伪代码如下:

w = new Worker(firstTask);
final Thread t = w.thread;

if (t != null) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        workers.add(w);
    } finally {
        mainLock.unlock();
    }

    t.start();
}

其中 new Worker(firstTask) 的内部,创建了新的线程:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

(2)如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue

workQueue.offer(command)

(3)如果无法将任务加入 BlockingQueue(队列已满),则再一次执行 addWorker 尝试创建新的线程:

if (isRunning(c) && workQueue.offer(command)) {
    // 没有命中这个 if 条件
} else if (!addWorker(command, false)) {
    // ...
}

注意,这一次传递给 addWorker 的第二个参数是 core = false,即要求创建的是非核心线程,而 addWorker 内部也会根据 core 的值来循环检查是否大于 corePoolSize 还是 maximumPoolSize

int wc = workerCountOf(c);
if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;

(4)如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution() 方法。

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

抛出异常

Java 线程池执行的任务抛出异常但是看不到日志是因为什么?

runWorker 会捕获包括 Error 在内的所有异常,并且在程序最后,将出现过的异常和当前任务传递给 afterExecute 方法。

//省略无关部分
try {
    beforeExecute(wt, task);  
    Throwable thrown = null;
    try {
        task.run();  //执行程序逻辑
    } catch (RuntimeException x) { //捕获RuntimeException
        thrown = x; throw x;
    } catch (Error x) { //捕获Error
        thrown = x; throw x;
    } catch (Throwable x) {   //捕获Throwable
        thrown = x; throw new Error(x);
    } finally {
        afterExecute(task, thrown);  //运行完成,进行后续处理
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}
   
//省略无关部分

ThreadPoolExecutor 中的 afterExecute 方法是没有任何实现的:

protected void afterExecute(Runnable r, Throwable t) { }

也就是说,默认情况下,线程池会捕获任务抛出的所有异常,但是不做任何处理

如何解决?

  • 全部都 try...catch...
  • 自定义线程池,重写 afterExecute 方法
  • 使用 submit 进行提交,然后会返回一个 Future 对象
Future<?> future = pool.submit(new Task());
try {
    future.get();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // Reset interrupted status
} catch (ExecutionException e) {
    Throwable exception = e.getCause();
    // Forward to exception reporter
}
  • 使用线程工厂的 Thread.UncaughtExceptionHandler
@Override 
public Thread newThread(Runnable run) {
    Thread thread = defaultFactory.newThread(run);
    thread.setUncaughtExceptionHandler(handler);
    return thread;
}

线程池大小

计算密集型任务

在拥有 N 个 CPU 的处理器上,线程池大小设置为 N + 1,通常实现最优利用率。

多出来的线程,确保线程偶尔暂停的时候,也不会浪费 CPU 时钟周期。(比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。)

I/O密集型任务

这种任务不会一直运行,因此线程池规模应该更大。我们可以多配置一些线程,具体的计算方法是 2N。需要估算任务的等待时间计算时间的比值,或许通过一些分析/监控工具获得。

线程池最优大小:

线程池数量 = CPU 数量 * CPU 利用率 * (1 + 等待时间/计算时间)

我们可以通过 JDK 自带的工具 VisualVM 来查看 WT(线程等待时间)/ST(线程运行时间) 比例

综合来看,我们可以根据自己的业务场景,从“N+1”和“2N”两个公式中选出一个适合的,计算出一个大概的线程数量,之后通过实际压测,逐渐往“增大线程数量”和“减小线程数量”这两个方向调整,然后观察整体的处理时间变化,最终确定一个具体的线程数量。

拒绝策略

  • AbortPolicy:拒绝执行任务,并抛出 RejectedExecutionException 异常
  • DiscardPolicy:丢弃任务
  • DiscardOldestPolicy:移除掉排在队列中时间最久未处理的任务,然后尝试重新执行新任务
  • CallerRunsPolicy:直接由调用代码处所在的线程运行任务,而非在线程池中运行,即直接运行 r.run()

线程池关闭

shutdown()

不再接受新的任务,但是已经添加进队列的任务可以继续运行,也就是说它会等待正在执行的任务和等待队列中没有执行的任务全部执行完毕。当然这个过程是异步的,你调用这个方法,不会阻塞在这个方法上。

public void shutdown() {
    interruptIdleWorkers();
}

shutdownNow()

不再接受新的任务,同时对所有的 Worker 都会尝试 interrupt,同时已经添加进队列的任务也不再等待执行,同时返回未执行的任务到一个新的队列中。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;

    interruptWorkers();
    tasks = drainQueue();

    return tasks;
}

优雅关闭

生产环境中,多数都会调用 shutdown() 方法,即关闭之后,让已经 submit 的任务有机会继续执行一段时间。同时为了避免因任务太多长时间运行不完的情况,又会通过 awaitTermination 附加一个超时时间,这个时间过后,线程池强制关闭。

示例代码:

this.scheduledExecutorService.shutdown();
try {
    this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}

Executors

Executors 是创建线程池的工具类。

Executors 创建线程弊端

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors 返回的线程池对象的弊端如下:

1) FixedThreadPoolSingleThreadPool

允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

2) CachedThreadPool

允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

LinkedBlockingQueue

单线程池和固定线程池使用的都是这个 Queue:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

两个基本的构造器如下所示:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

内部有两把锁

private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();

private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

SynchronousQueue

SynchronousQueue 的简介如下:

A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate.

SynchronousQueue 通过两个内部类实现了公平策略和非公平策略的无缓存阻塞队列,每种操作都需要对应的互补操作同时进行才能完成,例如,入队操作必然对应出队操作,在不涉及超时和中断的情况下,必须等待另一个线程进行出队操作,两两匹配才能执行,否则就阻塞等待

CachedThreadPool 用的就是这个队列:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}

核心线程设置为0,最大线程池设置Integer.MAX_VALUE,存活时间60s,阻塞队列使用SynchronousQueue,默认非公平模式,可缓冲线程池通过复用空闲线程提高效率。这里最大线程数设置为Integer.MAX_VALUE,可能会创建非常多的线程,甚至导致OOM