如何实现从一个IObserver到另一个IObserver的原子切换?

发布于 2024-11-15 03:18:03 字数 694 浏览 0 评论 0原文

我有一个 IObservable,我使用一些中间步骤将其转换为 IObservable

var observedXDocuments =
    from b in observedBytes
    // Lot of intermediate steps to transform byte arrays into XDocuments
    select xDoc;

在某个时间点,我对观察到 XDocument,因此我订阅了 IObserver。稍后,我想订阅另一个 IObserver并处理掉旧的。

如何在一个原子操作中完成此操作,而不丢失任何观察到的 XDocument?我可以这样做:

oldObserver.Dispose();
observedXDocuments.Subscribe(newObserver);

不过我担心,在这两个调用之间,我可能会丢失一个XDocument。如果我切换这两个调用,可能会两次收到相同的 XDocument

I have an IObservable<byte[]> that I transform into an IObservable<XDocument> using some intermediate steps:

var observedXDocuments =
    from b in observedBytes
    // Lot of intermediate steps to transform byte arrays into XDocuments
    select xDoc;

At some point in time, I'm interested in the observed XDocuments so I subscribe an IObserver<XDocument>. At a later point in time, I would like to subscribe another IObserver<XDocument> and dispose of the old one.

How can I do this in one atomic operation, without loosing any observed XDocument? I could do something like:

oldObserver.Dispose();
observedXDocuments.Subscribe(newObserver);

I'm worried though, that between these two calls, I could loose an XDocument. If I switch the two calls, it could happen that I receive the same XDocument twice.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

指尖微凉心微凉 2024-11-22 03:18:03

我可能会添加一个间接层。编写一个名为 ExchangeableObserver 的类,将其订阅到您的可观察对象,并使其永久订阅。 ExchangeableObserver 的工作是将所有事情委托给给定的子观察者。但程序员可以随时更改委托的子观察者。在我的示例中,我有一个 Exchange() 方法。像这样的东西:

public class ExchangeableObserver<T> : IObserver<T> {
  private IObserver<T> inner;

  public ExchangeableObserver(IObserver<T> inner) {
    this.inner=inner;
  }

  public IObserver<T> Exchange(IObserver<T> newInner) {
    return Interlocked.Exchange(ref inner, newInner);
  }

  public void OnNext(T value) {
    inner.OnNext(value);
  }

  public void OnCompleted() {
    inner.OnCompleted();
  }

  public void OnError(Exception error) {
    inner.OnError(error);
  }
}

I'd probably add a layer of indirection. Write a class called ExchangeableObserver, subscribe it to your observable, and keep it permanently subscribed. The job of ExchangeableObserver is to delegate everything to a given sub-observer. But the programmer is allowed to change the sub-observer being delegated to at any time. In my example I have an Exchange() method. Something like:

public class ExchangeableObserver<T> : IObserver<T> {
  private IObserver<T> inner;

  public ExchangeableObserver(IObserver<T> inner) {
    this.inner=inner;
  }

  public IObserver<T> Exchange(IObserver<T> newInner) {
    return Interlocked.Exchange(ref inner, newInner);
  }

  public void OnNext(T value) {
    inner.OnNext(value);
  }

  public void OnCompleted() {
    inner.OnCompleted();
  }

  public void OnError(Exception error) {
    inner.OnError(error);
  }
}
病毒体 2024-11-22 03:18:03

您可以使用一个信号量,确保在 IObservable 准备 IObservable 时不会发生观察者更改。

伪代码如何做到这一点(不是测试)

  System.Threading.ReaderWriterLockSlim criticalSection 
       = new System.Threading.ReaderWriterLockSlim(...);  


  ... converting from `IObservable<byte[]>` to `IObservable<XDocument>`  
  criticalSection.EnterReadLock();
  Call IObservable<XDocument>
  criticalSection.ExitReadLock();

  .... replacing IObservable<XDocument>
  criticalSection.EnterWriteLock();
  Call change IObservable<XDocument>
  criticalSection.ExitWriteLock();

编辑:使用Call IObservable

  > What exactly do you mean with the line `Call IObservable<XDocument>`?

我解释你的句子

  > I have an `IObservable<byte[]>` that I transform 
  > into an `IObservable<XDocument>` using some intermediate steps...

,你已经为IObservable注册了一个事件处理程序byte[] 创建一个 XDocument 然后调用
触发 IObservable事件的东西。

Call IObservable 表示触发后续事件的代码

you can use a semaphore that makes shure that while IObservable<byte[]> prepares for IObservable<XDocument> no observer-change takes place.

pseudocode how this could be done (not testet)

  System.Threading.ReaderWriterLockSlim criticalSection 
       = new System.Threading.ReaderWriterLockSlim(...);  


  ... converting from `IObservable<byte[]>` to `IObservable<XDocument>`  
  criticalSection.EnterReadLock();
  Call IObservable<XDocument>
  criticalSection.ExitReadLock();

  .... replacing IObservable<XDocument>
  criticalSection.EnterWriteLock();
  Call change IObservable<XDocument>
  criticalSection.ExitWriteLock();

Edit: with Call IObservable<XDocument>

  > What exactly do you mean with the line `Call IObservable<XDocument>`?

I interprete your sentense

  > I have an `IObservable<byte[]>` that I transform 
  > into an `IObservable<XDocument>` using some intermediate steps...

that you have registered an eventhandler for IObservable<byte[]> that creates a XDocument from byte[] and then calls
something that triggers an event for IObservable<XDocument>.

Call IObservable<XDocument> means the code that triggers the followup-event

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文