如何实现“处理剩余”使用反应式扩展的观察者

发布于 2024-12-10 17:38:56 字数 590 浏览 0 评论 0 原文

我有一个 IObservable 和几个根据某些条件处理字符串的观察者:

observable.Subscribe(s => { if (s.StartsWith("a")) {...} });
observable.Subscribe(s => { if (s.StartsWith("b")) {...} });
observable.Subscribe(s => { if (s.StartsWith("c")) {...} });
observable.Subscribe(s => { if (s.StartsWith("d")) {...} });
....

这是一个简化的示例(条件更复杂并且观察到的事件不是字符串),但您明白了。

我想要一个 IObserver 来捕获任何其他观察者未处理的所有字符串。可以随时添加具有不同条件的观察者(即:StartsWith("e")),并且条件集不会重叠。

这种情况是否得到某种支持?或者,一旦所有其他观察者都尝试过,我是否必须将观察到的字符串标记为已处理并订阅未处理的字符串(以及如何实现)?

I have an IObservable<string> and several observers that handle strings based on some condition:

observable.Subscribe(s => { if (s.StartsWith("a")) {...} });
observable.Subscribe(s => { if (s.StartsWith("b")) {...} });
observable.Subscribe(s => { if (s.StartsWith("c")) {...} });
observable.Subscribe(s => { if (s.StartsWith("d")) {...} });
....

