我如何才能实现 IObservable多线程?

发布于 2024-10-14 20:12:19 字数 427 浏览 3 评论 0原文

我根据 [Rx DEVHOL202] 和 http://rxwiki.wikidot.com/101samples#toc48 中的示例编写了一个实现,

这是我的代码。 http://csharp.pastebin.com/pm2NAPx6

  1. 它可以工作,但是对 OnNext 的调用不是非阻塞,这是我想要实现的模拟网络读取并在读取到处理程序时异步地将每个字节块移交给处理程序[此处未完整显示,但可能会缓存结果并进行进一步处理]。

    这样做的好方法是什么?

  2. 一旦抛出异常,所有后续的 OnNext() 都不会被处理! 如果我没有明确退出循环并指示完成。 为什么会这样?

I wrote an implementation based on the examples at [ Rx DEVHOL202] and http : //rxwiki.wikidot.com/101samples#toc48

Here is my code. http://csharp.pastebin.com/pm2NAPx6

  1. It works, but the calls to OnNext are not NonBlocking, which is what i would like to implement to simulate a network read and asynchronously handing off each chunk of bytes as it is read to a handler [ which is not shown here in full but might cache results and do further processing ].

    What is a good way of doing that?

  2. Once the Exception gets thrown, all the subsequent OnNext()s are not processed!!
    If I dont explicitly exit the loop and indicate completion.
    Why is this so?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

与风相奔跑 2024-10-21 20:12:19

我强烈建议反对尝试实现您自己的IObservable。隐式规则超越了线程安全并涉及方法调用顺序。

通常您会从方法返回 IObservable 实例,但如果您需要一个直接实现它的类,则应该包装一个 Subject

public class SomeObservable<T> : IObservable<T>
{
        private Subject<T> subject = new Subject<T>();

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return subject.Subscribe(observer);
        }
}

1. 您需要注意如何您的观察者支持这一点(因为您可能具有共享数据),但您可以通过以下两种方式之一使调用处理异步:

  • 调用 ObserveOn(Scheduler.TaskPool)(或 ThreadPool) (如果您是 4.0 之前的版本),然后再调用 Subscribe。这会导致消息通过调度程序(在本例中为任务)进行路由。
  • IScheduler 传递给 Observer 构造函数
  • 启动异步任务/线程。

从订阅者2 是预期的功能。 IObservableIObserver 之间的约定是 (OnNext)* (OnCompleted | OnError)?,也就是说“零次或多次调用 OnNext ,可选地后跟 OnCompleted 或 OnError”。 OnCompleted|OnError之后,调用OnNext无效。

Rx 中的所有运算符(WhereSelect 等)都强制执行此规则,即使源代码没有这样做。

I would strongly recommend against trying to implement your own IObservable. The implicit rules go beyond thread safety and into method call ordering.

Usually you would return IObservable instances from methods, but if you need a class that implements it directly, you should wrap a Subject:

public class SomeObservable<T> : IObservable<T>
{
        private Subject<T> subject = new Subject<T>();

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return subject.Subscribe(observer);
        }
}

1. You need to be careful about how you support this from your observer (as you may have shared data), but you can make your handling of the calls asynchronous in one of two ways:

  • Call ObserveOn(Scheduler.TaskPool) (or ThreadPool if you are pre-4.0) before you call Subscribe. This causes messages to be routed through a scheduler (in this case, a task)
  • Pass the IScheduler to the Observer constructor
  • Start an asynchronous task/thread from your subscriber

2. This is the expected functionality. The contract between IObservable and IObserver is (OnNext)* (OnCompleted | OnError)?, which is to say "zero or more calls to OnNext, optionally followed by EITHER OnCompleted or OnError". After OnCompleted|OnError, it is invalid to call OnNext.

All the operators in Rx (Where, Select, etc) enforce this rule, even if the source doesn't.

紫竹語嫣☆ 2024-10-21 20:12:19

我不确定我是否正确理解你的问题,但是为什么你不能在不同的线程上执行任何逻辑,或者如果它足够小,则将其推到线程池上/代码>?

下面是一个示例:

ThreadPool.QueueUserWorkItem(o=>
{
    _paidSubj.OnNext(this); // Raise PAID event 
});

我对 Subject 上的数据类型感到困惑,我从未在 C# 中见过该类...它是您创建的吗? OnNext 是引发的事件还是只是一个方法?如果 OnNext 是一个事件,那么您可以使用 BeginInvoke 异步调用它:

_paidSubj.OnNext.BeginInvoke(this, null, null);

更新:

如果您实现这种类型,将会发生一个重要的事情异步行为的影响:如果您通过传递Order来通知IObserver,那么当您尝试读取观察者中的数据(即订单缓冲区)时,实际上可能会出现一些不一致的情况而 Order 则继续在其 Read 线程中修改缓冲区。因此,至少有两种方法可以解决这个问题:

  1. 使用锁限制对将要修改的内存的访问。
  2. 仅通知观察者您希望其看到的相关信息:
    一个。通过将信息作为值(而不是引用)传递。
    b.通过创建传输信息的不可变结构。

聚苯乙烯
您从哪里获得 SubjectSubject 应该是 OrderObserver 吗?

I'm not sure if I understand your question correctly, but why can't you just execute whatever logic you have on a different Thread, or if it's small enough push it on a ThreadPool?

Here is an example:

ThreadPool.QueueUserWorkItem(o=>
{
    _paidSubj.OnNext(this); // Raise PAID event 
});

I'm confused about the data type on Subject, I have never seen that class in C#... is it something that you created? Is OnNext an event that gets raised or is it just a method? If OnNext is an event, then you can use BeginInvoke to invoke it asynchronously:

_paidSubj.OnNext.BeginInvoke(this, null, null);

Update:

An important thing that will happen if you implement this kind of asynchronous behavior: if you notify an IObserver by passing the Order, you might actually have some inconsistencies when you try to read the data in the observer (namely the order buffer) while the Order continues to modify the buffer in its Read thread. So there are at least two ways to solve this:

  1. Restrict access to the memory which will get modified by using locks.
  2. Only notify the observer with the relevant information that you want it to see:
    a. By passing the information as a value (not as a reference).
    b. By creating an immutable structure that transmits the information.

P.S.
Where did you get Subject from? Is Subject supposed to be an OrderObserver?

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文