创建对 IObservable 的弱订阅

发布于 2024-12-03 08:17:22 字数 905 浏览 1 评论 0原文

我想要做的是确保如果对我的观察者的唯一引用是可观察的,它会被垃圾收集并停止接收消息。

假设我有一个带有列表框的控件,名为“消息”,后面的代码是:

//Short lived display of messages (only while the user's viewing incoming messages)
public partial class MessageDisplay : UserControl
{
    public MessageDisplay()
    {
        InitializeComponent();
        MySource.IncomingMessages.Subscribe(m => Messages.Items.Add(m));
    }
}

正在连接到此源:

//Long lived location for message store
static class MySource
{
    public readonly static IObservable<string> IncomingMessages = new ReplaySubject<string>;
}

我不希望消息显示在不再可见后很长时间内保留在内存中。理想情况下,我想要一个小扩展,这样我就可以写:

MySource.IncomingMessages.ToWeakObservable().Subscribe(m => Messages.Items.Add(m));

我也不想依赖 MessageDisplay 是一个用户控件这一事实,因为我稍后想要使用 MessageDisplayViewModel 进行 MVVM 设置,而 MessageDisplayViewModel 不会是用户控制。

What I want to do is ensure that if the only reference to my observer is the observable, it get's garbage collected and stops receiving messages.

Say I have a control with a list box on it called Messages and this code behind:

//Short lived display of messages (only while the user's viewing incoming messages)
public partial class MessageDisplay : UserControl
{
    public MessageDisplay()
    {
        InitializeComponent();
        MySource.IncomingMessages.Subscribe(m => Messages.Items.Add(m));
    }
}

Which is connecting to this source:

//Long lived location for message store
static class MySource
{
    public readonly static IObservable<string> IncomingMessages = new ReplaySubject<string>;
}

What I don't want is to have the Message Display being kept in memory long after it's no longer visible. Ideally I'd like a little extension so I can write:

MySource.IncomingMessages.ToWeakObservable().Subscribe(m => Messages.Items.Add(m));

I also don't want to rely on the fact that MessageDisplay is a user control as I will later want to go for an MVVM setup with MessageDisplayViewModel which won't be a user control.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

别低头,皇冠会掉 2024-12-10 08:17:22

您可以将代理观察者订阅到可观察对象,该可观察对象持有对实际观察者的弱引用,并在实际观察者不再活动时处置订阅:

static IDisposable WeakSubscribe<T>(
    this IObservable<T> observable, IObserver<T> observer)
{
    return new WeakSubscription<T>(observable, observer);
}

class WeakSubscription<T> : IDisposable, IObserver<T>
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;

    public WeakSubscription(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }

    void IObserver<T>.OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }

    void IObserver<T>.OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }

    void IObserver<T>.OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }

    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}

You can subscribe a proxy observer to the observable that holds a weak reference to the actual observer and disposes the subscription when the actual observer is no longer alive:

static IDisposable WeakSubscribe<T>(
    this IObservable<T> observable, IObserver<T> observer)
{
    return new WeakSubscription<T>(observable, observer);
}

class WeakSubscription<T> : IDisposable, IObserver<T>
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;

    public WeakSubscription(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }

    void IObserver<T>.OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }

    void IObserver<T>.OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }

    void IObserver<T>.OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }

    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}
邮友 2024-12-10 08:17:22

几年后遇到了这个线程...只是想指出 Samuel Jack 的博客,它向 IObservable 添加了一个名为 WeaklySubscribe 的扩展方法。它使用一种在主体和观察者之间添加垫片的方法,通过 WeakReference 跟踪目标。这类似于其他人针对事件订阅中的强引用问题提供的解决方案,例如 本文Paul Stovell 的解决方案。使用基于 Paul 的方法一段时间后,我喜欢 Samuel 对弱 IObservable 订阅的解决方案。

Ran across this thread a couple years later...just wanted to point forward to the solution identified on Samuel Jack's blog which adds an extension method to IObservable called WeaklySubscribe. It uses an approach of adding a shim between the subject and observer that tracks the target with a WeakReference. That is similar to solutions offered by others for the problem of strong references in event subscriptions, such as in this article or this solution by Paul Stovell. Having for awhile used something based on Paul's approach I like Samuel's solution to weak IObservable Subscribes.

