如何重新订阅特定点的序列?

发布于 2024-12-08 22:52:09 字数 181 浏览 0 评论 0原文

我正在尝试解决以下问题: a) 订阅者在一段时间内接收来自 IObservable 的事件。然后它取消订阅,做一些事情然后再次订阅。在这里,它应该从执行取消订阅的同一点开始接收事件。 b) 这种行为对于多订阅者模型来说是理想的。例如,当一个人取消订阅时,其他人应该继续接收事件。

RX方面有什么建议吗?

提前致谢!

I'm trying to solve the following:
a) subscriber receives events from IObservable for some time. Then it unsubscribes, do some stuff and then subscribe again. Here it should start receiving events from exactly the same point where unsubscription was performed.
b) Such behavior is desirable for multiple subscribers model. E.g. when one has unsubscribed, others should continue receiving events.

Are there any suggestions from the RX side?

Thanks in advance!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

豆芽 2024-12-15 22:52:11

听起来您需要一个“可暂停”流。假设一次只有 1 个订阅者会处理这些值(而其他订阅者只是等待),这个 解决方案可能就是您所需要的。

It sounds like you need a "pausable" stream. Assuming that only 1 subscriber will handle the values at a time (while the other subscribers just wait), this solution is probably what you need.

拒绝两难 2024-12-15 22:52:10

这是一种相当简单的 Rx 方法,可以执行您想要从 我的复制的操作回答 href="https://stackoverflow.com/questions/7620182/pause-and-resume-subscription-on-cold-iobservable">另一个问题。我创建了一个名为 Pausable 的扩展方法,它采用一个源 observable 和第二个 boolean 值的 observable,用于暂停或恢复该 observable。

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

它可以这样使用:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

Here's a reasonably simple Rx way to do what you want copied from my answer to this other question. I've created an extension method called Pausable that takes a source observable and a second observable of boolean that pauses or resumes the observable.

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

It can be used like this:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文