与 MailboxProcessor 和任务的交互永远挂起
我想按顺序处理一系列作业,但我想并行对这些作业进行排队。
这是我的代码:
open System.Threading.Tasks
let performWork (work : int) =
task {
do! Task.Delay 1000
if work = 7 then
failwith "Oh no"
else
printfn $"Work {work}"
}
async {
let w = MailboxProcessor.Start (fun inbox -> async {
while true do
let! message = inbox.Receive()
let (ch : AsyncReplyChannel<_>), work = message
do!
performWork work
|> Async.AwaitTask
ch.Reply()
})
w.Error.Add(fun exn -> raise exn)
let! completed =
seq {
for i = 1 to 10 do
async {
do! Async.Sleep 100
do! w.PostAndAsyncReply(fun ch -> ch, i)
return i
}
}
|> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)
printfn $"Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously
我预计此代码一旦到达工作项 7
就会崩溃。
但是,它永远挂起:
$ dotnet fsi ./Test.fsx
Work 3
Work 1
Work 2
Work 4
Work 5
Work 6
我认为 w.Error
事件未正确触发。
我应该如何捕获并重新抛出此错误?
如果我的工作是异步的,那么它会按预期崩溃:
let performWork (work : int) =
async {
do! Async.Sleep 1000
if work = 7 then
failwith "Oh no"
else
printfn $"Work {work}"
}
但我不明白为什么这很重要。
利用 Result
也可以,但同样,我不知道为什么需要这样做。
async {
let w = MailboxProcessor.Start (fun inbox -> async {
while true do
let! message = inbox.Receive()
let (ch : AsyncReplyChannel<_>), work = message
try
do!
performWork work
|> Async.AwaitTask
ch.Reply(Ok ())
with exn ->
ch.Reply(Error exn)
})
let performWorkOnWorker (work : int) =
async {
let! outcome = w.PostAndAsyncReply(fun ch -> ch, work)
match outcome with
| Ok () ->
return ()
| Error exn ->
return raise exn
}
let! completed =
seq {
for i = 1 to 10 do
async {
do! Async.Sleep 100
do! performWorkOnWorker i
return i
}
}
|> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)
printfn $"Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously
I want to process a series of jobs in sequence, but I want to queue up those jobs in parallel.
Here is my code:
open System.Threading.Tasks
let performWork (work : int) =
task {
do! Task.Delay 1000
if work = 7 then
failwith "Oh no"
else
printfn quot;Work {work}"
}
async {
let w = MailboxProcessor.Start (fun inbox -> async {
while true do
let! message = inbox.Receive()
let (ch : AsyncReplyChannel<_>), work = message
do!
performWork work
|> Async.AwaitTask
ch.Reply()
})
w.Error.Add(fun exn -> raise exn)
let! completed =
seq {
for i = 1 to 10 do
async {
do! Async.Sleep 100
do! w.PostAndAsyncReply(fun ch -> ch, i)
return i
}
}
|> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)
printfn quot;Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously
I expect this code to crash once it reaches work item 7
.
However, it hangs forever:
$ dotnet fsi ./Test.fsx
Work 3
Work 1
Work 2
Work 4
Work 5
Work 6
I think that the w.Error
event is not firing correctly.
How should I be capturing and re-throwing this error?
If my work is async
, then it crashes as expected:
let performWork (work : int) =
async {
do! Async.Sleep 1000
if work = 7 then
failwith "Oh no"
else
printfn quot;Work {work}"
}
But I don't see why this should matter.
Leveraging a Result
also works, but again, I don't know why this should be required.
async {
let w = MailboxProcessor.Start (fun inbox -> async {
while true do
let! message = inbox.Receive()
let (ch : AsyncReplyChannel<_>), work = message
try
do!
performWork work
|> Async.AwaitTask
ch.Reply(Ok ())
with exn ->
ch.Reply(Error exn)
})
let performWorkOnWorker (work : int) =
async {
let! outcome = w.PostAndAsyncReply(fun ch -> ch, work)
match outcome with
| Ok () ->
return ()
| Error exn ->
return raise exn
}
let! completed =
seq {
for i = 1 to 10 do
async {
do! Async.Sleep 100
do! performWorkOnWorker i
return i
}
}
|> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)
printfn quot;Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
我认为问题在于您的错误处理:
您没有处理异常,而是尝试再次引发它,我认为这导致了无限循环。
您可以更改此设置以打印异常:
结果是:
I think the problem is in your error handling:
Instead of handling the exception, you're attempting to raise it again, which I think is causing an infinite loop.
You can change this to print the exception instead:
Result is:
我认为这里“原因”的要点是,微软在 .NET 4.5 中改变了“未观察到的”任务异常的行为,并且这被引入到 .NET Core 中:这些异常不再导致进程终止,它们“重新有效地忽略了。您可以阅读有关它的更多信息
我不知道
Task
和async
如何互操作的来龙去脉,但似乎使用Task
会导致延续被附加到它并因此在TaskScheduler
上运行。该异常是作为 MailboxProcessor 内的异步计算的一部分引发的,并且没有任何东西“观察”它。这意味着异常最终会出现在上述机制中,这就是您的进程不再崩溃的原因。您可以通过
app.config
通过 .NET Framework 上的标志来更改此行为,如上面的链接中所述。对于.NET Core,你不能这样做。您通常会尝试通过订阅UnobservedTaskException
事件并在那里重新抛出来尝试复制此过程,但在这种情况下这不起作用,因为Task
已挂起并获胜永远不要被垃圾收集。为了尝试证明这一点,我修改了您的示例以包含
PostAndReplyAsync
的超时。这意味着任务
将最终完成,可以进行垃圾收集,并且当终结器运行时,事件会被触发。我在这里得到的输出是:
如您所见,异常最终由任务终结器发布,并将其重新抛出到该处理程序中会导致应用程序关闭。
虽然很有趣,但我不确定这些信息是否有实际用途。在 MailboxProcessor.Error 处理程序中终止应用程序的建议可能是正确的。
I think the gist of the 'why' here is that Microsoft changed the behaviour for 'unobserved' task exceptions back in .NET 4.5, and this was brought through into .NET Core: these exceptions no longer cause the process to terminate, they're effectively ignored. You can read more about it here.
I don't know the ins and outs of how
Task
andasync
are interoperating, but it would seem that the use ofTask
results in the continuations being attached to that and run on theTaskScheduler
as a consequence. The exception is thrown as part of theasync
computation within theMailboxProcessor
, and nothing is 'observing' it. This means the exception ends up in the mechanism referred to above, and that's why your process no longer crashes.You can change this behaviour via a flag on .NET Framework via
app.config
, as explained in the link above. For .NET Core, you can't do this. You'd ordinarily try and replicate this by subscribing to theUnobservedTaskException
event and re-throwing there, but that won't work in this case as theTask
is hung and won't ever be garbage collected.To try and prove the point, I've amended your example to include a timeout for
PostAndReplyAsync
. This means that theTask
will eventually complete, can be garbage collected and, when the finaliser runs, the event fired.The output I get here is:
As you can see, the exception is eventually published by the
Task
finaliser, and re-throwing it in that handler brings down the app.While interesting, I'm not sure any of this is practically useful information. The suggestion to terminate the app within
MailboxProcessor.Error
handler is probably the right one.据我所知,当您在 MailboxProcessor 主体中引发异常时。然后 MailboxProcessor 不会永远挂起,它只是停止整个 MailboxProcessor。
您的程序也会挂起,因为您执行了 Async.Parallel 并等待每个异步完成。但那些有异常的人永远不会完成或返回结果。所以你的程序总体上永远挂起。
如果您想显式中止,那么您需要调用 System.Environment.Exit,而不仅仅是抛出异常。
重写程序的一种方法是这样的。
顺便提一句。我将
seq {}
更改为数组,并另外删除了maxDegreeOfParallelism
选项。否则,在我的测试中,结果似乎不太平行。但如果您愿意,您仍然可以保留它们。执行该程序会打印如下内容:
As far as I see, when you throw an exception in the MailboxProcessor Body. Then the MailboxProcessor doesn't hang forever, it just stops the whole MailboxProcessor.
Your program also hangs, well because you do a
Async.Parallel
and wait until every async finished. But those with an exception, never finish, or returns a result. So your program overall, hangs forever.If you want to explicitly abort, then you need to call
System.Environment.Exit
, not just throw an exception.One way to re-write your program is like this.
Btw. i changed the
seq {}
to an array, and additional removed themaxDegreeOfParallelism
option. Otherwise the results seemed not to be very parallel in my tests. But you still can keep those if you want.executing this program prints something like: