处理 ForkJoinPool 任务/操作中未捕获异常的更好方法

发布于 2025-01-01 18:02:28 字数 2739 浏览 1 评论 0原文

使用 ForkJoinPool 提交任务(RecursiveActionRecursiveTask)?

ForkJoinPool 接受 Thread.UncaughtExceptionHandler来处理 WorkerThread 突然终止时的异常(无论如何,这不在我们的控制之下),但是当 ForkJoinTask 抛出一个例外。我正在使用标准 提交/在我的实现中调用All 方式。

这是我的场景:

我有一个线程在无限循环中运行,从第三方系统读取数据。在此线程中,我将任务提交到 ForkJoinPool

新线程() {
      公共无效运行(){
         而(真){
             ForkJoinTask;无用返回= 
                   ForkJoinPool.submit(RecursiveActionTask);
         }
      }
 }

我正在使用 RecursiveAction,并且在少数情况下使用 RecursiveTask。这些任务使用submit()方法提交到FJPool。 我想要一个类似于 UncaughtExceptionHandler 的通用异常处理程序,如果任务抛出未检查/未捕获的异常,我可以处理该异常并在需要时重新提交任务。处理异常还可以确保如果一个/某些任务抛出异常,排队的任务不会被取消。

invokeAll() 方法返回一组 ForkJoinTask,但这些任务位于递归块中(每个任务调用 compute() 方法,并且可以进一步拆分[假设场景])

class RecursiveActionTask extends RecursiveAction {

    public void compute() {
       if <task.size() <= ACCEPTABLE_SIZE) {
          processTask() // this might throw an checked/unchecked exception
       } else {
          RecursiveActionTask[] splitTasks = splitTasks(tasks)
          RecursiveActionTasks returnedTasks = invokeAll(splitTasks);
          // the below code never executes as invokeAll submits the tasks to the pool 
          // and the flow never comes to the code below.
          // I am looking for some handling like this
          for (RecusiveActionTask task : returnedTasks) {
             if (task.isDone()) {
                task.getException() // handle this exception
             }
          }
       }
    }

}

我注意到,当 3-4 个任务失败时,整个队列提交单元都会被丢弃。目前,我在 processTask 周围放置了一个我个人不喜欢的 try/catch 。我正在寻找更通用的。

  1. 我还想知道所有失败的任务列表,以便我可以重新提交它们
  2. 当任务抛出异常时,线程是否会从池中逐出(尽管我的分析发现它们不会[但不确定])?
  3. 在 FutureTask 上调用 get() 方法更有可能使我的流程按顺序排列,因为它会等待任务完成。
  4. 我只想知道任务失败时的状态。我不在乎它何时完成(显然不想等一个小时后)

有什么想法如何处理上述场景中的异常吗?

What is the better way to handle exceptions(uncaught) while using ForkJoinPool to submit tasks (RecursiveAction or RecursiveTask)?

ForkJoinPool accepts a Thread.UncaughtExceptionHandler to handle exceptions when the WorkerThread terminates abruptly(which is anyways not under our control) but this handler is not used when ForkJoinTask throws an exception. I am using the standard submit/invokeAll way in my implementation.

Here is my scenario:

I have a Thread running in a infinite loop reading data from a 3rd party system. With in this Thread I submit Tasks to the ForkJoinPool

new Thread() {
      public void run() {
         while (true) {
             ForkJoinTask<Void> uselessReturn = 
                   ForkJoinPool.submit(RecursiveActionTask);
         }
      }
 }

I am using a RecursiveAction and in few scenarios a RecursiveTask. These tasks are submitted to FJPool using submit() method.
I want to have a generic exception handler similar to UncaughtExceptionHandler where if a Task throws an unchecked/uncaught exception I can process the exception and re-submit the task if required. Handling the exception also ensures the queued tasks would not get cancelled if one/some of the Tasks throw an exception.

invokeAll() method returns a set of ForkJoinTasks but these Tasks are in a recursive block (each task invokes the compute() method and may be split further [hypothetical scenario] )

class RecursiveActionTask extends RecursiveAction {

