在 RX 中同步多个订阅

发布于 2024-12-23 13:31:25 字数 114 浏览 1 评论 0原文

是否可以强制对不同的可观察量进行多个 RX 订阅串行运行(而不是同时运行)?

我知道我可以使用 EventLoopScheduler 来实现这一点,但这会降低性能,因为所有处理都将在单个线程上完成。

Is it possible to force multiple RX subscriptions to different observables run serially (not simultaneously)?

I am aware that I can use EventLoopScheduler for that, but that will degrade performance because all handling will be done on a single thread.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

行至春深 2024-12-30 13:31:25

如果您打算运行一个可观察对象直到OnCompleted,然后开始下一个,您应该查看Concat。如果您打算同时订阅多个不同的可观察量,则可以使用合并(如果语义对您的场景有意义)。如果 Merge 不合适,我建议在观察者方法或您已经了解的 EventLoopScheduler 中使用标准线程同步方法之一(锁定、监视器等)。

编辑保留在下面的原始答案

是的,可以强制串行观察者执行。然而,是否需要取决于可观察到的情况。一般来说,热可观察量已经串行运行,而冷可观察量则不会。这是热可观测值和冷可观测值工作方式差异的副作用。要将冷的可观察对象变为热对象,从而使观察者串行运行,请使用Publish。这是演示各种行为的示例。

Sub Main()
    'hot observable, runs serially
    Dim trigger As New ObsEvent
    Dim eobs = Observable.FromEventPattern(Of ItemEventArgs(Of String))(
                    Sub(h) AddHandler trigger.Triggered, h,
                    Sub(h) RemoveHandler trigger.Triggered, h)
    Dim sub1 = eobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting event observer 1: {0}", v.EventArgs.Item)
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending event observer 1")
                              End Sub)
    trigger.Trigger("event trigger 1")
    Dim sub2 = eobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting event observer 2: {0}", v.EventArgs.Item)
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending event observer 2")
                              End Sub)
    trigger.Trigger("event trigger 2")

    Console.WriteLine()
    Console.WriteLine()

    'cold observable, runs "simultaneously"
    Dim tobs = Observable.Timer(TimeSpan.FromSeconds(5))
    sub1 = tobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting timer observer 1")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending timer observer 1")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer 1 completed"))
    Thread.Sleep(500)
    sub2 = tobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting timer observer 2")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending timer observer 2")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer 2 completed"))

    'cold observable turned hot, runs serially
    Dim pobs = tobs.Publish()
    sub1 = pobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting publish observer 1")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending publish observer 1")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer P1 completed"))
    Thread.Sleep(500)
    sub2 = pobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting publish observer 2")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending publish observer 2")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer P2 completed"))
    pobs.Connect()

    Console.ReadKey()
End Sub

If you mean to run one observable until OnCompleted then start the next, you should look into Concat. If you mean to have multiple different observables that are subscribed to at the same time, you could use Merge (if the semantics make sense for your scenario). If Merge is not appropriate, I would recommend using one of the standard thread synchronization methods (lock, Monitor, etc) in the observer methods or the EventLoopScheduler you already know about.

EDIT Original answer preserved below

Yes, it is possible to force serial observer execution. However, whether you need to or not depends on the observable. In general, hot observables will already run serially, whereas cold observables will not. This is a side-effect of the difference in the way hot and cold observables work. To make a cold observable hot, and thus make observers run serially, use Publish. Here's an example demonstrating the various behaviors.

Sub Main()
    'hot observable, runs serially
    Dim trigger As New ObsEvent
    Dim eobs = Observable.FromEventPattern(Of ItemEventArgs(Of String))(
                    Sub(h) AddHandler trigger.Triggered, h,
                    Sub(h) RemoveHandler trigger.Triggered, h)
    Dim sub1 = eobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting event observer 1: {0}", v.EventArgs.Item)
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending event observer 1")
                              End Sub)
    trigger.Trigger("event trigger 1")
    Dim sub2 = eobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting event observer 2: {0}", v.EventArgs.Item)
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending event observer 2")
                              End Sub)
    trigger.Trigger("event trigger 2")

    Console.WriteLine()
    Console.WriteLine()

    'cold observable, runs "simultaneously"
    Dim tobs = Observable.Timer(TimeSpan.FromSeconds(5))
    sub1 = tobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting timer observer 1")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending timer observer 1")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer 1 completed"))
    Thread.Sleep(500)
    sub2 = tobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting timer observer 2")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending timer observer 2")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer 2 completed"))

    'cold observable turned hot, runs serially
    Dim pobs = tobs.Publish()
    sub1 = pobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting publish observer 1")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending publish observer 1")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer P1 completed"))
    Thread.Sleep(500)
    sub2 = pobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting publish observer 2")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending publish observer 2")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer P2 completed"))
    pobs.Connect()

    Console.ReadKey()
End Sub
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文