如何使用 List<> 阻止 ToObservable?

发布于 2024-10-18 04:51:21 字数 1314 浏览 3 评论 0 原文

我第一次尝试 RX,有几个问题。

1)有没有更好的方法来完成我的集合的异步?

2)我需要阻塞线程直到所有异步任务完成,我该怎么做?

class Program
{

    internal class MyClass
    {
        private readonly List<int> _myData = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

        private readonly Random random = new Random();

        public int DoSomething(int j)
        {
            int i = random.Next(j * 1000) - (j * 200);
            i = i < 0 ? 1000 : i;
            Thread.Sleep(i);
            Console.WriteLine(j);
            return j;
        }

        public IObservable<int> DoSomethingAsync(int j)
        {
            return Observable.CreateWithDisposable<int>(
                o => Observable.ToAsync<int, int>(DoSomething)(j).Subscribe(o)
                );
        }

        public void CreateTasks()
        {
            _myData.ToObservable(Scheduler.NewThread).Subscribe(
            onNext: (i) => DoSomethingAsync(i).Subscribe(),
            onCompleted: () => Console.WriteLine("Completed")
                );
        }
    }

    static void Main(string[] args)
    {
        MyClass test = new MyClass();

        test.CreateTasks();

        Console.ReadKey(); 
    } 
}

(注意:我知道我可以使用 Observable.Range 作为我的 Int 列表,但我的列表在实际程序中不是 Int 类型)。

I am trying on RX for the first time and I have a couple questions.

1) Is there a better way to accomplish the Async of my collection?

2) I need to block on the thread until all Async Tasks are complete, how do I do that?

class Program
{

    internal class MyClass
    {
        private readonly List<int> _myData = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

        private readonly Random random = new Random();

        public int DoSomething(int j)
        {
            int i = random.Next(j * 1000) - (j * 200);
            i = i < 0 ? 1000 : i;
            Thread.Sleep(i);
            Console.WriteLine(j);
            return j;
        }

        public IObservable<int> DoSomethingAsync(int j)
        {
            return Observable.CreateWithDisposable<int>(
                o => Observable.ToAsync<int, int>(DoSomething)(j).Subscribe(o)
                );
        }

        public void CreateTasks()
        {
            _myData.ToObservable(Scheduler.NewThread).Subscribe(
            onNext: (i) => DoSomethingAsync(i).Subscribe(),
            onCompleted: () => Console.WriteLine("Completed")
                );
        }
    }

    static void Main(string[] args)
    {
        MyClass test = new MyClass();

        test.CreateTasks();

        Console.ReadKey(); 
    } 
}

(Note: I know I could have used Observable.Range for my list of Int but my list is not of type Int in the real program).

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

烂柯人 2024-10-25 04:51:21

我可能会尝试

public void CreateTasks()                
{                       
    _myData.ToObservable(Scheduler.NewThread)
        .SelectMany(i => Observable.Start(() => DoSomething(i)))
        .Subscribe(j => Console.WriteLine("j is {0}", j), 
                  () => Console.WriteLine("Completed"));       
} 

所以首先我更改了 DoSomethingAsync 以便它使用 Observable.Start。 Observable.Start 将异步运行 DoSomething 方法,并在该方法完成时通过 IObservable.OnNext 返回值。

然后,CreateTasks 方法像以前一样对集合中的每个项目运行,但将每个值提供给 SelectMany ,然后继续调用 DoSomethingAsync 方法。结果是,您将在每次完成对 DoSomethingAsync 的调用时收到一个 OnNext 消息,并在它们全部完成时收到一个 OnComplete 消息。

I'd probably try

public void CreateTasks()                
{                       
    _myData.ToObservable(Scheduler.NewThread)
        .SelectMany(i => Observable.Start(() => DoSomething(i)))
        .Subscribe(j => Console.WriteLine("j is {0}", j), 
                  () => Console.WriteLine("Completed"));       
} 

So firstly I've changed the DoSomethingAsync so that it uses Observable.Start. Observable.Start will run the DoSomething method asyncronously and return the value through IObservable.OnNext when the method completes.

Then the CreateTasks method runs over each item in the collection as it did before, but feeds each value into a SelectMany which continues with a call the DoSomethingAsync method. The result is, that you'll then recieve an OnNext for each completed call to DoSomethingAsync and an OnComplete when they are all complete.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文