    public void compute() {
       if <task.size() <= ACCEPTABLE_SIZE) {
          processTask() // this might throw an checked/unchecked exception
       } else {
          RecursiveActionTask[] splitTasks = splitTasks(tasks)
          RecursiveActionTasks returnedTasks = invokeAll(splitTasks);
          // the below code never executes as invokeAll submits the tasks to the pool 
          // and the flow never comes to the code below.
          // I am looking for some handling like this
          for (RecusiveActionTask task : returnedTasks) {
             if (task.isDone()) {
                task.getException() // handle this exception
             }
          }
       }
    }

}

I noticed that when 3-4 tasks fail the whole queue submission unit is discarded. Currently I have put a try/catch around processTask that I personally don't like. I am looking for more generic.

  1. I also want to know about all the list of tasks that failed so that I can re-submit them
  2. When the tasks throw exceptions do the threads get evicted from the pool (although my analysis found they doesn't [but not sure] )?
  3. Calling get() method on the FutureTask would more likely put my flow sequential as it waits until the task completes.
  4. I want to know the status of the Task only if it fails. I don't care when it completes (obviously doesn't want to wait an hour later)

Any ideas how to handle the exceptions in the above scenario?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

一个人的夜不怕黑 2025-01-08 18:02:28

这就是我们在 Akka 中解决这个问题的方法:

/**
 * INTERNAL AKKA USAGE ONLY
 */
final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] {
  final override def setRawResult(u: Unit): Unit = ()
  final override def getRawResult(): Unit = ()
  final override def exec(): Boolean = try { mailbox.run; true } catch {
    case anything ⇒
      val t = Thread.currentThread
      t.getUncaughtExceptionHandler match {
        case null ⇒
        case some ⇒ some.uncaughtException(t, anything)
      }
      throw anything
  }
 }

This is how we solved it in Akka:

/**
 * INTERNAL AKKA USAGE ONLY
 */
final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] {
  final override def setRawResult(u: Unit): Unit = ()
  final override def getRawResult(): Unit = ()
  final override def exec(): Boolean = try { mailbox.run; true } catch {
    case anything ⇒
      val t = Thread.currentThread
      t.getUncaughtExceptionHandler match {
        case null ⇒
        case some ⇒ some.uncaughtException(t, anything)
      }
      throw anything
  }
 }
じ违心 2025-01-08 18:02:28

@Rajendra,你做的一切都是正确的,除了你应该使用 ForkJoinPoolexecute() 而不是submit()。

这样,如果 Runnable 失败,它将强制产生一个工作异常,并且它将被 UncaughtExceptionHandler 捕获。

我不知道为什么这种行为存在,但它会起作用!我经历了惨痛的教训 :(

从 Java 8 代码中获取:
提交正在使用 AdaptedRunnableAction()。
执行正在使用 RunnableExecuteAction()(请参阅rethrow(ex))。

 /**
 * Adaptor for Runnables without results
 */
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
    implements RunnableFuture<Void> {
    final Runnable runnable;
    AdaptedRunnableAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;
}

/**
 * Adaptor for Runnables in which failure forces worker exception
 */
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
    final Runnable runnable;
    RunnableExecuteAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    void internalPropagateException(Throwable ex) {
        rethrow(ex); // rethrow outside exec() catches.
    }
    private static final long serialVersionUID = 5232453952276885070L;
}

@Rajendra, You are doing everything right, except you should use ForkJoinPool execute() instead of submit().

This way if the Runnable fails it will force a worker exception and it will be caught by your UncaughtExceptionHandler.

I don't know why this behavior exist but it will work! I've learned it the hard way :(

Taking from Java 8 code:
submit is using AdaptedRunnableAction().
execute is using RunnableExecuteAction() (see the rethrow(ex)).

 /**
 * Adaptor for Runnables without results
 */
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
    implements RunnableFuture<Void> {
    final Runnable runnable;
    AdaptedRunnableAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;
}

/**
 * Adaptor for Runnables in which failure forces worker exception
 */
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
    final Runnable runnable;
    RunnableExecuteAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    void internalPropagateException(Throwable ex) {
        rethrow(ex); // rethrow outside exec() catches.
    }
    private static final long serialVersionUID = 5232453952276885070L;
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文