This is a simplified example (the condition is more complex and the observed events aren't strings) but you get the idea.

I'd like to have an IObserver<string> that catches all strings that are not handled by any other observer. Observers with different conditions (i.e.: StartsWith("e")) can be added at any time and the set of conditions does not overlap.

Is this scenario somehow supported? Or do I have to mark observed strings as handled and subscribe to unhandled strings once all other observers have tried (and how do I implement that)?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

緦唸λ蓇 2024-12-17 17:38:56

我有两种方法。

第一个提供了一种将谓词/动作对链接在一起以“虹吸”匹配的值的方法。它遵循 Rx 操作员风格。

我可以这样写:

observable
    .Syphon(s => s.StartsWith("a"), s => { })
    .Syphon(s => s.StartsWith("b"), s => { })
    .Syphon(s => s.StartsWith("c"), s => { })
    .Syphon(s => s.StartsWith("d"), s => { })
    .Subscribe(s => { /* otherwise */ });

如果我有这个扩展方法:

public static IObservable<T> Syphon<T>(
    this IObservable<T> source,
    Func<T, bool> predicate,
    Action<T> action)
{
    if (source == null) throw new ArgumentNullException("source");
    if (predicate == null) throw new ArgumentNullException("predicate");
    if (action == null) throw new ArgumentNullException("action");
    return Observable.Create<T>(o =>
        source.Subscribe(
            t =>
            {
                if (predicate(t))
                {
                    action(t);
                }
                else
                {
                    o.OnNext(t);
                }
            },
            ex =>
                o.OnError(ex),
            () =>
                o.OnCompleted()));
}

它不允许您动态添加和删除谓词/操作对,但它是一个相当简单的运算符,可能很有用。

为了拥有完整的添加/删除功能,我想出了这种方法:

Func<Func<string, bool>, Action<string>, IDisposable> add;

observable
    .Syphon(out add)
    .Subscribe(s => { /* otherwise */ });

var startsWithA = add(s => s.StartsWith("a"), s => { /* a */ });
var startsWithB = add(s => s.StartsWith("b"), s => { /* b */ });
startsWithA.Dispose();
var startsWithC = add(s => s.StartsWith("c"), s => { /* c */ });
var startsWithD = add(s => s.StartsWith("d"), s => { /* d */ });
startsWithC.Dispose();
startsWithB.Dispose();
startsWithD.Dispose();

.Syphon(out add) 扩展方法重载允许该方法有效地返回两个结果 - 正常返回值为 IObservable,第二个结果为 Func, Action, IDisposable>。第二个返回值允许将新的谓词/操作对添加到虹吸运算符,然后通过在返回的订阅上调用 Dispose 来删除 - 非常 Rx-ish。

这是扩展方法:

public static IObservable<T> Syphon<T>(
    this IObservable<T> source,
    out Func<Func<T, bool>, Action<T>, IDisposable> subscriber)
{
    if (source == null) throw new ArgumentNullException("source");

    var pas = new List<Tuple<Func<T, bool>, Action<T>>>();

    subscriber = (p, a) =>
    {
        lock (pas)
        {
            var tuple = Tuple.Create(p, a);
            pas.Add(tuple);
            return Disposable.Create(() =>
            {
                lock (pas)
                {
                    pas.Remove(tuple);
                }
            });
        }
    };

    return Observable.Create<T>(o =>
        source.Subscribe(
            t =>
            {
                Action<T> a = null;
                lock (pas)
                {
                    var pa = pas.FirstOrDefault(x => x.Item1(t));
                    if (pa != null)
                    {
                        a = pa.Item2;
                    }
                }
                if (a != null)
                {
                    a(t);
                }
                else
                {
                    o.OnNext(t);
                }
            },
            ex =>
                o.OnError(ex),
            () =>
                o.OnCompleted()));
}

我用这个测试了代码:

var xs = Observable.Interval(TimeSpan.FromSeconds(0.2));

Func<Func<long, bool>, Action<long>, IDisposable> subscriber;
xs
    .Syphon(out subscriber)
    .Subscribe(x => Console.WriteLine(x));

var divBy3 = subscriber(
    x => x % 3 == 0,
    x => Console.WriteLine("divBy3"));

Thread.Sleep(2000);

var divBy2 = subscriber(
    x => x % 2 == 0,
    x => Console.WriteLine("divBy2"));

Thread.Sleep(2000);
divBy3.Dispose();
Thread.Sleep(2000);
divBy2.Dispose();
Thread.Sleep(10000);

它产生了:

divBy3
1
2
divBy3
4
5
divBy3
7
8
divBy3
divBy2
11
divBy3
13
divBy2
divBy3
divBy2
17
divBy3
19
divBy2
21
divBy2
23
divBy2
25
divBy2
27
divBy2
29
30
31
32
...

这看起来是正确的。如果这能为您解决问题,请告诉我。

I've got two approaches.

The first provides a way to chain together the predicate/action pairs to "syphon" off values that match. It follows the Rx operator style.

I can write this:

observable
    .Syphon(s => s.StartsWith("a"), s => { })
    .Syphon(s => s.StartsWith("b"), s => { })
    .Syphon(s => s.StartsWith("c"), s => { })
    .Syphon(s => s.StartsWith("d"), s => { })
    .Subscribe(s => { /* otherwise */ });

If I have this extension method:

public static IObservable<T> Syphon<T>(
    this IObservable<T> source,
    Func<T, bool> predicate,
    Action<T> action)
{
    if (source == null) throw new ArgumentNullException("source");
    if (predicate == null) throw new ArgumentNullException("predicate");
    if (action == null) throw new ArgumentNullException("action");
    return Observable.Create<T>(o =>
        source.Subscribe(
            t =>
            {
                if (predicate(t))
                {
                    action(t);
                }
                else
                {
                    o.OnNext(t);
                }
            },
            ex =>
                o.OnError(ex),
            () =>
                o.OnCompleted()));
}

It doesn't allow you to add and remove predicate/action pairs on the fly, but it is a fairly simple operator that might be useful.

To have the full add/remove functionality I have come up with this approach:

Func<Func<string, bool>, Action<string>, IDisposable> add;

observable
    .Syphon(out add)
    .Subscribe(s => { /* otherwise */ });

var startsWithA = add(s => s.StartsWith("a"), s => { /* a */ });
var startsWithB = add(s => s.StartsWith("b"), s => { /* b */ });
startsWithA.Dispose();
var startsWithC = add(s => s.StartsWith("c"), s => { /* c */ });
var startsWithD = add(s => s.StartsWith("d"), s => { /* d */ });
startsWithC.Dispose();
startsWithB.Dispose();
startsWithD.Dispose();

The .Syphon(out add) extension method overload allows the method to effectively return two results - the normal return value is the IObservable<T> and the second comes out as a Func<Func<T, bool>, Action<T>, IDisposable>. This second return value allows new predicate/action pairs to be added to the syphon operator and then removed by calling Dispose on the returned subscription - very Rx-ish.

Here's the extension method:

public static IObservable<T> Syphon<T>(
    this IObservable<T> source,
    out Func<Func<T, bool>, Action<T>, IDisposable> subscriber)
{
    if (source == null) throw new ArgumentNullException("source");

    var pas = new List<Tuple<Func<T, bool>, Action<T>>>();

    subscriber = (p, a) =>
    {
        lock (pas)
        {
            var tuple = Tuple.Create(p, a);
            pas.Add(tuple);
            return Disposable.Create(() =>
            {
                lock (pas)
                {
                    pas.Remove(tuple);
                }
            });
        }
    };

    return Observable.Create<T>(o =>
        source.Subscribe(
            t =>
            {
                Action<T> a = null;
                lock (pas)
                {
                    var pa = pas.FirstOrDefault(x => x.Item1(t));
                    if (pa != null)
                    {
                        a = pa.Item2;
                    }
                }
                if (a != null)
                {
                    a(t);
                }
                else
                {
                    o.OnNext(t);
                }
            },
            ex =>
                o.OnError(ex),
            () =>
                o.OnCompleted()));
}

I tested the code with this:

var xs = Observable.Interval(TimeSpan.FromSeconds(0.2));

Func<Func<long, bool>, Action<long>, IDisposable> subscriber;
xs
    .Syphon(out subscriber)
    .Subscribe(x => Console.WriteLine(x));

var divBy3 = subscriber(
    x => x % 3 == 0,
    x => Console.WriteLine("divBy3"));

Thread.Sleep(2000);

var divBy2 = subscriber(
    x => x % 2 == 0,
    x => Console.WriteLine("divBy2"));

Thread.Sleep(2000);
divBy3.Dispose();
Thread.Sleep(2000);
divBy2.Dispose();
Thread.Sleep(10000);

And it produced:

divBy3
1
2
divBy3
4
5
divBy3
7
8
divBy3
divBy2
11
divBy3
13
divBy2
divBy3
divBy2
17
divBy3
19
divBy2
21
divBy2
23
divBy2
25
divBy2
27
divBy2
29
30
31
32
...

And that seemed right. Let me know if this solves it for you.

晨与橙与城 2024-12-17 17:38:56

一种选择是让您的订阅者也可以被观察到。因此,这些订阅者所做的是,如果他们不处理该值,那么他们会通过其可观察接口发出该值,然后最后一个订阅者(处理所有未使用的值)将是订阅每个可观察接口的单个​​对象其他订户的。像这样的东西:

public class MyObserver : IObserver<string>, IObservable<string>
{
    Subject<string> s = new Subject<string>();
    public MyObserver(IObserver<string> obs)
    {
        s.Subscribe(obs);
    }
    public void OnCompleted()
    { }
    public void OnError(Exception error)
    { }
    public void OnNext(string value)
    {
        //If condition matches then else dont do on next
        s.OnNext(value);
    }
    public IDisposable Subscribe(IObserver<string> observer)
    {
        return s.Subscribe(observer);
    }
}
public class LastObserver : IObserver<string>
{
    public void OnCompleted()
    {   }

    public void OnError(Exception error)
    { }

    public void OnNext(string value)
    { //Do something with not catched value
    }
}
static LastObserver obs = new LastObserver();
static void Main()
{
    var timer = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => i.ToString());
    timer.Subscribe(new MyObserver(obs));
    timer.Subscribe(new MyObserver(obs));
    timer.Subscribe(new MyObserver(obs));

} 

