Interop between MailboxProcessor and Task hangs forever

Posted on 2025-01-13 17:16:18

I want to process a series of jobs in sequence, but I want to queue up those jobs in parallel.

Here is my code:

open System.Threading.Tasks

let performWork (work : int) =
  task {
    do! Task.Delay 1000

    if work = 7 then
      failwith "Oh no"
    else
      printfn $"Work {work}"
  }

async {
  let w = MailboxProcessor.Start (fun inbox -> async {
    while true do
      let! message = inbox.Receive()

      let (ch : AsyncReplyChannel<_>), work = message

      do!
        performWork work
        |> Async.AwaitTask

      ch.Reply()
  })

  w.Error.Add(fun exn -> raise exn)

  let! completed =
    seq {
      for i = 1 to 10 do
        async {
          do! Async.Sleep 100
          do! w.PostAndAsyncReply(fun ch -> ch, i)

          return i
        }
    }
    |> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)

  printfn $"Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously

I expect this code to crash once it reaches work item 7.

However, it hangs forever:

$ dotnet fsi ./Test.fsx
Work 3
Work 1
Work 2
Work 4
Work 5
Work 6

I think that the w.Error event is not firing correctly.

How should I be capturing and re-throwing this error?

If my work is async, then it crashes as expected:

let performWork (work : int) =
  async {
    do! Async.Sleep 1000

    if work = 7 then
      failwith "Oh no"
    else
      printfn $"Work {work}"
  }

But I don't see why this should matter.


Leveraging a Result also works, but again, I don't know why this should be required.

async {
  let w = MailboxProcessor.Start (fun inbox -> async {
    while true do
      let! message = inbox.Receive()

      let (ch : AsyncReplyChannel<_>), work = message

      try
        do!
          performWork work
          |> Async.AwaitTask

        ch.Reply(Ok ())
      with exn ->
        ch.Reply(Error exn)
  })

  let performWorkOnWorker (work : int) =
    async {
      let! outcome = w.PostAndAsyncReply(fun ch -> ch, work)

      match outcome with
      | Ok () ->
        return ()
      | Error exn ->
        return raise exn
    }

  let! completed =
    seq {
      for i = 1 to 10 do
        async {
          do! Async.Sleep 100
          do! performWorkOnWorker i

          return i
        }
    }
    |> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)

  printfn $"Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously

Comments (3)

念三年u 2025-01-20 17:16:18

I think the problem is in your error handling:

w.Error.Add(fun exn -> raise exn)

Instead of handling the exception, you're attempting to raise it again, which I think is causing an infinite loop.

You can change this to print the exception instead:

w.Error.Add(printfn "%A")

Result is:

Work 4
Work 2
Work 1
Work 3
Work 5
Work 6
System.AggregateException: One or more errors occurred. (Oh no)
 ---> System.Exception: Oh no
   at [email protected]() in C:\Users\Brian Berns\Source\Repos\FsharpConsole\FsharpConsole\Program.fs:line 8
   --- End of inner exception stack trace ---
香橙ぽ 2025-01-20 17:16:18

I think the gist of the 'why' here is that Microsoft changed the behaviour for 'unobserved' task exceptions back in .NET 4.5, and this was brought through into .NET Core: these exceptions no longer cause the process to terminate, they're effectively ignored. You can read more about it here.

I don't know the ins and outs of how Task and async are interoperating, but it would seem that the use of Task results in the continuations being attached to that and run on the TaskScheduler as a consequence. The exception is thrown as part of the async computation within the MailboxProcessor, and nothing is 'observing' it. This means the exception ends up in the mechanism referred to above, and that's why your process no longer crashes.
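The "nothing is observing it" behaviour can be seen in isolation with a minimal sketch. The `faulted` task below is a hypothetical stand-in, not the question's code: it throws inside a Task, and as long as nobody waits on it or reads its result, the exception is stored on the Task rather than taking the process down.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Hypothetical stand-in for work that fails inside a Task.
let faulted = Task.Run(Action(fun () -> failwith "Oh no"))

// Poll for completion without calling Wait() or reading Result,
// so the exception is not rethrown on this thread.
while not faulted.IsCompleted do
    Thread.Sleep 10

// The process is still running at this point; the failure only
// shows up as state on the Task object.
printfn $"IsFaulted = {faulted.IsFaulted}"
```

Contrast this with an exception escaping a plain thread-pool work item, which terminates the process.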

You can change this behaviour via a flag on .NET Framework via app.config, as explained in the link above. For .NET Core, you can't do this. You'd ordinarily try and replicate this by subscribing to the UnobservedTaskException event and re-throwing there, but that won't work in this case as the Task is hung and won't ever be garbage collected.

To try and prove the point, I've amended your example to include a timeout for PostAndAsyncReply. This means that the Task will eventually complete and can be garbage collected; when the finaliser runs, the event is fired.

open System
open System.Threading.Tasks

let performWork (work : int) =
  task {
    do! Task.Delay 1000

    if work = 7 then
      failwith "Oh no"
    else
      printfn $"Work {work}"
  }

let worker = async {
  let w = MailboxProcessor.Start (fun inbox -> async {
    while true do
      let! message = inbox.Receive()

      let (ch : AsyncReplyChannel<_>), work = message

      do!
        performWork work
        |> Async.AwaitTask

      ch.Reply()
  })

  w.Error.Add(fun exn -> raise exn)

  let! completed =
    seq {
      for i = 1 to 10 do
        async {
          do! Async.Sleep 100
          do! w.PostAndAsyncReply((fun ch -> ch, i), 10000)

          return i
        }
    }
    |> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)

  printfn $"Completed {Seq.length completed} job(s)."

}

TaskScheduler.UnobservedTaskException.Add(fun ex ->
    printfn "UnobservedTaskException was fired, re-raising"
    raise ex.Exception)

try
  Async.RunSynchronously worker
with
  | :? TimeoutException -> ()

GC.Collect()
GC.WaitForPendingFinalizers()

The output I get here is:

Work 1
Work 3
Work 4
Work 2
Work 5
Work 6
UnobservedTaskException was fired, re-raising
Unhandled exception. System.AggregateException: A Task's exception(s) were not observed either by Waiting on the Task or accessing its Exception property. As a result, the unobserved exception was rethrown by the finalizer thread. (One or more errors occurred. (Oh no))
 ---> System.AggregateException: One or more errors occurred. (Oh no)
 ---> System.Exception: Oh no
   at [email protected]() in /Users/cmager/dev/ConsoleApp1/ConsoleApp2/Program.fs:line 9
   --- End of inner exception stack trace ---
   at [email protected](ExceptionDispatchInfo edi)
   at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in D:\a\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 104
   at Microsoft.FSharp.Control.AsyncPrimitives.AttachContinuationToTask@1144.Invoke(Task`1 completedTask) in D:\a\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 1145
   at System.Threading.Tasks.ContinuationTaskFromResultTask`1.InnerInvoke()
   at System.Threading.Tasks.Task.<>c.<.cctor>b__272_0(Object obj)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location ---
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
   --- End of inner exception stack trace ---
   at [email protected](UnobservedTaskExceptionEventArgs ex) in /Users/cmager/dev/ConsoleApp1/ConsoleApp2/Program.fs:line 48
   at Microsoft.FSharp.Control.CommonExtensions.SubscribeToObservable@1989.System.IObserver<'T>.OnNext(T args) in D:\a\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 1990
   at Microsoft.FSharp.Core.CompilerServices.RuntimeHelpers.h@379.Invoke(Object _arg1, TArgs args) in D:\a\_work\1\s\src\fsharp\FSharp.Core\seqcore.fs:line 379
   at [email protected](Object delegateArg0, UnobservedTaskExceptionEventArgs delegateArg1) in /Users/cmager/dev/ConsoleApp1/ConsoleApp2/Program.fs:line 46
   at System.Threading.Tasks.TaskScheduler.PublishUnobservedTaskException(Object sender, UnobservedTaskExceptionEventArgs ueea)
   at System.Threading.Tasks.TaskExceptionHolder.Finalize()

As you can see, the exception is eventually published by the Task finaliser, and re-throwing it in that handler brings down the app.

While interesting, I'm not sure any of this is practically useful information. The suggestion to terminate the app within MailboxProcessor.Error handler is probably the right one.
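If terminating the process is too drastic, one possible alternative is to have the Error handler record the failure and let the caller wait for whichever happens first: all jobs finishing, or the agent failing. The following is only a sketch under assumptions: the names `agent`, `agentFailure`, `allJobs` and `outcome` are made up for illustration, the delay is shortened to 50 ms, and `maxDegreeOfParallelism` is dropped for brevity.

```fsharp
open System
open System.Threading.Tasks

// Same shape as the question's performWork, with a shorter delay.
let performWork (work : int) =
    task {
        do! Task.Delay 50
        if work = 7 then failwith "Oh no"
        else printfn $"Work {work}"
    }

let agent =
    MailboxProcessor<AsyncReplyChannel<unit> * int>.Start(fun inbox -> async {
        while true do
            let! (ch, work) = inbox.Receive()
            do! performWork work |> Async.AwaitTask
            ch.Reply()
    })

// Assumption: the handler only records the failure; it does not raise.
let agentFailure = TaskCompletionSource<unit>()
agent.Error.Add(fun e -> agentFailure.TrySetException e |> ignore)

let jobs =
    [ for i in 1 .. 10 ->
        async {
            do! agent.PostAndAsyncReply(fun ch -> ch, i)
            return i
        } ]

let allJobs = Async.Parallel jobs |> Async.StartAsTask

// Wait for whichever completes first: every job, or the agent failing.
let first = Task.WhenAny(allJobs :> Task, agentFailure.Task :> Task).Result

let outcome =
    if obj.ReferenceEquals(first, agentFailure.Task) then
        // GetBaseException unwraps any nested AggregateException layers.
        Error (agentFailure.Task.Exception.GetBaseException().Message)
    else
        Ok allJobs.Result.Length

printfn "%A" outcome
```

Here the caller gets an `Error` value it can turn into an exception or an exit code on its own thread, instead of relying on an exception raised inside the event handler.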

时光磨忆 2025-01-20 17:16:18

As far as I can see, when you throw an exception in the MailboxProcessor body, the MailboxProcessor doesn't hang forever; it just stops processing messages altogether.

Your program hangs because you call Async.Parallel and wait until every async has finished. But the ones affected by the exception never finish or return a result, so the program as a whole hangs forever.
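A small sketch can make the missing replies visible. It uses `PostAndTryAsyncReply`, which returns `None` when no reply arrives within the timeout; the agent `failingAgent`, the 500 ms timeout and the other names are assumptions chosen for illustration, and the work is simplified to a direct `failwith`.

```fsharp
open System

// Hypothetical agent whose body throws when it receives work item 7.
let failingAgent =
    MailboxProcessor<AsyncReplyChannel<unit> * int>.Start(fun inbox -> async {
        while true do
            let! (ch, work) = inbox.Receive()
            if work = 7 then failwith "Oh no"
            ch.Reply()
    })

// Record the error instead of raising from the handler.
let observedError : exn option ref = ref None
failingAgent.Error.Add(fun e -> observedError.Value <- Some e)

// Post a message and wait at most 500 ms for the reply.
let ask work =
    failingAgent.PostAndTryAsyncReply((fun ch -> ch, work), 500)
    |> Async.RunSynchronously

let okReply = ask 1     // processed normally, so a reply arrives
let lostReply = ask 7   // the body throws before replying
let afterDeath = ask 8  // queued, but the agent has stopped reading

printfn "%A %A %A" okReply lostReply afterDeath
```

With `PostAndAsyncReply` and no timeout, the second and third calls would wait forever, which is the hang seen in the question.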

If you want to explicitly abort, then you need to call System.Environment.Exit, not just throw an exception.

One way to re-write your program is like this.

open System.Threading.Tasks

let performWork (work : int) = task {
    do! Task.Delay 1000

    if   work = 7
    then failwith "Oh no"
    else printfn $"Work {work}"
}

let mb =
    let mbBody (inbox : MailboxProcessor<AsyncReplyChannel<_> * int>) = async {
        while true do
            let! (ch,work) = inbox.Receive()
            try
                do! performWork work |> Async.AwaitTask
                ch.Reply ()
            with _ ->
                // Exit with a non-zero code so the failure is visible to the caller.
                System.Environment.Exit 1
    }
    MailboxProcessor.Start mbBody

Async.RunSynchronously (async {
    let! completed =
        let jobs = [|
            for i = 1 to 10 do
                async {
                    do! Async.Sleep 100
                    do! mb.PostAndAsyncReply(fun ch -> ch, i)
                    return i
                }
        |]
        Async.Parallel(jobs)

    printfn $"Completed {Seq.length completed} job(s)."
})

By the way, I changed the seq {} to an array and also removed the maxDegreeOfParallelism option. Otherwise the results did not seem very parallel in my tests. You can still keep them if you want.

Executing this program prints something like:

Work 10
Work 4
Work 9
Work 3
Work 8