如何更改 Rx Builder 实现来修复堆栈溢出异常?

发布于 2024-11-10 09:30:05 字数 818 浏览 0 评论 0原文

我正在尝试提出一个 Rx Builder,以在 F# 计算表达式语法中使用反应式扩展。我该如何修复它,以免堆栈崩溃?就像下面的 Seq 例子一样。 是否有计划提供 RxBuilder 的实现作为响应式扩展的一部分或作为 .NET Framework 未来版本的一部分?

open System
open System.Linq
open System.Reactive.Linq

type rxBuilder() =    
    member this.Delay f = Observable.Defer f
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
        Observable.merge xs ys      
    member this.Yield x = Observable.Return x
    member this.YieldFrom (xs:IObservable<_>) = xs

let rx = rxBuilder()

let rec f x = seq { yield x 
                    yield! f (x + 1) }

let rec g x = rx { yield x 
                    yield! g (x + 1) }


//do f 5 |> Seq.iter (printfn "%A")

do g 5 |> Observable.subscribe (printfn "%A") |> ignore

do System.Console.ReadLine() |> ignore

I'm trying to come up with an Rx Builder to use Reactive Extension within the F# Computation Expression syntax. How do I fix it so that it doesnt blow the stack? Like the Seq example below.
And is there any plans to provide an implementation of the RxBuilder as part of the Reactive Extensions or as part of future versions of the .NET Framework ?

open System
open System.Linq
open System.Reactive.Linq

type rxBuilder() =    
    member this.Delay f = Observable.Defer f
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
        Observable.merge xs ys      
    member this.Yield x = Observable.Return x
    member this.YieldFrom (xs:IObservable<_>) = xs

let rx = rxBuilder()

let rec f x = seq { yield x 
                    yield! f (x + 1) }

let rec g x = rx { yield x 
                    yield! g (x + 1) }


//do f 5 |> Seq.iter (printfn "%A")

do g 5 |> Observable.subscribe (printfn "%A") |> ignore

do System.Console.ReadLine() |> ignore

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

向日葵 2024-11-17 09:30:06

一个简短的答案是,Rx 框架不支持使用这样的递归模式生成可观察量,因此它不容易完成。用于 F# 序列的Combine 操作需要一些可观察量不提供的特殊处理。 Rx 框架可能期望您使用 Observable.Generate 生成可观察量,然后使用 LINQ 查询/F# 计算构建器来处理它们。

无论如何,这里有一些想法 -

首先,您需要将 Observable.merge 替换为 Observable.Concat。第一个并行运行两个可观察量,而第二个首先从第一个可观察量产生所有值,然后从第二个可观察量产生值。进行此更改后,该代码片段在堆栈溢出之前至少会打印约 800 个数字。

堆栈溢出的原因是 Concat 创建一个调用 Concat 的可观察对象来创建另一个调用 Concat 的可观察对象,等等。解决此问题的一种方法就是添加一些同步。如果您使用的是 Windows 窗体,则可以修改 Delay 以便它在 GUI 线程上安排可观察对象(这会丢弃当前堆栈)。这是一个草图:

type RxBuilder() =   
  member this.Delay f = 
      let sync = System.Threading.SynchronizationContext.Current 
      let res = Observable.Defer f
      { new IObservable<_> with
          member x.Subscribe(a) = 
            sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
            // Note: This is wrong, but we cannot easily get the IDisposable here
            null }
  member this.Combine (xs, ys) = Observable.Concat(xs, ys)
  member this.Yield x = Observable.Return x
  member this.YieldFrom (xs:IObservable<_>) = xs

要正确实现此功能,您必须编写自己的 Concat 方法,这非常复杂。这个想法是这样的:

  • Concat 返回一些特殊类型,例如 IConcatenatedObservable
  • 当递归调用该方法时,您将创建一个相互引用的 IConcatenatedObservable
  • 。 方法将查找该链,当存在例如三个对象时,它将删除中间的一个(以始终保持链的长度最多为 2)。

对于 StackOverflow 的答案来说,这有点太复杂了,但对于 Rx 团队来说,这可能是一个有用的反馈。

A short answer is that Rx Framework doesn't support generating observables using a recursive pattern like this, so it cannot be easily done. The Combine operation that is used for F# sequences needs some special handling that observables do not provide. The Rx Framework probably expects that you'll generate observables using Observable.Generate and then use LINQ queries/F# computation builder to process them.

Anyway, here are some thoughts -

First of all, you need to replace Observable.merge with Observable.Concat. The first one runs both observables in parallel, while the second first yields all values from the first observable and then produces values from the second observable. After this change, the snippet will at least print ~800 numbers before the stack overflow.

The reason for the stack overflow is that Concat creates an observable that calls Concat to create another observable that calls Concat etc. One way to solve this is to add some synchronization. If you're using Windows Forms, then you can modify Delay so that it schedules the observable on the GUI thread (which discards the current stack). Here is a sketch:

type RxBuilder() =   
  member this.Delay f = 
      let sync = System.Threading.SynchronizationContext.Current 
      let res = Observable.Defer f
      { new IObservable<_> with
          member x.Subscribe(a) = 
            sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
            // Note: This is wrong, but we cannot easily get the IDisposable here
            null }
  member this.Combine (xs, ys) = Observable.Concat(xs, ys)
  member this.Yield x = Observable.Return x
  member this.YieldFrom (xs:IObservable<_>) = xs

To implement this properly, you would have to write your own Concat method, which is quite complicated. The idea would be that:

  • Concat returns some special type e.g. IConcatenatedObservable
  • When the method is called recursively you'll create a chain of IConcatenatedObservable that reference each other
  • The Concat method will look for this chain and when there are e.g. three objects, it will drop the middle one (to always keep chain of length at most 2).

That's a bit too complex for a StackOverflow answer, but it may be a useful feedback for the Rx team.

只想待在家 2024-11-17 09:30:06

请注意,这个问题已在 Rx v2.0 中得到修复(如前所述),更普遍地适用于所有排序运算符(Concat、Catch、OnErrorResumeNext)以及命令式运算符(If、While 等)。

基本上,您可以将此类运算符视为订阅终端观察者消息中的另一个序列(例如,Concat 在收到当前序列的 OnCompleted 消息后订阅下一个序列),这就是尾递归类比的用武之地

。 Rx v2.0,所有尾递归订阅都被扁平化为类似队列的数据结构,一次处理一个,与下游观察者对话。这避免了观察者为了连续的序列订阅而相互交谈的无限增长。

Notice this has been fixed in Rx v2.0 (as mentioned here already), more generally for all of the sequencing operators (Concat, Catch, OnErrorResumeNext), as well as the imperative operators (If, While, etc.).

Basically, you can think of this class of operators as doing a subscribe to another sequence in a terminal observer message (e.g. Concat subscribes to the next sequence upon receiving the current one's OnCompleted message), which is where the tail recursion analogy comes in.

In Rx v2.0, all of the tail-recursive subscriptions are flattened into a queue-like data structure for processing one at a time, talking to the downstream observer. This avoids the unbounded growth of observers talking to each other for successive sequence subscriptions.

靑春怀旧 2024-11-17 09:30:06

此问题已在 Rx 2.0 测试版。这是一个测试

This has been fixed in Rx 2.0 Beta. And here's a test.

↙厌世 2024-11-17 09:30:06

像这样的事情怎么办?

type rxBuilder() =    
   member this.Delay (f : unit -> 'a IObservable) = 
               { new IObservable<_> with
                    member this.Subscribe obv = (f()).Subscribe obv }
   member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
               { new IObservable<_> with
                    member this.Subscribe obv = xs.Subscribe obv ; 
                                                ys.Subscribe obv }
   member this.Yield x = Observable.Return x
   member this.YieldFrom xs = xs

let rx = rxBuilder()

let rec f x = rx { yield x 
                   yield! f (x + 1) }

do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore

do System.Console.ReadLine() |> ignore

http://rxbuilder.codeplex.com/(为了试验 RxBuilder 而创建)

xs 一次性是未接线。一旦我尝试连接一次性用品,它就会回到炸毁堆栈的状态。

What about something like this?

type rxBuilder() =    
   member this.Delay (f : unit -> 'a IObservable) = 
               { new IObservable<_> with
                    member this.Subscribe obv = (f()).Subscribe obv }
   member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
               { new IObservable<_> with
                    member this.Subscribe obv = xs.Subscribe obv ; 
                                                ys.Subscribe obv }
   member this.Yield x = Observable.Return x
   member this.YieldFrom xs = xs

let rx = rxBuilder()

let rec f x = rx { yield x 
                   yield! f (x + 1) }

do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore

do System.Console.ReadLine() |> ignore

http://rxbuilder.codeplex.com/ (created for the purpose of experimenting with RxBuilder)

The xs disposable is not wired up. As soon as I try to wire up the disposable it goes back to blowing up the stack.

卸妝后依然美 2024-11-17 09:30:06

如果我们从这个计算表达式(又名 Monad)中删除语法糖,我们将得到:

let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )

Or 在 C# 中:

public static IObservable<int> g(int x)
{
    return Observable.Defer<int>(() =>
    {
      return Observable.Merge(Observable.Return(x), g(x + 1));                    
    });
}

这绝对不是尾递归。我认为如果你可以让它尾递归那么它可能会解决你的问题

If we remove the syntactic sugar from this computation expression (aka Monad) we will have:

let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )

Or in C#:

public static IObservable<int> g(int x)
{
    return Observable.Defer<int>(() =>
    {
      return Observable.Merge(Observable.Return(x), g(x + 1));                    
    });
}

Which is definitely not tail recursive. I think if you can make it tail recursive then it would probably solve your problem

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文