如何为 foreach 创建并行预取
考虑到在 C#、TPL、并行扩展、异步 CTP、响应式扩展中执行异步操作的众多新方法,我想知道并行化以下内容的获取和处理部分的最简单方法是什么:
foreach(string url in urls)
{
var file = FetchFile(url);
ProcessFile(file);
}
但条件是,虽然文件可以随时获取 ProcessFile
一次只能处理一个文件,应按顺序调用。
简而言之,让 FetchFile 和 ProcessFile 以管道方式运行(即同时发生)的最简单方法是什么?
Given the numerous new ways of performing asynchronous operations in C#, TPL, Parallel Extensions, Async CTP, Reactive Extensions I was wonder what the simplest way to parallelize the fetching and processing portions of the following would be:
foreach(string url in urls)
{
var file = FetchFile(url);
ProcessFile(file);
}
The proviso is that whilst files can be fetched at anytime ProcessFile
can only handle one file at a time and should be called sequentially.
In short what is the simplest way to get FetchFile
and ProcessFile
to behave in a pipelined way i.e. happen concurrently?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(4)
这是 RX 方式。此扩展会将 uri 的流转换为流的流:
用法:
编辑:哎呀,还没有注意到文件写入序列化要求。这部分可以通过使用 .Concat 来完成,它本质上是一个 RX 队列(另一个是 .Zip)。
让我们有一个 .StreamToFile 扩展名:
现在您可以并行 Web 请求,但可以序列化来自它们的文件写入:
Here's RX way. This extension will transform a steam of uri's into a stream of streams:
Usage:
Edit: oops, haven't noticed the file write serialization requirement. This part can be done by employing .Concat which is essentially an RX queue (another one is .Zip)
Let's have a .StreamToFile extension:
now you can have web requests parallel but serialize file writing that comes from them:
考虑到 ProcessFile 的约束,我认为您应该使用 TPL 异步获取数据,然后将引用预加载数据的令牌加入队列。然后,您可以有一个后台线程从队列中取出项目并将它们一一交给 ProcessFile。这是一个生产者/消费者模式。
对于队列,您可以查看 BlockingCollection 它可以提供线程安全队列还具有能够限制工作负载的良好效果。
Given the constraint on
ProcessFile
I would say you should fetch the data asynchronously using TPL and then enqueue a token which references the preloaded data. You can then have a background thread that pulls items off the queue and hands them to the ProcessFile one by one. This is a producer/consumer pattern.For the queue you can take a look at BlockingCollection which can provide a threadsafe queue which also has the nice effect of being able to throttle the workload.
由于我不知道所有花哨的机制,我可能会用老式的方式来做,尽管我怀疑它会被归类为“简单”:
Since I don't know all the fancy mechanisms, I'd probably do it in the old fashion way, although I doubt it would classify as "simple":
异步实际上并不表示并行。它只是意味着您不会阻塞等待另一个操作。但是,您可以利用异步 I/O 在下载 URL 时不阻塞线程,即,如果您这样做,则不需要与 url 一样多的线程来并行下载它们:
基本上,我们为每个 url 创建一个异步下载任务然后,当任何任务完成时,我们调用一个使用普通对象作为输出同步锁的延续,以确保 ProcessFile 顺序发生。在最后一个
ProcessFile
延续完成之前,WhenAll
不会返回。您可以使用 RX 的
ReplaySubject
来避免显式锁定(但当然它会在内部锁定):这里我们使用
ReplaySubject
作为文件下载的管道。每次下载都是异步完成的,并将其结果发布到 foreach 阻塞的管道中(即顺序发生)。当所有任务完成后,我们就完成了可观察对象,该可观察对象退出foreach
。Asynchronous does not actually denote parallel. It simply means that your will not block waiting for another operation. But you you take advantage of async I/O to not block threads as you download the URLs, i.e. you don't need as many threads as urls to download them in parallel if you do this:
Basically we create a async download task per url and then as any task completes, we invoke a continuation that uses a plain object as out synclock for making sure
ProcessFile
happens sequentially.WhenAll
won't return until the lastProcessFile
continuation is done.You could avoid the explicit lock with RX's
ReplaySubject
(but of course it will lock internally):Here we use a
ReplaySubject
as our pipeline of file downloads. Each download finishes asynchronously and publishes its results into the pipeline which theforeach
blocks on (i.e. happens sequentially). When all tasks complete, we complete the observable, which exits theforeach
.