线程池
简介
好处
说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
设计哲学
将任务的提交与执行解耦开,从而无须太大的困难就能为某种类型的任务指定和修改执行策略。
用法
任务无须返回值,调用这个方法:
public void execute(Runnable command) {}
需要返回值的任务,调用 submit
:
Future<Object> future = executor.submit(hasReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// ...
} catch (ExecutionException e) {
// ...
}
构造器参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// ...
}
execute()
方法运行原理
(1) 如果当前运行的线程少于 corePoolSize
,则创建新线程来执行任务。部分代码片段如下:
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
addWorker
获取锁,创建线程,并运行任务,伪代码如下:
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
workers.add(w);
} finally {
mainLock.unlock();
}
t.start();
}
其中 new Worker(firstTask)
的内部,创建了新的线程:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
(2)如果运行的线程等于或多于 corePoolSize
,则将任务加入 BlockingQueue
。
workQueue.offer(command)
(3)如果无法将任务加入 BlockingQueue
(队列已满),则再一次执行 addWorker
尝试创建新的线程:
if (isRunning(c) && workQueue.offer(command)) {
// 没有命中这个 if 条件
} else if (!addWorker(command, false)) {
// ...
}
注意,这一次传递给 addWorker
的第二个参数是 core = false
,即要求创建的是非核心线程,而 addWorker
内部也会根据 core
的值来循环检查是否大于 corePoolSize
还是 maximumPoolSize
:
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
(4)如果创建新线程将使当前运行的线程超出 maximumPoolSize
,并且线程池内的阻塞队列已满,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()
方法。
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
图:
维护状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl
这个 AtomicInteger
类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它同时包含两部分的信息:线程池的运行状态 (runState
) 和线程池内有效线程的数量 (workerCount
),高 3 位保存 runState
,低 29 位保存 workerCount
,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
ThreadPoolExecutor
的运行状态有 5 种,分别为:
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
抛出异常
Java 线程池执行的任务抛出异常但是看不到日志是因为什么?
runWorker
会捕获包括 Error
在内的所有异常,并且在程序最后,将出现过的异常和当前任务传递给 afterExecute
方法。
//省略无关部分
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //执行程序逻辑
} catch (RuntimeException x) { //捕获RuntimeException
thrown = x; throw x;
} catch (Error x) { //捕获Error
thrown = x; throw x;
} catch (Throwable x) { //捕获Throwable
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //运行完成,进行后续处理
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
//省略无关部分
而 ThreadPoolExecutor
中的 afterExecute
方法是没有任何实现的:
protected void afterExecute(Runnable r, Throwable t) { }
也就是说,默认情况下,线程池会捕获任务抛出的所有异常,但是不做任何处理。
如何解决?
- 全部都
try...catch...
- 自定义线程池,重写
afterExecute
方法 - 使用
submit
进行提交,然后会返回一个Future
对象
Future<?> future = pool.submit(new Task());
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Reset interrupted status
} catch (ExecutionException e) {
Throwable exception = e.getCause();
// Forward to exception reporter
}
- 使用线程工厂的
Thread.UncaughtExceptionHandler
@Override
public Thread newThread(Runnable run) {
Thread thread = defaultFactory.newThread(run);
thread.setUncaughtExceptionHandler(handler);
return thread;
}
线程池大小
计算密集型任务
在拥有 N
个 CPU 的处理器上,线程池大小设置为 N + 1
,通常实现最优利用率。
多出来的线程,确保线程偶尔暂停的时候,也不会浪费 CPU 时钟周期。(比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。)
I/O密集型任务
这种任务不会一直运行,因此线程池规模应该更大。我们可以多配置一些线程,具体的计算方法是 2N
。需要估算任务的等待时间与计算时间的比值,或许通过一些分析/监控工具获得。
线程池最优大小:
线程池数量 = CPU 数量 * CPU 利用率 * (1 + 等待时间/计算时间)
我们可以通过 JDK 自带的工具
VisualVM
来查看 WT(线程等待时间)/ST(线程运行时间) 比例
综合来看,我们可以根据自己的业务场景,从“N+1”和“2N”两个公式中选出一个适合的,计算出一个大概的线程数量,之后通过实际压测,逐渐往“增大线程数量”和“减小线程数量”这两个方向调整,然后观察整体的处理时间变化,最终确定一个具体的线程数量。
拒绝策略
AbortPolicy
:拒绝执行任务,并抛出RejectedExecutionException
异常DiscardPolicy
:丢弃任务DiscardOldestPolicy
:移除掉排在队列中时间最久未处理的任务,然后尝试重新执行新任务CallerRunsPolicy
:直接由调用代码处所在的线程运行任务,而非在线程池中运行,即直接运行r.run()
线程池关闭
shutdown()
不再接受新的任务,但是已经添加进队列的任务可以继续运行,也就是说它会等待正在执行的任务和等待队列中没有执行的任务全部执行完毕。当然这个过程是异步的,你调用这个方法,不会阻塞在这个方法上。
public void shutdown() {
interruptIdleWorkers();
}
shutdownNow()
不再接受新的任务,同时对所有的 Worker
都会尝试 interrupt
,同时已经添加进队列的任务也不再等待执行,同时返回未执行的任务到一个新的队列中。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
interruptWorkers();
tasks = drainQueue();
return tasks;
}
优雅关闭
生产环境中,多数都会调用 shutdown()
方法,即关闭之后,让已经 submit
的任务有机会继续执行一段时间。同时为了避免因任务太多长时间运行不完的情况,又会通过 awaitTermination
附加一个超时时间,这个时间过后,线程池强制关闭。
示例代码:
this.scheduledExecutorService.shutdown();
try {
this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
Executors
Executors
是创建线程池的工具类。
Executors 创建线程弊端
线程池不允许使用 Executors
去创建,而是通过 ThreadPoolExecutor
的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors
返回的线程池对象的弊端如下:
1) FixedThreadPool
和 SingleThreadPool
:
允许的请求队列长度为 Integer.MAX_VALUE
,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool
:
允许的创建线程数量为 Integer.MAX_VALUE
,可能会创建大量的线程,从而导致 OOM。
LinkedBlockingQueue
单线程池和固定线程池使用的都是这个 Queue:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
两个基本的构造器如下所示:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
this.capacity = capacity;
last = head = new Node<E>(null);
}
内部有两把锁:
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
SynchronousQueue
SynchronousQueue
的简介如下:
A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate.
SynchronousQueue
通过两个内部类实现了公平策略和非公平策略的无缓存阻塞队列,每种操作都需要对应的互补操作同时进行才能完成,例如,入队操作必然对应出队操作,在不涉及超时和中断的情况下,必须等待另一个线程进行出队操作,两两匹配才能执行,否则就阻塞等待。
CachedThreadPool
用的就是这个队列:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程设置为0
,最大线程池设置Integer.MAX_VALUE
,存活时间60s
,阻塞队列使用SynchronousQueue
,默认非公平模式,可缓冲线程池通过复用空闲线程提高效率。这里最大线程数设置为Integer.MAX_VALUE
,可能会创建非常多的线程,甚至导致OOM
。
美团线程池
线上问题
- 服务展示接口内部逻辑使用线程池做并行计算,由于没有预估好调用的流量,导致最大核心数设置偏小,大量抛出
RejectedExecutionException
,触发接口降级条件 - 服务处理请求内部逻辑使用线程池做资源隔离,由于队列设置过长,最大线程数设置失效,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长,最终导致下游服务的大量调用超时失败。
动态配置
将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效。
- 增加线程监控:运行时状态实时查看
- 负载告警