溺深海 2024-12-10 08:17:22

还有另一个选项使用 weak-event- 基本上,System.Windows.WeakEventManager 已经满足了您的需求

当您的 ViewModel 依赖于具有事件的服务时,使用 MVVM,您可以弱订阅这些服务,从而允许您的 ViewModel 与视图一起收集,而无需事件订阅使其保持活动状态。

using System;
using System.Windows;

class LongLivingSubject
{ 
    public event EventHandler<EventArgs> Notifications = delegate { }; 
}

class ShortLivingObserver
{
    public ShortLivingObserver(LongLivingSubject subject)
    { 
        WeakEventManager<LongLivingSubject, EventArgs>
            .AddHandler(subject, nameof(subject.Notifications), Subject_Notifications); 
    }

    private void Subject_Notifications(object sender, EventArgs e) 
    { 
    }
}

There is another option using the weak-event-patterns

Essentially System.Windows.WeakEventManager has you covered.

Using MVVM when your ViewModel relies on services with events you can weakly subscribe to those services allowing your ViewModel to be collected with the view without the event subscription keeping it alive.

using System;
using System.Windows;

class LongLivingSubject
{ 
    public event EventHandler<EventArgs> Notifications = delegate { }; 
}

class ShortLivingObserver
{
    public ShortLivingObserver(LongLivingSubject subject)
    { 
        WeakEventManager<LongLivingSubject, EventArgs>
            .AddHandler(subject, nameof(subject.Notifications), Subject_Notifications); 
    }

    private void Subject_Notifications(object sender, EventArgs e) 
    { 
    }
}
撧情箌佬 2024-12-10 08:17:22

这是我的实现(不再简单)

public class WeakObservable<T>: IObservable<T>
{
    private IObservable<T> _source;

    public WeakObservable(IObservable<T> source)
    {
        #region Validation

        if (source == null)
            throw new ArgumentNullException("source");

        #endregion Validation

        _source = source;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IObservable<T> source = _source;
        if(source == null)
            return Disposable.Empty;
        var weakObserver = new WaekObserver<T>(observer);
        IDisposable disp = source.Subscribe(weakObserver);
        return disp;
    }
}
    public class WaekObserver<T>: IObserver<T>
{
    private WeakReference<IObserver<T>> _target;

    public WaekObserver(IObserver<T> target)
    {
        #region Validation

        if (target == null)
            throw new ArgumentNullException("target");

        #endregion Validation

        _target = new WeakReference<IObserver<T>>(target);
    }

    private IObserver<T> Target
    {
        get
        {
            IObserver<T> target;
            if(_target.TryGetTarget(out target))
                return target;
            return null;
        }
    }

    #region IObserver<T> Members

    /// <summary>
    /// Notifies the observer that the provider has finished sending push-based notifications.
    /// </summary>
    public void OnCompleted()
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnCompleted();
    }

    /// <summary>
    /// Notifies the observer that the provider has experienced an error condition.
    /// </summary>
    /// <param name="error">An object that provides additional information about the error.</param>
    public void OnError(Exception error)
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnError(error);
    }

    /// <summary>
    /// Provides the observer with new data.
    /// </summary>
    /// <param name="value">The current notification information.</param>
    public void OnNext(T value)
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnNext(value);
    }

    #endregion IObserver<T> Members
}
    public static class RxExtensions
{
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> source)
    {
        return new WeakObservable<T>(source);
    }
}
        static void Main(string[] args)
    {
        Console.WriteLine("Start");
        var xs = Observable.Interval(TimeSpan.FromSeconds(1));
        Sbscribe(xs);

        Thread.Sleep(2020);
        Console.WriteLine("Collect");
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
        Console.WriteLine("Done");
        Console.ReadKey();
    }

    private static void Sbscribe<T>(IObservable<T> source)
    {
        source.ToWeakObservable().Subscribe(v => Console.WriteLine(v));
    }

this is my implementation (quit simple one)

public class WeakObservable<T>: IObservable<T>
{
    private IObservable<T> _source;

