在同一 IObservable 订阅内访问 IObservable

发布于 2024-12-09 00:49:16 字数 980 浏览 5 评论 0原文

这是我尝试使用响应式扩展执行的简单示例,但它不起作用

Add 在这个简单示例中不起作用

    public static void Main(string[] args)
    {
        var list = new List<int> { 1, 2, 3 };
        var obs = list.ToObservable();
        IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
        {
            Console.WriteLine(p.ToString());
            Console.WriteLine(Add(obs).ToString());
        },
        err => Console.WriteLine("Error"),
        () => Console.WriteLine("Sequence Completed")
        );
        Console.ReadLine();
        subscription.Dispose();
    }

    private static int Add(IObservable<int> wholeList)
    {
        int sum = 0;
        wholeList.ForEach(i => sum = sum + i);
        return sum;
    }

实际输出

1
_

所需输出

1
6
2
6
3
6
Sequence Completed
_

即我想执行方法 Add(obs )在每次迭代中,其中 obs 本身就是正在进行迭代的冷 IObservable?

Here is a bare-bones example of what I'm trying to do with Reactive Extensions, but it does not work

Add does not work in this simple example

    public static void Main(string[] args)
    {
        var list = new List<int> { 1, 2, 3 };
        var obs = list.ToObservable();
        IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
        {
            Console.WriteLine(p.ToString());
            Console.WriteLine(Add(obs).ToString());
        },
        err => Console.WriteLine("Error"),
        () => Console.WriteLine("Sequence Completed")
        );
        Console.ReadLine();
        subscription.Dispose();
    }

    private static int Add(IObservable<int> wholeList)
    {
        int sum = 0;
        wholeList.ForEach(i => sum = sum + i);
        return sum;
    }

Actual Output

1
_

Desired Output

1
6
2
6
3
6
Sequence Completed
_

i.e. i would like to execute a method Add(obs) inside each iteration, where obs is itself the cold IObservable undergoing the iteration?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

橘和柠 2024-12-16 00:49:16

将其更改

IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread)

为:

IDisposable subscription = obs.ObserveOn(Scheduler.NewThread)

您应该注意到,就 Rx 而言,您正在做一件坏事。你正在进出可观察对象。您应该尽可能避免这种情况。

因此,例如,避免这种情况:

    var list = new List<int> { 1, 2, 3 };
    var obs = list.ToObservable();

当这是相同的时:

    var obs = Observable.Range(1, 3);

整个 static int Add(IObservableWholeList) 方法也很糟糕。它调用 ForEach (这通常应该是一个警告,表明您做错了什么)从可观察的值中取出值。这就是可能发生死锁的地方。

已经有一个名为 Sum 的可观察扩展,它返回一个 IObservble 并且这不会让您脱离可观察。

因此,尝试像这样编写代码:

var obs = Observable.Range(1, 3);

var query =
    from n in obs
    from s in obs.Sum()
    select new
    {
        Number = n.ToString(),
        Sum = s.ToString(),
    };

using (var subscription = query.SubscribeOn(Scheduler.NewThread).Subscribe(
    x =>
        {
            Console.WriteLine(x.Number);
            Console.WriteLine(x.Sum);
        },
    err =>
        Console.WriteLine("Error"),
    () =>
        Console.WriteLine("Sequence Completed")))
{
    Console.ReadLine();
}

我希望这会有所帮助。

Change this:

IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread)

to this:

IDisposable subscription = obs.ObserveOn(Scheduler.NewThread)

You should note that you're doing a bad thing as far as Rx is concerned. You are moving in and out of the observables. You should avoid this where ever possible.

So, for example, avoid this:

    var list = new List<int> { 1, 2, 3 };
    var obs = list.ToObservable();

when this is the same:

    var obs = Observable.Range(1, 3);

Also the whole static int Add(IObservable<int> wholeList) method is bad. It calls ForEach (which generally should be a warning that you're doing something wrong) to take the values out of an observable. This is where dead-locking can occur.

There already is an observable extension called Sum which returns an IObservble<int> and this doesn't take you out of the observable.

So try writing your code like this:

var obs = Observable.Range(1, 3);

var query =
    from n in obs
    from s in obs.Sum()
    select new
    {
        Number = n.ToString(),
        Sum = s.ToString(),
    };

using (var subscription = query.SubscribeOn(Scheduler.NewThread).Subscribe(
    x =>
        {
            Console.WriteLine(x.Number);
            Console.WriteLine(x.Sum);
        },
    err =>
        Console.WriteLine("Error"),
    () =>
        Console.WriteLine("Sequence Completed")))
{
    Console.ReadLine();
}

I hope this helps.

听风念你 2024-12-16 00:49:16

根据您的评论,我建议您根据需要创建可观察的项目来生成项目,而不是在订阅后执行此操作。在您的示例中,您可以执行以下操作:

var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable().Select(i => new Tuple<int,IObservable<int>>(i,list.ToObservable()));

obs.SubscribeOn(Scheduler.NewThread).Subscribe(t => {
  Console.WriteLine(t.Item1);
  SaveItems(t.Item2);
});

As par your comment I would suggest that you make the observable to generate the items as required and not do this stuff after subscription. In you example you can do something like:

var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable().Select(i => new Tuple<int,IObservable<int>>(i,list.ToObservable()));

obs.SubscribeOn(Scheduler.NewThread).Subscribe(t => {
  Console.WriteLine(t.Item1);
  SaveItems(t.Item2);
});
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文