我有一个 IObservable 和几个根据某些条件处理字符串的观察者:
observable.Subscribe(s => { if (s.StartsWith("a")) {...} });
observable.Subscribe(s => { if (s.StartsWith("b")) {...} });
observable.Subscribe(s => { if (s.StartsWith("c")) {...} });
observable.Subscribe(s => { if (s.StartsWith("d")) {...} });
....
这是一个简化的示例(条件更复杂并且观察到的事件不是字符串),但您明白了。
我想要一个 IObserver 来捕获任何其他观察者未处理的所有字符串。可以随时添加具有不同条件的观察者(即:StartsWith("e")
),并且条件集不会重叠。
这种情况是否得到某种支持?或者,一旦所有其他观察者都尝试过,我是否必须将观察到的字符串标记为已处理并订阅未处理的字符串(以及如何实现)?
I have an IObservable<string>
and several observers that handle strings based on some condition:
observable.Subscribe(s => { if (s.StartsWith("a")) {...} });
observable.Subscribe(s => { if (s.StartsWith("b")) {...} });
observable.Subscribe(s => { if (s.StartsWith("c")) {...} });
observable.Subscribe(s => { if (s.StartsWith("d")) {...} });
....
This is a simplified example (the condition is more complex and the observed events aren't strings) but you get the idea.
I'd like to have an IObserver<string>
that catches all strings that are not handled by any other observer. Observers with different conditions (i.e.: StartsWith("e")
) can be added at any time and the set of conditions does not overlap.
Is this scenario somehow supported? Or do I have to mark observed strings as handled and subscribe to unhandled strings once all other observers have tried (and how do I implement that)?
发布评论
评论(3)
我有两种方法。
第一个提供了一种将谓词/动作对链接在一起以“虹吸”匹配的值的方法。它遵循 Rx 操作员风格。
我可以这样写:
如果我有这个扩展方法:
它不允许您动态添加和删除谓词/操作对,但它是一个相当简单的运算符,可能很有用。
为了拥有完整的添加/删除功能,我想出了这种方法:
.Syphon(out add)
扩展方法重载允许该方法有效地返回两个结果 - 正常返回值为IObservable
,第二个结果为Func, Action, IDisposable>
。第二个返回值允许将新的谓词/操作对添加到虹吸运算符,然后通过在返回的订阅上调用Dispose
来删除 - 非常 Rx-ish。这是扩展方法:
我用这个测试了代码:
它产生了:
这看起来是正确的。如果这能为您解决问题,请告诉我。
I've got two approaches.
The first provides a way to chain together the predicate/action pairs to "syphon" off values that match. It follows the Rx operator style.
I can write this:
If I have this extension method:
It doesn't allow you to add and remove predicate/action pairs on the fly, but it is a fairly simple operator that might be useful.
To have the full add/remove functionality I have come up with this approach:
The
.Syphon(out add)
extension method overload allows the method to effectively return two results - the normal return value is theIObservable<T>
and the second comes out as aFunc<Func<T, bool>, Action<T>, IDisposable>
. This second return value allows new predicate/action pairs to be added to the syphon operator and then removed by callingDispose
on the returned subscription - very Rx-ish.Here's the extension method:
I tested the code with this:
And it produced:
And that seemed right. Let me know if this solves it for you.
一种选择是让您的订阅者也可以被观察到。因此,这些订阅者所做的是,如果他们不处理该值,那么他们会通过其可观察接口发出该值,然后最后一个订阅者(处理所有未使用的值)将是订阅每个可观察接口的单个对象其他订户的。像这样的东西:
One option is to make your subscribers to be observable as well. So what these subscribers does is if that they don't handle the value then they emit it through their observable interface and then the last subscriber (that handle all not used values) will be a single ton object that subscribes to each of the observable interface of the other subscribers. Something like:
我不知道有什么开箱即用的方法可以做到这一点,但我会按照下面的方式进行
I don't know of any out of the box way to do this but I would do it as under