    public WeakObservable(IObservable<T> source)
    {
        #region Validation

        if (source == null)
            throw new ArgumentNullException("source");

        #endregion Validation

        _source = source;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IObservable<T> source = _source;
        if(source == null)
            return Disposable.Empty;
        var weakObserver = new WaekObserver<T>(observer);
        IDisposable disp = source.Subscribe(weakObserver);
        return disp;
    }
}
    public class WaekObserver<T>: IObserver<T>
{
    private WeakReference<IObserver<T>> _target;

    public WaekObserver(IObserver<T> target)
    {
        #region Validation

        if (target == null)
            throw new ArgumentNullException("target");

        #endregion Validation

        _target = new WeakReference<IObserver<T>>(target);
    }

    private IObserver<T> Target
    {
        get
        {
            IObserver<T> target;
            if(_target.TryGetTarget(out target))
                return target;
            return null;
        }
    }

    #region IObserver<T> Members

    /// <summary>
    /// Notifies the observer that the provider has finished sending push-based notifications.
    /// </summary>
    public void OnCompleted()
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnCompleted();
    }

    /// <summary>
    /// Notifies the observer that the provider has experienced an error condition.
    /// </summary>
    /// <param name="error">An object that provides additional information about the error.</param>
    public void OnError(Exception error)
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnError(error);
    }

    /// <summary>
    /// Provides the observer with new data.
    /// </summary>
    /// <param name="value">The current notification information.</param>
    public void OnNext(T value)
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnNext(value);
    }

    #endregion IObserver<T> Members
}
    public static class RxExtensions
{
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> source)
    {
        return new WeakObservable<T>(source);
    }
}
        static void Main(string[] args)
    {
        Console.WriteLine("Start");
        var xs = Observable.Interval(TimeSpan.FromSeconds(1));
        Sbscribe(xs);

        Thread.Sleep(2020);
        Console.WriteLine("Collect");
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
        Console.WriteLine("Done");
        Console.ReadKey();
    }

    private static void Sbscribe<T>(IObservable<T> source)
    {
        source.ToWeakObservable().Subscribe(v => Console.WriteLine(v));
    }
别念他 2024-12-10 08:17:22

关键是要认识到您必须传递目标和双参数操作。单参数操作永远不会做到这一点,因为要么您对操作使用弱引用(操作会被GC),要么对操作使用强引用,而操作又对目标有强引用,因此目标无法被 GC 处理。记住这一点,以下工作有效:

using System;

namespace Closures {
  public static class WeakReferenceExtensions {
    /// <summary> returns null if target is not available. Safe to call, even if the reference is null. </summary>
    public static TTarget TryGetTarget<TTarget>(this WeakReference<TTarget> reference) where TTarget : class {
      TTarget r = null;
      if (reference != null) {
        reference.TryGetTarget(out r);
      }
      return r;
    }
  }
  public static class ObservableExtensions {

    public static IDisposable WeakSubscribe<T, U>(this IObservable<U> source, T target, Action<T, U> action)
      where T : class {
      var weakRef = new WeakReference<T>(target);
      var r = source.Subscribe(u => {
        var t = weakRef.TryGetTarget();
        if (t != null) {
          action(t, u);
        }
      });
      return r;
    }
  }
}

示例可观察:

using System;
using System.Reactive.Subjects;

namespace Closures {
  public class Observable {
    public IObservable<int> ObservableProperty => _subject;
    private Subject<int> _subject = new Subject<int>();
    private int n;
    public void Fire() {
      _subject.OnNext(n++);
    }
  }
}

用法:

Class SomeClass {

 IDisposable disposable;

 public void SomeMethod(Observable observeMe) {
   disposable = observeMe.ObservableProperty.WeakSubscribe(this, (wo, n) => wo.Log(n));
 }

  public void Log(int n) {
    System.Diagnostics.Debug.WriteLine("log "+n);
  }
}

