如何创建一个 Rx observable,当最后一个观察者取消订阅时停止发布事件?

发布于 2024-12-05 14:43:36 字数 121 浏览 0 评论 0原文

我将创建一个可观察量(通过各种方式)并将其返回给感兴趣的各方,但是当他们听完后,我想拆除可观察量,这样它就不会继续消耗资源。另一种方式是将其视为在 pub 子系统中创建主题。当没有人再订阅某个主题时,您不想再保留该主题及其过滤。

I'll create an observable (through a variety of means) and return it to interested parties, but when they're done listening, I'd like to tear down the observable so it doesn't continue consuming resources. Another way to think of it as creating topics in a pub sub system. When no one is subscribed to a topic any more, you don't want to hold the topic and its filtering around anymore.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

请你别敷衍 2024-12-12 14:43:36

Rx 已经有一个运算符可以满足您的需求 - 实际上是两个 - Publish & 参考计数

以下是如何使用它们:

IObservable xs = ...

var rxs = xs.Publish().RefCount();

var sub1 = rxs.Subscribe(x => { });
var sub2 = rxs.Subscribe(x => { });

//later
sub1.Dispose();

//later 
sub2.Dispose();

//The underlying subscription to `xs` is now disposed of.

简单。

Rx already has an operator to suit your needs - well two actually - Publish & RefCount.

Here's how to use them:

IObservable xs = ...

var rxs = xs.Publish().RefCount();

var sub1 = rxs.Subscribe(x => { });
var sub2 = rxs.Subscribe(x => { });

//later
sub1.Dispose();

//later 
sub2.Dispose();

//The underlying subscription to `xs` is now disposed of.

Simple.

不离久伴 2024-12-12 14:43:36

如果我理解你的问题,你想要创建可观察量,以便当所有订阅者都处理了他们的订阅时,即没有更多订阅者,那么你想要执行一个清理函数,这将阻止可观察量产生进一步的值。
如果这是您想要的,那么您可以执行如下操作:

//Wrap a disposable
public class WrapDisposable : IDisposable
    {
        IDisposable disp;
        Action act;
        public WrapDisposable(IDisposable _disp, Action _act)
        {
            disp = _disp;
            act = _act;
        }
        void IDisposable.Dispose()
        {
            act();
            disp.Dispose();
        }
    }

    //Observable that we want to clean up after all subs are done
    public static IObservable<long> GenerateObs(out Action cleanup)
    {
        cleanup = () =>
        {
            Console.WriteLine("All subscribers are done. Do clean up");
        };
        return Observable.Interval(TimeSpan.FromSeconds(1));
    }
    //Wrap the observable
    public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone)
    {
        int count = 0;
        return Observable.CreateWithDisposable<T>(ob =>
        {
            var disp = obs.Subscribe(ob);
            Interlocked.Increment(ref count);
            return new WrapDisposable(disp,() =>
            {
                if (Interlocked.Decrement(ref count) == 0)
                {
                    onAllDone();                                                
                }
            });
        });
    }

//使用示例:

Action cleanup;
var obs = GenerateObs(out cleanup);
var newObs = WrapToClean(obs, cleanup);
newObs.Take(6).Subscribe(Console.WriteLine);
newObs.Take(5).Subscribe(Console.WriteLine);

If I have understood your question you want to create the observable such that when all subscribers have disposed their subscription i.e there is no more subscriber, then you want to execute a clean up function which will stop the observable from production further values.
If this is what you want then you can do something like below:

//Wrap a disposable
public class WrapDisposable : IDisposable
    {
        IDisposable disp;
        Action act;
        public WrapDisposable(IDisposable _disp, Action _act)
        {
            disp = _disp;
            act = _act;
        }
        void IDisposable.Dispose()
        {
            act();
            disp.Dispose();
        }
    }

    //Observable that we want to clean up after all subs are done
    public static IObservable<long> GenerateObs(out Action cleanup)
    {
        cleanup = () =>
        {
            Console.WriteLine("All subscribers are done. Do clean up");
        };
        return Observable.Interval(TimeSpan.FromSeconds(1));
    }
    //Wrap the observable
    public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone)
    {
        int count = 0;
        return Observable.CreateWithDisposable<T>(ob =>
        {
            var disp = obs.Subscribe(ob);
            Interlocked.Increment(ref count);
            return new WrapDisposable(disp,() =>
            {
                if (Interlocked.Decrement(ref count) == 0)
                {
                    onAllDone();                                                
                }
            });
        });
    }

//Usage example:

Action cleanup;
var obs = GenerateObs(out cleanup);
var newObs = WrapToClean(obs, cleanup);
newObs.Take(6).Subscribe(Console.WriteLine);
newObs.Take(5).Subscribe(Console.WriteLine);
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文