处理 ForkJoinPool 任务/操作中未捕获异常的更好方法
使用 ForkJoinPool
提交任务(RecursiveAction
或 RecursiveTask
)?
ForkJoinPool 接受 Thread.UncaughtExceptionHandler
来处理 WorkerThread 突然终止时的异常(无论如何,这不在我们的控制之下),但是当
ForkJoinTask
抛出一个例外。我正在使用标准 提交
/在我的实现中调用All
方式。
这是我的场景:
我有一个线程在无限循环中运行,从第三方系统读取数据。在此线程中,我将任务提交到 ForkJoinPool
新线程() { 公共无效运行(){ 而(真){ ForkJoinTask
;无用返回= ForkJoinPool.submit(RecursiveActionTask); } } }
我正在使用 RecursiveAction,并且在少数情况下使用 RecursiveTask。这些任务使用submit()
方法提交到FJPool。 我想要一个类似于 UncaughtExceptionHandler 的通用异常处理程序,如果任务抛出未检查/未捕获的异常,我可以处理该异常并在需要时重新提交任务。处理异常还可以确保如果一个/某些任务抛出异常,排队的任务不会被取消。
invokeAll()
方法返回一组 ForkJoinTask,但这些任务位于递归块中(每个任务调用 compute()
方法,并且可以进一步拆分[假设场景])
class RecursiveActionTask extends RecursiveAction {
public void compute() {
if <task.size() <= ACCEPTABLE_SIZE) {
processTask() // this might throw an checked/unchecked exception
} else {
RecursiveActionTask[] splitTasks = splitTasks(tasks)
RecursiveActionTasks returnedTasks = invokeAll(splitTasks);
// the below code never executes as invokeAll submits the tasks to the pool
// and the flow never comes to the code below.
// I am looking for some handling like this
for (RecusiveActionTask task : returnedTasks) {
if (task.isDone()) {
task.getException() // handle this exception
}
}
}
}
}
我注意到,当 3-4 个任务失败时,整个队列提交单元都会被丢弃。目前,我在 processTask 周围放置了一个我个人不喜欢的 try/catch
。我正在寻找更通用的。
- 我还想知道所有失败的任务列表,以便我可以重新提交它们
- 当任务抛出异常时,线程是否会从池中逐出(尽管我的分析发现它们不会[但不确定])?
- 在 FutureTask 上调用 get() 方法更有可能使我的流程按顺序排列,因为它会等待任务完成。
- 我只想知道任务失败时的状态。我不在乎它何时完成(显然不想等一个小时后)
有什么想法如何处理上述场景中的异常吗?
What is the better way to handle exceptions(uncaught) while using ForkJoinPool
to submit tasks (RecursiveAction
or RecursiveTask
)?
ForkJoinPool accepts a Thread.UncaughtExceptionHandler
to handle exceptions when the WorkerThread terminates abruptly(which is anyways not under our control) but this handler is not used when ForkJoinTask
throws an exception. I am using the standard submit
/invokeAll
way in my implementation.
Here is my scenario:
I have a Thread running in a infinite loop reading data from a 3rd party system. With in this Thread I submit Tasks to the ForkJoinPool
new Thread() { public void run() { while (true) { ForkJoinTask<Void> uselessReturn = ForkJoinPool.submit(RecursiveActionTask); } } }
I am using a RecursiveAction and in few scenarios a RecursiveTask. These tasks are submitted to FJPool using submit()
method.
I want to have a generic exception handler similar to UncaughtExceptionHandler
where if a Task throws an unchecked/uncaught exception I can process the exception and re-submit the task if required. Handling the exception also ensures the queued tasks would not get cancelled if one/some of the Tasks throw an exception.
invokeAll()
method returns a set of ForkJoinTasks but these Tasks are in a recursive block (each task invokes the compute()
method and may be split further [hypothetical scenario] )
class RecursiveActionTask extends RecursiveAction {
public void compute() {
if <task.size() <= ACCEPTABLE_SIZE) {
processTask() // this might throw an checked/unchecked exception
} else {
RecursiveActionTask[] splitTasks = splitTasks(tasks)
RecursiveActionTasks returnedTasks = invokeAll(splitTasks);
// the below code never executes as invokeAll submits the tasks to the pool
// and the flow never comes to the code below.
// I am looking for some handling like this
for (RecusiveActionTask task : returnedTasks) {
if (task.isDone()) {
task.getException() // handle this exception
}
}
}
}
}
I noticed that when 3-4 tasks fail the whole queue submission unit is discarded. Currently I have put a try/catch
around processTask that I personally don't like. I am looking for more generic.
- I also want to know about all the list of tasks that failed so that I can re-submit them
- When the tasks throw exceptions do the threads get evicted from the pool (although my analysis found they doesn't [but not sure] )?
- Calling
get()
method on the FutureTask would more likely put my flow sequential as it waits until the task completes. - I want to know the status of the Task only if it fails. I don't care when it completes (obviously doesn't want to wait an hour later)
Any ideas how to handle the exceptions in the above scenario?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
这就是我们在 Akka 中解决这个问题的方法:
This is how we solved it in Akka:
@Rajendra,你做的一切都是正确的,除了你应该使用 ForkJoinPoolexecute() 而不是submit()。
这样,如果 Runnable 失败,它将强制产生一个工作异常,并且它将被 UncaughtExceptionHandler 捕获。
我不知道为什么这种行为存在,但它会起作用!我经历了惨痛的教训 :(
从 Java 8 代码中获取:
提交正在使用 AdaptedRunnableAction()。
执行正在使用 RunnableExecuteAction()(请参阅rethrow(ex))。
@Rajendra, You are doing everything right, except you should use ForkJoinPool execute() instead of submit().
This way if the Runnable fails it will force a worker exception and it will be caught by your UncaughtExceptionHandler.
I don't know why this behavior exist but it will work! I've learned it the hard way :(
Taking from Java 8 code:
submit is using AdaptedRunnableAction().
execute is using RunnableExecuteAction() (see the rethrow(ex)).