如何并行执行多个异步调用?

发布于 2024-12-12 03:47:00 字数 808 浏览 0 评论 0原文

我有许多命令可以调用 SOAP Web 服务(Betfair API)。所有都是经典的异步编程模型类型...

public void DoXXX( <input parameters ...> )
{
    XXXRequest Request = new XXXRequest();
    // populate Request from input parameters ...
    BetfairService.BeginXXX( Request, XXXCallback, State );
}

private void XXXCallback(IAsyncResult Result)
{
    XXXResponse Response = BetfairService.EndXXX(Result);
    if (Response.ErrorCode == XXXErrorCode.OK)
        // store data from Response
    else
        // deal with error
}

我想执行一组指定的命令,然后在所有命令完成后使用组合的返回数据值进行一些计算。

我可以按顺序执行此操作,方法是创建一个命令队列,并让每个回调方法在完成后触发队列中的下一个命令,并将计算作为队列中的最后一项。然而,这是相对缓慢的。

我理想的解决方案是让所有这些命令并行运行,然后在所有命令完成后触发计算。我尝试查看 Task.Factory.FromAsync(),但我能找到的所有示例仅包括对 BeginXXX / EndXXX 的直接调用,而不对响应执行任何操作。

有谁能提供解决此问题的合适解决方案吗?

I have a number of commands that make calls to a soap web service (Betfair API). All are of the classic asynchronous programming model type ...

public void DoXXX( <input parameters ...> )
{
    XXXRequest Request = new XXXRequest();
    // populate Request from input parameters ...
    BetfairService.BeginXXX( Request, XXXCallback, State );
}

private void XXXCallback(IAsyncResult Result)
{
    XXXResponse Response = BetfairService.EndXXX(Result);
    if (Response.ErrorCode == XXXErrorCode.OK)
        // store data from Response
    else
        // deal with error
}

I want to execute a specified set of commands, and then do some calculations using the combined returned data values, once all of commands are completed.

I'm able to do this as a sequence, by making a queue of commands and having each callback method trigger the next command in the queue once it's complete, with the calculation as the last item in the queue. This is relatively slow however.

My ideal solution would be to have all of these commands running in parallel and then to have the calculation triggered once all of the commands are completed. I've tried looking at Task.Factory.FromAsync(), but all of the examples I can find only include direct calls to BeginXXX / EndXXX, not doing anything with the response.

Does anyone have any pointers for a suitable solution to this problem?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

胡渣熟男 2024-12-19 03:47:01

要使用 FromAsync,您需要指定返回类型:

