Java Stream 并不支持指定线程池,实际编码中,有些开发者可能会使用一些“技巧”来指定线程池。实际上,所谓的技巧不仅降低了可读性,而且很容易出现bug。本文将分析并行流式编程的设计思想、”技巧“会带来的问题,并提出相关的解决方案。
简单总结就是官方考虑过相关方案,认为没必要,parallel 提供了简单直接的使用方式,官方的初衷就是流式编程需要轻松实现并行支持,而指定线程池会使API更复杂化,不便使用,还会有线程安全性问题。需要指出的是,官方考虑过相关方案。
并行流默认使用公共线程池,基本思想为分治。公共池类型为 ForkJoinPool, 公共线程池并发度为CUP核数 - 1,适用于处理CPU密集型任务。任务为递归型任务,任务可以划分为子任务,空闲线程可以”窃取“待执行任务,充分利用线程池。一般情况下,使用公用池时,任务队列中会存在比较多的小任务。使用公用池的好处是可以避免创建过多无用的线程,特别是对于CPU密集型任务,新增线程反而会增加上下文开销。
流式编程可能是函数式编程最被大众接受的一种编程方式。理论上,流的处理过程中,所有的方法都应该是纯函数,遵循引用透明原则,内部可以对具体执行流程进行优化,其不为 IO 密集型任务是理所应当的。
不妨看看官方对于 ForkJoinTask 的描述:
A ForkJoinTask is a lightweight form of Future. The efficiency of ForkJoinTasks stems from a set of restrictions (that are only partially statically enforceable) reflecting their main use as computational tasks calculating pure functions or operating on purely isolated objects. The primary coordination mechanisms are fork, that arranges asynchronous execution, and join, that doesn't proceed until the task's result has been computed. Computations should ideally avoid synchronized methods or blocks, and should minimize other blocking synchronization apart from joining other tasks or using synchronizers such as Phasers that are advertised to cooperate with fork/ join scheduling. Subdividable tasks should also not perform blocking I/ O, and should ideally access variables that are completely independent of those accessed by other running tasks. These guidelines are loosely enforced by not permitting checked exceptions such as IOExceptions to be thrown.
简单总结就是 Future类型,通过fork、join 编排异步任务执行,应该避免阻塞方法。
若想指定线程池或者实现细粒度的代码执行控制,官方推荐使用 CompletableFuture,JUC 等相关类库。
以下stackoverflow上有关于指定线程池的解答:java
代码解读复制代码final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
通过这个”技巧“可以指定 ForkJoinPool。其底层逻辑可以总结为:当 ForkJoin 任务执行时,其可以获得线程池上下文,任务(子任务)会在线程池中执行。
但是,这个 trick 是不可靠的。你需要注意 JDK 的版本,openjdk8u222 之前的版本实现有bug,使用的依然是公共池,官方bug修改相关信息可以看这里,Parallel stream execution within a custom ForkJoinPool should obey the parallelism。
以下是使用CompletableFuture 的 trick 实现,基本思路是一样的:java
代码解读复制代码ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
//parallel task here, for example
range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()),
forkJoinPool
);
需要注意的是:由于forkJoinTask的存在,虽然看似提交了一条任务,实际上提交了很多条。
首先,无论如何不要在并行流里执行阻塞任务,除非你对于其内部实现非常了解,否则,你会遇到各种各样所谓的坑。
当然,官方也没有完全禁止,其表述如下:
It is possible to define and use ForkJoinTasks that may block, but doing so requires three further considerations: (1) Completion of few if any other tasks should be dependent on a task that blocks on external synchronization or I/ O. Event-style async tasks that are never joined (for example, those subclassing CountedCompleter) often fall into this category. (2) To minimize resource impact, tasks should be small; ideally performing only the (possibly) blocking action. (3) Unless the ForkJoinPool. ManagedBlocker API is used, or the number of possibly blocked tasks is known to be less than the pool's ForkJoinPool. getParallelism level, the pool cannot guarantee that enough threads will be available to ensure progress or good performance.
如果你能够对以上描述有深入理解,比如 ManagedBlocker 的工作原理,那么可能没人可以拦住你使用阻塞任务。但是,由于公共池是公用的,每一次任务的成功执行不一定能保证整体执行多条任务时能够执行成功(这也是推荐使用自定义线程池的原因之一)。
其次,应该理解并行流的基本执行流程。不是所有stream 都可以 parallel 的。笔者遇到的生产代码没有遇到非要使用parallel 才能解决问题的,而且99%遇到的parallel 使用都是有问题。
Effective Java 第48条(谨慎使用并行流)指出了一些规则:
再次,你需要能写出无需状态、无副作用的纯函数。
最后,需要进行性能测试。实际上,使用并行流不一定提高性能。我们可以在最初阶段估算并行度,比如并行排序,一方面只有可以并行的运算才可以提高性能;另一方面,任务划分可能会划分过多的子任务,结果收集难以并行运算,还有线程上下文切换、数据同步等开销。性能测试不是一次性的,应该尽可能模拟实际生产场景。
总之,轻松使用并行流可能应了这句话:“理想很丰满,现实很骨感”。我们已经有了容易理解的API(CompletableFuture、线程池、JUC等工具类),没必要舍近求远使用复杂、易错、性能不一定好的实现。
Stream类中可拓展性最好的方法是 collect, 你可以传入不同的Collector 实现,比如 使用 toConcurrentMap 返回并发支持Map、Guava 中使用 toImmutableList
, toImmutableSet
返回不可变集合、使用 Comparators. greatest(k, comparator)
高效计算 topK问题等等。
对于阻塞任务,开源类库 Parallel Collector 提供了收集阻塞任务的能力,示例代码如下:java
代码解读复制代码list.stream()
.collect(parallel(i -> blockingOp(i), toList()))
// 加入超时机制,提高系统韧性
.orTimeout(1000, MILLISECONDS)
.thenAcceptAsync(System.out::println, executor)
.thenRun(() -> System.out.println("Finished!"));
总结其特点如下:
其返回类型为 CompletableFuture,无需等待即可返回。实现了 CompletableFuture 和 Stream 流的转换。
其可以指定执行器和并发度,方便并发控制。
和标准库无缝衔接,没有所谓的 trick,做法仅仅是实现 Collector 接口,没有对Stream底层实现有依赖。
成熟,目前支持 JDK21+(虚拟线程) 和 JDK8+(线程池)。截至本文发表前 JDK8+支持版本为
com.pivovarit:parallel-collectors:2.6.1