处理 ForkJoinPool 任务/操作中未捕获异常的更好方法
使用 ForkJoinPool
提交任务(RecursiveAction
或 RecursiveTask
)?
ForkJoinPool 接受 Thread.UncaughtExceptionHandler
来处理 WorkerThread 突然终止时的异常(无论如何,这不在我们的控制之下),但是当
ForkJoinTask
抛出一个例外。我正在使用标准 提交
/在我的实现中调用All
方式。
这是我的场景:
我有一个线程在无限循环中运行,从第三方系统读取数据。在此线程中,我将任务提交到 ForkJoinPool
新线程() { 公共无效运行(){ 而(真){ ForkJoinTask
;无用返回= ForkJoinPool.submit(RecursiveActionTask); } } }
我正在使用 RecursiveAction,并且在少数情况下使用 RecursiveTask。这些任务使用submit()
方法提交到FJPool。 我想要一个类似于 UncaughtExceptionHandler 的通用异常处理程序,如果任务抛出未检查/未捕获的异常,我可以处理该异常并在需要时重新提交任务。处理异常还可以确保如果一个/某些任务抛出异常,排队的任务不会被取消。
invokeAll()
方法返回一组 ForkJoinTask,但这些任务位于递归块中(每个任务调用 compute()
方法,并且可以进一步拆分[假设场景])
class RecursiveActionTask extends RecursiveAction {
public void compute() {
if <task.size() <= ACCEPTABLE_SIZE) {
processTask() // this might throw an checked/unchecked exception
} else {
RecursiveActionTask[] splitTasks = splitTasks(tasks)
RecursiveActionTasks returnedTasks = invokeAll(splitTasks);
// the below code never executes as invokeAll submits the tasks to the pool
// and the flow never comes to the code below.
// I am looking for some handling like this
for (RecusiveActionTask task : returnedTasks) {
if (task.isDone()) {
task.getException() // handle this exception
}
}
}
}
}
我注意到,当 3-4 个任务失败时,整个队列提交单元都会被丢弃。目前,我在 processTask 周围放置了一个我个人不喜欢的 try/catch
。我正在寻找更通用的。
- 我还想知道所有失败的任务列表,以便我可以重新提交它们
- 当任务抛出异常时,线程是否会从池中逐出(尽管我的分析发现它们不会[但不确定])?
- 在 FutureTask 上调用 get() 方法更有可能使我的流程按顺序排列,因为它会等待任务完成。
- 我只想知道任务失败时的状态。我不在乎它何时完成(显然不想等一个小时后)
有什么想法如何处理上述场景中的异常吗?
What is the better way to handle exceptions(uncaught) while using ForkJoinPool
to submit tasks (RecursiveAction
or RecursiveTask
)?
ForkJoinPool accepts a Thread.UncaughtExceptionHandler
to handle exceptions when the WorkerThread terminates abruptly(which is anyways not under our control) but this handler is not used when ForkJoinTask
throws an exception. I am using the standard submit
/invokeAll
way in my implementation.
Here is my scenario:
I have a Thread running in a infinite loop reading data from a 3rd party system. With in this Thread I submit Tasks to the ForkJoinPool
new Thread() { public void run() { while (true) { ForkJoinTask<Void> uselessReturn = ForkJoinPool.submit(RecursiveActionTask); } } }
I am using a RecursiveAction and in few scenarios a RecursiveTask. These tasks are submitted to FJPool using submit()
method.
I want to have a generic exception handler similar to UncaughtExceptionHandler
where if a Task throws an unchecked/uncaught exception I can process the exception and re-submit the task if required. Handling the exception also ensures the queued tasks would not get cancelled if one/some of the Tasks throw an exception.
invokeAll()
method returns a set of ForkJoinTasks but these Tasks are in a recursive block (each task invokes the compute()
method and may be split further [hypothetical scenario] )
class RecursiveActionTask extends RecursiveAction {
public void compute() {
if <task.size() <= ACCEPTABLE_SIZE) {
processTask() // this might throw an checked/unchecked exception
} else {
RecursiveActionTask[] splitTasks = splitTasks(tasks)
RecursiveActionTasks returnedTasks = invokeAll(splitTasks);
// the below code never executes as invokeAll submits the tasks to the pool
// and the flow never comes to the code below.
// I am looking for some handling like this
for (RecusiveActionTask task : returnedTasks) {
if (task.isDone()) {
task.getException() // handle this exception
}
}
}
}
}
I noticed that when 3-4 tasks fail the whole queue submission unit is discarded. Currently I have put a try/catch
around processTask that I personally don't like. I am looking for more generic.
- I also want to know about all the list of tasks that failed so that I can re-submit them
- When the tasks throw exceptions do the threads get evicted from the pool (although my analysis found they doesn't [but not sure] )?
- Calling
get()
method on the FutureTask would more likely put my flow sequential as it waits until the task completes. - I want to know the status of the Task only if it fails. I don't care when it completes (obviously doesn't want to wait an hour later)
Any ideas how to handle the exceptions in the above scenario?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
这就是我们在 Akka 中解决这个问题的方法:
This is how we solved it in Akka:
@Rajendra,你做的一切都是正确的,除了你应该使用 ForkJoinPoolexecute() 而不是submit()。
这样,如果 Runnable 失败,它将强制产生一个工作异常,并且它将被 UncaughtExceptionHandler 捕获。
我不知道为什么这种行为存在,但它会起作用!我经历了惨痛的教训 :(
从 Java 8 代码中获取:
提交正在使用 AdaptedRunnableAction()。
执行正在使用 RunnableExecuteAction()(请参阅rethrow(ex))。
@Rajendra, You are doing everything right, except you should use ForkJoinPool execute() instead of submit().
This way if the Runnable fails it will force a worker exception and it will be caught by your UncaughtExceptionHandler.
I don't know why this behavior exist but it will work! I've learned it the hard way :(
Taking from Java 8 code:
submit is using AdaptedRunnableAction().
execute is using RunnableExecuteAction() (see the rethrow(ex)).