var task = Task<XXXResponse>.Factory.FromAsync( ...

然后您的任务具有 Result 类型为 XXXResponse 的属性。

然后,您可以使用 Parallel.Invoke 并行运行初始命令。这将阻塞,直到所有这些任务完成。然后你就可以进行“附加处理”。

或者,您可以将初始任务存储在数组中,并使用 Task.Factory.ContinueWhenAll 创建延续。

缺口

To use FromAsync, you need to specify the return type:

var task = Task<XXXResponse>.Factory.FromAsync( ...

You then have a task with a Result property of type XXXResponse.

You could then use Parallel.Invoke to run the initial commands in parallel. This will block until all those tasks complete. Then you can do your "additional processing".

Or you could store the initial tasks in an array and use Task.Factory.ContinueWhenAll to create a continuation.

Nick

一个人的旅程 2024-12-19 03:47:01

我建议您查看 Microsoft 的 Reactive Extensions (Rx) 来完成您想要的操作。它允许您将异步操作(除其他外)转换为可观察的 LINQ 查询。

假设我有这三个函数,每个函数都需要大量时间来计算:

Func<int> fa = () =>
{
    Thread.Sleep(2000);
    return 42;
};

Func<int, string, string> fb = (n, t) =>
{
    Thread.Sleep(n * 1000);
    return t + n.ToString();
};

Func<DateTimeOffset> fc = () =>
{
    Thread.Sleep(1000);
    return DateTimeOffset.UtcNow;
};

然后我可以使用 FromAsyncPattern 方法将这些 lambda 函数转换为可观察函数:

Func<IObservable<int>> ofa =
    Observable
        .FromAsyncPattern<int>(
            fa.BeginInvoke,
            fa.EndInvoke);

Func<int, string, IObservable<string>> ofb =
    Observable
        .FromAsyncPattern<int, string, string>(
            fb.BeginInvoke,
            fb.EndInvoke);

Func<IObservable<DateTimeOffset>> ofc =
    Observable
        .FromAsyncPattern<DateTimeOffset>(
            fc.BeginInvoke,
            fc.EndInvoke);

现在我可以通过简单地执行以下操作来启动所有调用:

IObservable<int> oa = ofa();
IObservable<string> ob = ofb(1, "foo");
IObservable<DateTimeOffset> oc = ofc();

这有效地并行启动了三个计算。现在我们只需要将结果汇总在一起即可。

这就是 LINQ 的用武之地:

var query =
    from a in oa
    from b in ob
    from c in oc
    select new { a, b, c };

然后我订阅这个查询来获取结果:

query.Subscribe(p =>
{
    Console.WriteLine(p.a);
    Console.WriteLine(p.b);
    Console.WriteLine(p.c);
});

在我的测试中,我在这段代码周围放置了计时器来计算实际的执行时间。尽管如果串行运行,总时间应为 4 秒,但此代码会在 2 秒内完成——这三个时间中任何一个的最长时间。

现在这个例子只是 Rx 功能的一小部分,但它是一个很好的起点。

如果我能进一步解释什么,请大声喊出来。

以下是 Rx 的链接:

I would suggest that you look at Microsoft's Reactive Extensions (Rx) to do what you want. It lets you turn async operations (among other things) into observable LINQ queries.

Say I have these three functions that each take significant time to compute:

Func<int> fa = () =>
{
    Thread.Sleep(2000);
    return 42;
};

Func<int, string, string> fb = (n, t) =>
{
    Thread.Sleep(n * 1000);
    return t + n.ToString();
};

Func<DateTimeOffset> fc = () =>
{
    Thread.Sleep(1000);
    return DateTimeOffset.UtcNow;
};

I can then use the FromAsyncPattern method to turn these lambda functions into observable functions:

Func<IObservable<int>> ofa =
    Observable
        .FromAsyncPattern<int>(
            fa.BeginInvoke,
            fa.EndInvoke);

Func<int, string, IObservable<string>> ofb =
    Observable
        .FromAsyncPattern<int, string, string>(
            fb.BeginInvoke,
            fb.EndInvoke);

Func<IObservable<DateTimeOffset>> ofc =
    Observable
        .FromAsyncPattern<DateTimeOffset>(
            fc.BeginInvoke,
            fc.EndInvoke);

Now I can start all of the calls by simply doing this:

IObservable<int> oa = ofa();
IObservable<string> ob = ofb(1, "foo");
IObservable<DateTimeOffset> oc = ofc();

That effectively kicks off the three computations in parallel. Now we just have to bring the results together.

This is where LINQ comes in:

var query =
    from a in oa
    from b in ob
    from c in oc
    select new { a, b, c };

And I then subscribe to this query to get the results:

query.Subscribe(p =>
{
    Console.WriteLine(p.a);
    Console.WriteLine(p.b);
    Console.WriteLine(p.c);
});

In my testing I put timers around this code to compute actual execution time. Even though the total time should be 4 seconds if run in series this code finishes in 2 - the maximum time of any of the three.

Now this example is only a little facet of what Rx can do, but it is a good starting point.

Yell out if I can explain anything further.

Here are the links for Rx:

风尘浪孓 2024-12-19 03:47:01

您应该有一个已执行服务调用的计数器。在每个回调方法中,您应该检查此计数器 - 如果它等于服务调用的最大数量,您应该进行额外的处理,否则 - 您只需增加计数器。

You should have a counter of the executed service calls. In each callback method you should check this counter - if it equals the max number of service calls, you should do your additional processing, otherwise - you just increment the counter.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文