F# async stack overflow
I am surprised by a stack overflow in my async-based program. I suspect the main problem is with the following function, which is supposed to compose two async computations to execute in parallel and wait for both to finish:
let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    async {
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        do! x
        do! y
    }
With this defined, I have the following mapReduce program that attempts to exploit parallelism in both the map and the reduce part. Informally, the idea is to spark N mappers and N-1 reducers using a shared channel, wait for them to finish, and read the result from the channel. I had my own Channel implementation, here replaced by a ConcurrentBag for shorter code (the problem affects both):
let mapReduce (map    : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input  : seq<'T1>) : Async<'T2> =
    let bag = System.Collections.Concurrent.ConcurrentBag()
    let rec read () =
        async {
            match bag.TryTake() with
            | true, value -> return value
            | _ ->
                do! Async.Sleep 100
                return! read ()
        }
    let write x =
        bag.Add x
        async.Return ()
    let reducer =
        async {
            let! x = read ()
            let! y = read ()
            let! r = reduce x y
            return bag.Add r
        }
    let work =
        input
        |> Seq.map (fun x -> async.Bind(map x, write))
        |> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer)
    async {
        do! work
        return! read ()
    }
Now the following basic test starts to throw StackOverflowException on n=10000:
let test n =
    let map x = async.Return x
    let reduce x y = async.Return (x + y)
    mapReduce map reduce [0..n]
    |> Async.RunSynchronously
EDIT: An alternative implementation of the <|> combinator makes the test succeed on N=10000:
let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    Async.FromContinuations(fun (ok, _, _) ->
        let count = ref 0
        let ok () =
            lock count (fun () ->
                match !count with
                | 0 -> incr count
                | _ -> ok ())
        Async.Start <|
            async {
                do! a
                return ok ()
            }
        Async.Start <|
            async {
                do! b
                return ok ()
            })
This is really surprising to me, because this is what I assumed Async.StartChild was doing. Any thoughts on which solution would be optimal?
Comments (4)
I think that the stack overflow exception happens when starting the asynchronous workflow created using the <|> operator. The call to Async.StartChild starts the first workflow, which is combined using <|>, and so it makes another call to Async.StartChild, etc. An easy way to fix it is to schedule the workflow in a handler of a timer (so that it isn't added to the current stack). Something like:
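The snippet that followed is not reproduced in this copy. As a rough sketch of the idea (my own illustration, not Tomas's original code), one could insert a timer hop before starting the children, so that the nested starts resume from the timer callback instead of growing the current stack:

let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    async {
        // Async.Sleep resumes its continuation from a timer callback,
        // so the recursive StartChild calls below begin on a fresh stack
        do! Async.Sleep 1
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        do! x
        do! y
    }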
A better way to fix it would be to create your own Seq.reduce - the current implementation folds one-by-one, so you'll get a tree of depth 10000 that contains just a single work item on the right and all other work items on the left. If you created a balanced binary tree of work items, then it shouldn't overflow the stack, because the height will be only 15 or so.

EDIT: Try replacing Seq.reduce with the following function:
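The function itself is missing from this copy; a minimal sketch of a balanced reduction (my own illustration, assuming a non-empty input) could look like this:

// Reduce by splitting the input in half recursively, so the tree of
// combined work items has depth O(log n) instead of O(n).
let reduceBalanced (f: 'T -> 'T -> 'T) (input: seq<'T>) : 'T =
    let items = Seq.toArray input
    if items.Length = 0 then invalidArg "input" "empty input"
    let rec loop lo hi =
        if lo = hi then items.[lo]
        else
            let mid = (lo + hi) / 2
            f (loop lo mid) (loop (mid + 1) hi)
    loop 0 (items.Length - 1)

In mapReduce, the call Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer) would then become reduceBalanced (fun m1 m2 -> m1 <|> m2 <|> reducer).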
I believe Tomas got the intuition right in his answer, but here it is in my own words and in more detail, after spending quite a bit of time to figure this out.

The problem is that the above code does not implement the intended mapReduce algorithm due to excessive synchronization. In particular, a <|> b <|> c does not start c before both a and b have completed, so in fact <|> is useless for parallelism with more than two computations.

The second problem is that async.Return x is isomorphic to Async.FromContinuations(fun (ok, _, _) -> ok x). The example then in fact executed sequentially, on a single thread, and the allocated closures blew the stack.

For the curious reader, below is my second attempt to design this algorithm, which seems to fare a little better (~1 sec on n=100000, and ~21 sec on n=100000 with the map and reduce functions extended with Async.Sleep 1000; I have a Core i3).
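The code of that second attempt is not included in this copy. As a rough sketch of one design that avoids both problems described above (my own illustration; the MailboxProcessor-based channel is an assumption, not necessarily what was actually used):

// Mappers and reducers run as independently started jobs and communicate
// through an agent acting as the shared channel, so no job waits on
// unrelated jobs and no chain of combinators grows the stack.
// Like the original, this assumes a non-empty input.
let mapReduce (map    : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input  : seq<'T1>) : Async<'T2> =
    async {
        let items = Seq.toArray input
        let n = items.Length
        // The channel: Choice1Of2 carries a written value, Choice2Of2 a
        // pending read; the agent pairs values with waiting readers.
        let channel =
            MailboxProcessor.Start(fun inbox ->
                let rec loop (values: 'T2 list) (readers: AsyncReplyChannel<'T2> list) =
                    async {
                        let! msg = inbox.Receive()
                        match msg, values, readers with
                        | Choice1Of2 v, _, r :: rest ->
                            r.Reply v
                            return! loop values rest
                        | Choice1Of2 v, _, [] ->
                            return! loop (v :: values) readers
                        | Choice2Of2 (r: AsyncReplyChannel<'T2>), v :: rest, _ ->
                            r.Reply v
                            return! loop rest readers
                        | Choice2Of2 r, [], _ ->
                            return! loop values (r :: readers)
                    }
                loop [] [])
        let write v = channel.Post(Choice1Of2 v)
        let read () = channel.PostAndAsyncReply Choice2Of2
        // n mappers, each writing one value to the channel
        for x in items do
            Async.Start(async {
                let! y = map x
                write y })
        // n-1 reducers, each reading two values and writing one back
        for _ in 1 .. n - 1 do
            Async.Start(async {
                let! a = read ()
                let! b = read ()
                let! r = reduce a b
                write r })
        // the jobs above perform 2n-1 writes and 2n-2 reads in total,
        // so exactly one value is left for this final read
        return! read ()
    }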
Very interesting discussion! I had a similar issue with Async.Parallel. I was very frustrated... so I solved it by creating my own Parallel combinator. And finally, inspired by the discussion, I implemented the following mapReduce function.
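Neither the combinator nor the mapReduce function is shown in this copy. As a hedged sketch of what a hand-rolled parallel combinator over unit computations might look like (my own illustration, not the poster's code):

// Start every computation with Async.Start and count completions,
// calling the success continuation exactly once when the last one
// finishes, or the error continuation on the first failure.
let parallelIgnore (computations: Async<unit> list) : Async<unit> =
    Async.FromContinuations(fun (ok, error, _) ->
        let total = List.length computations
        let remaining = ref total
        let failed = ref false
        let finished result =
            lock remaining (fun () ->
                if not !failed then
                    match result with
                    | None ->
                        decr remaining
                        if !remaining = 0 then ok ()
                    | Some (e: exn) ->
                        failed := true
                        error e)
        if total = 0 then ok ()
        for comp in computations do
            Async.Start(async {
                try
                    do! comp
                    finished None
                with e ->
                    finished (Some e) }))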
Another, simple implementation can be something like the following: the map phase is executed in parallel, and the reduce phase is then sequential, since it has a data dependency on the previously calculated value.
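The snippet itself is not included here; a minimal sketch of that approach (my own illustration, assuming a non-empty input) might look like this:

let mapReduceSimple (map    : 'T1 -> Async<'T2>)
                    (reduce : 'T2 -> 'T2 -> Async<'T2>)
                    (input  : seq<'T1>) : Async<'T2> =
    async {
        // map phase: all mappers run in parallel
        let! mapped = input |> Seq.map map |> Async.Parallel
        // reduce phase: a sequential fold, since each step depends on the
        // previously computed value; return! keeps it tail-recursive
        let rec fold acc i =
            async {
                if i >= Array.length mapped then return acc
                else
                    let! acc2 = reduce acc mapped.[i]
                    return! fold acc2 (i + 1)
            }
        return! fold mapped.[0] 1
    }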