ForkJoinPool 不正确的 invokeAll 方法行为 (java)
ExecutorService 有 invokeAll 方法,文档如下:
执行给定的任务,返回持有其任务的 Futures 列表 全部完成或超时到期时的状态和结果,以两者为准 首先发生。 Future.isDone 对于返回的每个元素都是 true 列表。 返回后,未完成的任务将被取消。笔记 已完成的任务可以正常终止或通过 抛出异常。如果该方法的结果是未定义的 在此操作正在进行时给定的集合被修改
考虑以下代码:
public class Main {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Callable<Integer>> tasks = Arrays.asList(
Main::veryLongCalculations,
Main::veryLongCalculations,
Main::veryLongCalculations);
try {
List<Future<Integer>> resultTasks = executorService.invokeAll(tasks, 2, TimeUnit.SECONDS);
for (Future<Integer> task: resultTasks) {
System.out.println("is done: " + task.isDone() + ", canceled: " + task.isCancelled());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Integer veryLongCalculations() {
while (!Thread.currentThread().isInterrupted()) {
//very long calculations
}
System.out.println("interrupted");
return 0;
}
}
如果我们运行此代码,屏幕上将显示以下输出:
interrupted
interrupted
interrupted
is done: true, canceled: true
is done: true, canceled: true
is done: true, canceled: true
任务显然运行时间超过了超时时间,因此被取消。任务状态已完成。一切都按预期进行。
但是如果我们使用 ForkJoinPool 作为 ExecutorService (ExecutorService executorService = Executors.newWorkStealingPool()) 那么输出将是这样的:
interrupted
interrupted
is done: true, canceled: true
is done: true, canceled: true
is done: false, canceled: false
其中一个任务永远不会被取消,这与文档相矛盾 但是如果我们将sdk从java17更改为java11,那么它就可以正常工作。 我开始理解,看到ForkJoinPool继承自AbstractExecutorService类,它实现了invokeAll方法。但在 java 17 中,该方法在 ForkJoinPool 本身中被重载。这是重载方法:
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
for (Callable<T> t : tasks) {
ForkJoinTask<T> f =
new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
futures.add(f);
externalSubmit(f);
}
long startTime = System.nanoTime(), ns = nanos;
boolean timedOut = (ns < 0L);
for (int i = futures.size() - 1; i >= 0; --i) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (timedOut)
ForkJoinTask.cancelIgnoringExceptions(f);
else {
((ForkJoinTask<T>)f).awaitPoolInvoke(this, ns);
if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
timedOut = true;
}
}
}
return futures;
} catch (Throwable t) {
for (Future<T> e : futures)
ForkJoinTask.cancelIgnoringExceptions(e);
throw t;
}
}
在第一次迭代中,timedOut 变量将始终为 false,因此一个任务永远不会被取消。这是一个错误吗?还是有其他原因导致这种情况?这种行为显然与文档相反,文档规定所有任务都应具有已完成状态,并且如果超时到期则应取消。
ExecutorService have invokeAll method, and documentation say next:
Executes the given tasks, returning a list of Futures holding their
status and results when all complete or the timeout expires, whichever
happens first. Future.isDone is true for each element of the returned
list. Upon return, tasks that have not completed are cancelled. Note
that a completed task could have terminated either normally or by
throwing an exception. The results of this method are undefined if the
given collection is modified while this operation is in progress
Consider the following code:
public class Main {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Callable<Integer>> tasks = Arrays.asList(
Main::veryLongCalculations,
Main::veryLongCalculations,
Main::veryLongCalculations);
try {
List<Future<Integer>> resultTasks = executorService.invokeAll(tasks, 2, TimeUnit.SECONDS);
for (Future<Integer> task: resultTasks) {
System.out.println("is done: " + task.isDone() + ", canceled: " + task.isCancelled());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Integer veryLongCalculations() {
while (!Thread.currentThread().isInterrupted()) {
//very long calculations
}
System.out.println("interrupted");
return 0;
}
}
If we run this code, the following output will be displayed on the screen:
interrupted
interrupted
interrupted
is done: true, canceled: true
is done: true, canceled: true
is done: true, canceled: true
The tasks were clearly running longer than the timeout and so they were cancelled. Status of tasks is completed. Everything worked exactly as expected.
But if we use ForkJoinPool as ExecutorService (ExecutorService executorService = Executors.newWorkStealingPool()) then the output will be as like this:
interrupted
interrupted
is done: true, canceled: true
is done: true, canceled: true
is done: false, canceled: false
One of the tasks is never cancelled, which contradicts the documentation
But if we change sdk from java17 to java11, then it will work fine.
I began to understand and saw that ForkJoinPool inherits from the AbstractExecutorService class, which implements the invokeAll method. But in java 17 this method is overloaded in the ForkJoinPool itself. Here is the overloaded method:
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
for (Callable<T> t : tasks) {
ForkJoinTask<T> f =
new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
futures.add(f);
externalSubmit(f);
}
long startTime = System.nanoTime(), ns = nanos;
boolean timedOut = (ns < 0L);
for (int i = futures.size() - 1; i >= 0; --i) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (timedOut)
ForkJoinTask.cancelIgnoringExceptions(f);
else {
((ForkJoinTask<T>)f).awaitPoolInvoke(this, ns);
if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
timedOut = true;
}
}
}
return futures;
} catch (Throwable t) {
for (Future<T> e : futures)
ForkJoinTask.cancelIgnoringExceptions(e);
throw t;
}
}
On the first iteration, the timedOut variable will always be false, so one task will never be cancelled. Is this a bug? Or is there some other reason why it works this way? This behavior is clearly contrary to the documentation, which says that all tasks should have a completed status and should be canceled if the timeout expires.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论