One option is to make your subscribers to be observable as well. So what these subscribers does is if that they don't handle the value then they emit it through their observable interface and then the last subscriber (that handle all not used values) will be a single ton object that subscribes to each of the observable interface of the other subscribers. Something like:

public class MyObserver : IObserver<string>, IObservable<string>
{
    Subject<string> s = new Subject<string>();
    public MyObserver(IObserver<string> obs)
    {
        s.Subscribe(obs);
    }
    public void OnCompleted()
    { }
    public void OnError(Exception error)
    { }
    public void OnNext(string value)
    {
        //If condition matches then else dont do on next
        s.OnNext(value);
    }
    public IDisposable Subscribe(IObserver<string> observer)
    {
        return s.Subscribe(observer);
    }
}
public class LastObserver : IObserver<string>
{
    public void OnCompleted()
    {   }

    public void OnError(Exception error)
    { }

    public void OnNext(string value)
    { //Do something with not catched value
    }
}
static LastObserver obs = new LastObserver();
static void Main()
{
    var timer = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => i.ToString());
    timer.Subscribe(new MyObserver(obs));
    timer.Subscribe(new MyObserver(obs));
    timer.Subscribe(new MyObserver(obs));

} 
天涯离梦残月幽梦 2024-12-17 17:38:56

我不知道有什么开箱即用的方法可以做到这一点,但我会按照下面的方式进行

class ConditionAction
{
     public Predicate<string> Condition {get; set; }
     public Action<string> Action {get; set; }
}

var conditions = new ConditionAction[]{action1, action2, action3};

foreach (var condition in conditions)
       observable.Where(condition.Condition).Subscribe(condition.Action);
.....
observable.Where(s=>!conditions.Any(c=>c.Condition(s))).Subscribe(...);

I don't know of any out of the box way to do this but I would do it as under

class ConditionAction
{
     public Predicate<string> Condition {get; set; }
     public Action<string> Action {get; set; }
}

var conditions = new ConditionAction[]{action1, action2, action3};

foreach (var condition in conditions)
       observable.Where(condition.Condition).Subscribe(condition.Action);
.....
observable.Where(s=>!conditions.Any(c=>c.Condition(s))).Subscribe(...);
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文