池的概念大家并不陌生,数据库连接池、线程池等...大体来说,有三个优点:
- 降低资源消耗。
- 提高响应速度。
- 便于统一管理。
以上是 “池化” 技术的相同特点,至于他们之间的不同点这里不讲,两者都是为了提高性能和效率,抛开实际做连连看找不同,没有意义。
同样,类比于线程池来说:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
名称 | 描述 |
---|---|
ArrayBlockingQueue | 基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。 |
LinkedBlockingQueue | 基于链表结构的阻塞队列,此队列按 FIFO (先进先出) 排序元素,吞吐量通常要高于 ArrayBlockingQueue。Executors.newFixedThreadPool( ) 使用了这个队列。 |
SynchronousQueue | 不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool( ) 使用了这个队列。 |
PriorityBlockingQueue | 具有优先级的无限阻塞队列。 |
策略名称 | 特性 |
---|---|
AbortPolicy | 默认的饱和策略,直接抛出 RejectedExecutionException 异常 |
DiscardPolicy | 不处理,直接丢弃任务 |
CallerRunsPolicy | 使用调用者的线程执行任务 |
DiscardOldestPolicy | 丢弃队列里最近的一个任务,执行当前任务 |
同时,还可以自行实现 RejectedExecutionHandler
接口来自定义饱和策略,比如记录日志、持久化等等。
void execute(Runnable command)
ThreadFactory namedThreadFactory =
new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
ExecutorService executor =
new ThreadPoolExecutor(
10,
1000,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
executor.execute(
() -> {
System.out.println(1111);
});
注意使用 execute 方法提交任务时,没有返回值。
Future<?> submit(Runnable task)
❀ 示例:
Future<Integer> future = executor.submit(() -> {
return 1 + 1;
});
Integer result = future.get();
还可以使用 submit 方法提交任务,该方法返回一个 Future
对象,通过 Future#get( )
方法可以获得任务的返回值,该方法会一直阻塞知道任务执行完毕。还可以使用 Future#get(long timeout, TimeUnit unit)
方法,该方法会阻塞一段时间后立即返回,而这时任务可能没有执行完毕。
ThreadPoolExecutor 提供了 shutdown( ) 和 shutdownNow( ) 两个方法关闭线程池。原理是首先遍历线程池的工作线程,依次调用 interrupt( )
方法中断线程,这样看来如果无法响应中断的任务就不能终止。
两者区别是:
shutdownNow( )
首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。shutdown( )
首先将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。如果调用了其中一种方法,isShutdown 方法就会返回 true。当所有的任务都已关闭后, 才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。实际应用中可以根据任务是否一定要执行完毕的特性,决定使用哪种方法关闭线程池。
通常我们可以根据 CPU 核心数量来设计线程池数量。
可以通过 Runtime.getRuntime().availableProcessors()
方法获得当前设备的物理核心数量。值得注意的是,如果应用运行在一些 docker 或虚拟机容器上时,该方法取得的是当前物理机的 CPU 核心数。
IO 密集型 2nCPU
计算密集型 nCPU+1
- 其中 n 为 CPU 核心数量。
- 为什么加 1:即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
当提交一个新任务时,线程池的处理步骤:
- 判断当前线程池内的线程数量是否小于核心线程数,如果小于则新建线程执行任务。否则,进入下个阶段。
- 判断队列是否已满,如果没满,则将任务加入等待队列。否则,进入下个阶段。
- 在上面基础上判断是否大于最大线程数,如果是根据响应的策略处理。否则,新建线程执行当前任务。
线程池的源码比较简单易懂,感兴趣的小伙伴可以自行查看 java.util.concurrent.ThreadPoolExecutor
,在线程池中每个任务都被包装为一个一个的 Worker ,下面简单看下 Worker 的 run( ) 方法:
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
可以看到不断的循环取出 Task 并执行,而在任务的执行前后,有 beforeExecute 和 afterExecute 方法,我们可以实现两个方法实现一些监控逻辑。除此之外还可以集合线程池的一些属性或者重写 terminated() 方法在线程池关闭时进行监控。
在 Executors
中提供了集中常见的线程池,分别应用在不同的场景。
上面几种线程池的特性主要依赖于 ThreadPoolExecutor 的几个参数来实现,不同的核心线程数量,以及不同类型的阻塞队列,同时我们还可以自行实现自己的线程池满足业务需求。
值得注意的是,并不推荐使用 Executors
创建线程池,详见下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
❀ 继续来看 LinkedBlockingQueue :
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
可以看到使用 LinkedBlockingQueue 创建的是 Integer.MAX_VALUE 大小的队列,会堆积大量的请求,从而造成 OOM
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
同样,使用的 LinkedBlockingQueue ,一样的情况
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
代码课件线程池使用的最大线程数是 Integer.MAX_VALUE ,可能会创建大量线程,导致 OOM
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
和上面是一样的问题,最大线程数是 Integer.MAX_VALUE
所以原则上来说禁止使用 Executors 创建线程池, 而使用 ThreadPoolExecutor 的构造函数来创建线程池。
线程池在开发中还是比较常见的,结合不同的业务场景,结合最佳实践配置正确的参数,可以帮助我们的应用性能得到提升。