The key is to recognize that you are going to have to pass in both the target and a two-parameter action. A one-parameter action will never do it, because either you use a weak reference to your action (and the action gets GC'd), or you use a strong reference to your action, which in turn has a strong reference to the target, so the target can't get GC'd. Keeping that in mind, the following works:

using System;

namespace Closures {
  public static class WeakReferenceExtensions {
    /// <summary> returns null if target is not available. Safe to call, even if the reference is null. </summary>
    public static TTarget TryGetTarget<TTarget>(this WeakReference<TTarget> reference) where TTarget : class {
      TTarget r = null;
      if (reference != null) {
        reference.TryGetTarget(out r);
      }
      return r;
    }
  }
  public static class ObservableExtensions {

    public static IDisposable WeakSubscribe<T, U>(this IObservable<U> source, T target, Action<T, U> action)
      where T : class {
      var weakRef = new WeakReference<T>(target);
      var r = source.Subscribe(u => {
        var t = weakRef.TryGetTarget();
        if (t != null) {
          action(t, u);
        }
      });
      return r;
    }
  }
}

Sample Observable:

using System;
using System.Reactive.Subjects;

namespace Closures {
  public class Observable {
    public IObservable<int> ObservableProperty => _subject;
    private Subject<int> _subject = new Subject<int>();
    private int n;
    public void Fire() {
      _subject.OnNext(n++);
    }
  }
}

Usage:

Class SomeClass {

 IDisposable disposable;

 public void SomeMethod(Observable observeMe) {
   disposable = observeMe.ObservableProperty.WeakSubscribe(this, (wo, n) => wo.Log(n));
 }

  public void Log(int n) {
    System.Diagnostics.Debug.WriteLine("log "+n);
  }
}
孤者何惧 2024-12-10 08:17:22

下面的代码的灵感来自 dtb 的原始帖子。唯一的变化是它返回对观察者的引用作为 IDisposable 的一部分。这意味着,只要您保留对在链末尾获得的 IDisposable 的引用,对 IObserver 的引用就会保持活动状态(假设所有一次性对象都保留对它们之前的一次性对象的引用)。这允许使用诸如 Subscribe(M=>DoSomethingWithM(M)) 之类的扩展方法,因为我们保留对隐式构造的 IObserver 的引用,但我们不保留来自源的强引用到 IObserver(这会产生内存泄漏)。

using System.Reactive.Linq;

static class WeakObservation
{
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> observable)
    {
        return Observable.Create<T>(observer =>
            (IDisposable)new DisposableReference(new WeakObserver<T>(observable, observer), observer)
            );
    }
}

class DisposableReference : IDisposable
{
    public DisposableReference(IDisposable InnerDisposable, object Reference)
    {
        this.InnerDisposable = InnerDisposable;
        this.Reference = Reference;
    }

    private IDisposable InnerDisposable;
    private object Reference;

    public void Dispose()
    {
        InnerDisposable.Dispose();
        Reference = null;
    }
}

class WeakObserver<T> : IObserver<T>, IDisposable
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;

    public WeakObserver(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }

    public void OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }

    public void OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }

    public void OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }

    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}

The code below is inspired by dtb's original post. The only change is that it returns a reference to the observer as part of the IDisposable. This means that the reference to the IObserver will be kept alive as long as you keep a reference to the IDisposable that you get out at the end of the chain (assuming all disposables keep a reference to the disposable before them). This allows the usage of the extension methods such as Subscribe(M=>DoSomethingWithM(M)) because we keep a reference to the implicitly constructed IObserver but we don't keep a strong reference from the source to the IObserver (which would produce a memory leek).

using System.Reactive.Linq;

static class WeakObservation
{
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> observable)
    {
        return Observable.Create<T>(observer =>
            (IDisposable)new DisposableReference(new WeakObserver<T>(observable, observer), observer)
            );
    }
}

class DisposableReference : IDisposable
{
    public DisposableReference(IDisposable InnerDisposable, object Reference)
    {
        this.InnerDisposable = InnerDisposable;
        this.Reference = Reference;
    }

    private IDisposable InnerDisposable;
    private object Reference;

    public void Dispose()
    {
        InnerDisposable.Dispose();
        Reference = null;
    }
}

class WeakObserver<T> : IObserver<T>, IDisposable
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;

    public WeakObserver(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }

    public void OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }

    public void OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }

    public void OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }

    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文