当我只能生成其他可观察值时,如何从起始可观察值生成可观察值?

发布于 2024-12-04 00:53:41 字数 1334 浏览 1 评论 0 原文

我想生成一个可观察值,其中可观察值的每个值都依赖于它之前的值,从单个值开始。如果我在像 Func 这样的值之间进行简单的转换,那么使用 Observable.Generate 就很容易做到,如下所示:

Func<int, IObservable<int>> mkInts = init =>
    Observable.Generate(
        init,         // start value
        _ => true,    // continue ?
        i => i + 1,   // transformation function
        i => i);      // result selector

using (mkInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

这将很高兴在我的屏幕上写下数字直到我按下回车键。但是,我的转换函数执行一些网络 IO,因此类型为 Func>,因此我无法使用该方法。相反,我尝试过这个:

// simulate my transformation function
Func<int, IObservable<int>> mkInt = ts =>
    Observable.Return(ts)
              .Delay(TimeSpan.FromMilliseconds(10));

// pre-assign my generator function, since the function calls itself recursively
Func<int, IObservable<int>> mkInts = null;

// my generator function
mkInts = init =>
{
    var ints = mkInt(init); 

    // here is where I depend on the previous value.
    var nextInts = ints.SelectMany(i => mkInts(i + 1)); 
    return ints.Concat(nextInts);
};

using (mkInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

但是在打印大约 5000 个数字后,这将发生堆栈溢出。我该如何解决这个问题?

I want to generate an observable where each value of the observable is dependent on the one before it, starting from a single value. If I have a simple transformation between values like Func<int, int>, it is easy to do with Observable.Generate like so:

Func<int, IObservable<int>> mkInts = init =>
    Observable.Generate(
        init,         // start value
        _ => true,    // continue ?
        i => i + 1,   // transformation function
        i => i);      // result selector

using (mkInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

This will happily write numbers on my screen until I press enter. However, my transformation function does some network IO, so the type is Func<int, IObservable<int>>, so I cannot use that approach. Instead, I have tried this:

// simulate my transformation function
Func<int, IObservable<int>> mkInt = ts =>
    Observable.Return(ts)
              .Delay(TimeSpan.FromMilliseconds(10));

// pre-assign my generator function, since the function calls itself recursively
Func<int, IObservable<int>> mkInts = null;

// my generator function
mkInts = init =>
{
    var ints = mkInt(init); 

    // here is where I depend on the previous value.
    var nextInts = ints.SelectMany(i => mkInts(i + 1)); 
    return ints.Concat(nextInts);
};

using (mkInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

But this will stackoverflow after printing about 5000 numbers. How can I solve this?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

千里故人稀 2024-12-11 00:53:41

我想我已经为您提供了一个很好的干净的解决方案。

首先,回到使用 Func - 它可以使用 Func> 轻松转换为 Func> >Observable.FromAsyncPattern。

我用它来测试:

Func<int, int> mkInt = ts =>
{
    Thread.Sleep(100);
    return ts + 1;
};

现在这是赚钱的地方:

Func<int, Func<int, int>, IObservable<int>> mkInts = (i0, fn) =>
    Observable.Create<int>(o =>
    {
        var ofn = Observable
            .FromAsyncPattern<int, int>(
                fn.BeginInvoke,
                fn.EndInvoke);

        var s = new Subject<int>();

        var q = s.Select(x => ofn(x)).Switch();

        var r = new CompositeDisposable(new IDisposable[]
        {
            q.Subscribe(s),
            s.Subscribe(o),
        });

        s.OnNext(i0);

        return r;
    });

迭代函数变成了异步可观察的。

q 变量将来自主题的值输入到可观察迭代函数中,并选择计算出的可观察值。 Switch 方法使结果变平,并确保对可观察迭代函数的每次调用都得到正确清理。

此外,使用CompositeDisposable 允许将两个订阅作为一个进行处理。非常整洁!

它很容易使用,如下所示:

using (mkInts(7, mkInt).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

现在您有了生成器函数的完全参数化版本。不错吧?

I think I've got a nice clean solution for you.

First-up, go back to using a Func<int, int> - it can easily be turned into a Func<int, IObservable<int>> using Observable.FromAsyncPattern.

I used this for testing:

Func<int, int> mkInt = ts =>
{
    Thread.Sleep(100);
    return ts + 1;
};

Now here's the money maker:

Func<int, Func<int, int>, IObservable<int>> mkInts = (i0, fn) =>
    Observable.Create<int>(o =>
    {
        var ofn = Observable
            .FromAsyncPattern<int, int>(
                fn.BeginInvoke,
                fn.EndInvoke);

        var s = new Subject<int>();

        var q = s.Select(x => ofn(x)).Switch();

        var r = new CompositeDisposable(new IDisposable[]
        {
            q.Subscribe(s),
            s.Subscribe(o),
        });

        s.OnNext(i0);

        return r;
    });

The iterating function is turned into an asynchronous observable.

The q variable feeds the values from the subject into the observable iterating function and selects the calculated observable. The Switch method flattens out the result and ensures that each call to the observable iterating function is properly cleaned up.

Also, the use of a CompositeDisposable allows the two subscriptions to be disposed of as one. Very neat!

It's easily used like this:

using (mkInts(7, mkInt).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

Now you have a fully parametrized version of your generator function. Nice, huh?

心如狂蝶 2024-12-11 00:53:41

我找到 以下答案正确,但有点太复杂了。
我建议的唯一更改是 mkInts 方法:

Func<int, Func<int, int>, IObservable<int>> mkInts = (i0, fn) =>
   {
      var s = new Subject<int>();
      s.ObserveOn(Scheduler.NewThread).Select(fn).Subscribe(s);
      s.OnNext(i0);
      return s;
   };

I find the following answer correct, but a little too complicated.
The only change I suggest is the mkInts method:

Func<int, Func<int, int>, IObservable<int>> mkInts = (i0, fn) =>
   {
      var s = new Subject<int>();
      s.ObserveOn(Scheduler.NewThread).Select(fn).Subscribe(s);
      s.OnNext(i0);
      return s;
   };
不念旧人 2024-12-11 00:53:41

我不完全确定您是否打算将函数的最终结果再次反馈到函数中,或者您是否打算有一个单独的函数来获取下一个输入,所以我两者都做了。这里的技巧是让 IScheduler 完成重复调用的繁重工作。

public Func<T, IObservable<T>> Feedback<T>(Func<T, IObservable<T>> generator, 
                                           IScheduler scheduler)
{
    return seed =>
             Observable.Create((IObserver<T> observer) =>
                 scheduler.Schedule(seed,
                     (current, self) =>
                         generator(current).Subscribe(value => 
                            {
                                observer.OnNext(value);
                                self(value);
                            })));
}

public Func<T, IObservable<T>> GenerateAsync<T>(Func<T, IObservable<T>> generator,
                                                Func<T, T> seedTransform,
                                                IScheduler scheduler)
{
    return seed =>
             Observable.Create((IObserver<T> observer) =>
                 scheduler.Schedule(seed,
                     (current, self) =>
                         generator(current).Subscribe(value =>
                         {
                             observer.OnNext(value);
                             self(seedTransform(current));
                         })));
}

I was not entirely sure if you meant to feed the eventual result of the function back into the function again or if you meant to have a separate function that would get the next input, so I made both. The trick here is to let the IScheduler do the heavy lifting of the repeated calls.

public Func<T, IObservable<T>> Feedback<T>(Func<T, IObservable<T>> generator, 
                                           IScheduler scheduler)
{
    return seed =>
             Observable.Create((IObserver<T> observer) =>
                 scheduler.Schedule(seed,
                     (current, self) =>
                         generator(current).Subscribe(value => 
                            {
                                observer.OnNext(value);
                                self(value);
                            })));
}

public Func<T, IObservable<T>> GenerateAsync<T>(Func<T, IObservable<T>> generator,
                                                Func<T, T> seedTransform,
                                                IScheduler scheduler)
{
    return seed =>
             Observable.Create((IObserver<T> observer) =>
                 scheduler.Schedule(seed,
                     (current, self) =>
                         generator(current).Subscribe(value =>
                         {
                             observer.OnNext(value);
                             self(seedTransform(current));
                         })));
}
简单气质女生网名 2024-12-11 00:53:41

我相信代码不是尾递归,因此会导致异常。下面的代码可以正常工作,没有任何此类异常。

public static IObservable<int> GetObs(int i)
{
   return Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10));
}
public static IObservable<int> MakeInts(int start)
{
   return Observable.Generate(start, _ => true, i => i + 1, i => GetObs(i))
                .SelectMany(obs => obs);
}


using (MakeInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

或者通过修改您的代码,例如:

Action<int, IObserver<int>> mkInt = (i,obs) =>
               Observable.Return(i)
              .Delay(TimeSpan.FromMilliseconds(10)).Subscribe<int>(ii => obs.OnNext(ii));

            // pre-assign my generator function, since the function calls itself recursively
            Func<int, IObservable<int>> mkInts = null;
            // my generator function
            mkInts = init =>
            {
                var s = new Subject<int>();
                var ret = s.Do(i => {
                    mkInt(i + 1, s);
                });
                mkInt(init, s);
                return ret;
            };

            using (mkInts(1).Subscribe(Console.WriteLine))
            {
                Console.ReadLine();
            }

I believe the code is not tail recursive and hence causes SO exception. Below is the code which works fine without any such exception.

public static IObservable<int> GetObs(int i)
{
   return Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10));
}
public static IObservable<int> MakeInts(int start)
{
   return Observable.Generate(start, _ => true, i => i + 1, i => GetObs(i))
                .SelectMany(obs => obs);
}


using (MakeInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

Or by modifying your code like:

Action<int, IObserver<int>> mkInt = (i,obs) =>
               Observable.Return(i)
              .Delay(TimeSpan.FromMilliseconds(10)).Subscribe<int>(ii => obs.OnNext(ii));

            // pre-assign my generator function, since the function calls itself recursively
            Func<int, IObservable<int>> mkInts = null;
            // my generator function
            mkInts = init =>
            {
                var s = new Subject<int>();
                var ret = s.Do(i => {
                    mkInt(i + 1, s);
                });
                mkInt(init, s);
                return ret;
            };

            using (mkInts(1).Subscribe(Console.WriteLine))
            {
                Console.ReadLine();
            }
嘿嘿嘿 2024-12-11 00:53:41

我找到了一个解决方案,虽然它可能不是最漂亮的,但可以达到我想要的效果。如果有人有更好的解决方案,我会将其标记为答案。

Func<int, IObservable<int>> mkInt = ts =>
    Observable.Return(ts)
              .Delay(TimeSpan.FromMilliseconds(10));

Func<int, IObservable<int>> mkInts = init =>
{
    Subject<int> subject = new Subject<int>();
    IDisposable sub = null;
    Action<int> onNext = null;
    onNext = i =>
    {
        subject.OnNext(i);
        sub.Dispose();
        sub = mkInt(i + 1).Subscribe(onNext);
    };
    sub = mkInt(init).Subscribe(onNext);
    return subject;
};

using (mkInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

I found a solution, which, although it may not be the prettiest, does what I want it to. If anyone has a better solution, I will mark that as an answer.

Func<int, IObservable<int>> mkInt = ts =>
    Observable.Return(ts)
              .Delay(TimeSpan.FromMilliseconds(10));

Func<int, IObservable<int>> mkInts = init =>
{
    Subject<int> subject = new Subject<int>();
    IDisposable sub = null;
    Action<int> onNext = null;
    onNext = i =>
    {
        subject.OnNext(i);
        sub.Dispose();
        sub = mkInt(i + 1).Subscribe(onNext);
    };
    sub = mkInt(init).Subscribe(onNext);
    return subject;
};

using (mkInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文