如何为 foreach 创建并行预取

发布于 2024-10-28 01:18:16 字数 318 浏览 7 评论 0原文

考虑到在 C#、TPL、并行扩展、异步 CTP、响应式扩展中执行异步操作的众多新方法,我想知道并行化以下内容的获取和处理部分的最简单方法是什么:

foreach(string url in urls)
{
   var file = FetchFile(url);
   ProcessFile(file);
}

但条件是,虽然文件可以随时获取 ProcessFile 一次只能处理一个文件,应按顺序调用。

简而言之,让 FetchFile 和 ProcessFile 以管道方式运行(即同时发生)的最简单方法是什么?

Given the numerous new ways of performing asynchronous operations in C#, TPL, Parallel Extensions, Async CTP, Reactive Extensions I was wonder what the simplest way to parallelize the fetching and processing portions of the following would be:

foreach(string url in urls)
{
   var file = FetchFile(url);
   ProcessFile(file);
}

The proviso is that whilst files can be fetched at anytime ProcessFile can only handle one file at a time and should be called sequentially.

In short what is the simplest way to get FetchFile and ProcessFile to behave in a pipelined way i.e. happen concurrently?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

烧了回忆取暖 2024-11-04 01:18:16

这是 RX 方式。此扩展会将 uri 的流转换为流的流:

    public static IObservable<Stream> RequestToStream(this IObservable<string> source, 
    TimeSpan timeout)
    {
        return
            from wc in source.Select(WebRequest.Create)
            from s in Observable
                .FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
                    wc.EndGetResponse)()
                .Timeout(timeout, Observable.Empty<WebResponse>())
                .Catch(Observable.Empty<WebResponse>())
            select s.GetResponseStream();
    }

用法:

new [] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
   .ToObservable()
   .RequestToStream(TimeSpan.FromSeconds(5))
   .Do(stream = > ProcessStream(stream))
   .Subscribe();

编辑:哎呀,还没有注意到文件写入序列化要求。这部分可以通过使用 .Concat 来完成,它本质上是一个 RX 队列(另一个是 .Zip)。

让我们有一个 .StreamToFile 扩展名:

    public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
    {
        return Observable.Defer(() =>
            source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
    }

现在您可以并行 Web 请求,但可以序列化来自它们的文件写入:

        new[] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
            .ToObservable()
            .RequestToStream(TimeSpan.FromSeconds(5))
            .Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat"))
            .Select(x => x.StreamToFile())
            .Concat()
            .Subscribe();

Here's RX way. This extension will transform a steam of uri's into a stream of streams:

    public static IObservable<Stream> RequestToStream(this IObservable<string> source, 
    TimeSpan timeout)
    {
        return
            from wc in source.Select(WebRequest.Create)
            from s in Observable
                .FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
                    wc.EndGetResponse)()
                .Timeout(timeout, Observable.Empty<WebResponse>())
                .Catch(Observable.Empty<WebResponse>())
            select s.GetResponseStream();
    }

Usage:

new [] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
   .ToObservable()
   .RequestToStream(TimeSpan.FromSeconds(5))
   .Do(stream = > ProcessStream(stream))
   .Subscribe();

Edit: oops, haven't noticed the file write serialization requirement. This part can be done by employing .Concat which is essentially an RX queue (another one is .Zip)

Let's have a .StreamToFile extension:

    public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
    {
        return Observable.Defer(() =>
            source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
    }

now you can have web requests parallel but serialize file writing that comes from them:

        new[] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
            .ToObservable()
            .RequestToStream(TimeSpan.FromSeconds(5))
            .Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat"))
            .Select(x => x.StreamToFile())
            .Concat()
            .Subscribe();
看海 2024-11-04 01:18:16

考虑到 ProcessFile 的约束,我认为您应该使用 TPL 异步获取数据,然后将引用预加载数据的令牌加入队列。然后,您可以有一个后台线程从队列中取出项目并将它们一一交给 ProcessFile。这是一个生产者/消费者模式。

对于队列,您可以查看 BlockingCollection 它可以提供线程安全队列还具有能够限制工作负载的良好效果。

Given the constraint on ProcessFile I would say you should fetch the data asynchronously using TPL and then enqueue a token which references the preloaded data. You can then have a background thread that pulls items off the queue and hands them to the ProcessFile one by one. This is a producer/consumer pattern.

For the queue you can take a look at BlockingCollection which can provide a threadsafe queue which also has the nice effect of being able to throttle the workload.

谜兔 2024-11-04 01:18:16

由于我不知道所有花哨的机制,我可能会用老式的方式来做,尽管我怀疑它会被归类为“简单”:

var q = new Queue<MyFile>();
var ev = new ManualResetEvent(false);

new System.Threading.Thread(() =>
{
    while ( true )
    {
        ev.WaitOne();
        MyFile item;
        lock (q)
        {
            item = q.Dequeue();
            if ( q.Count == 0 )
                ev.Reset();
        }
        if ( item == null )
            break;
        ProcessFile(item);
    }
}).Start();
foreach(string url in urls)
{
    var file = FetchFile(url);
    lock (q)
    {
        q.Enqueue(file);
        ev.Set();
    }
}
lock (q)
{
    q.Enqueue(null);
    ev.Set();
}

Since I don't know all the fancy mechanisms, I'd probably do it in the old fashion way, although I doubt it would classify as "simple":

var q = new Queue<MyFile>();
var ev = new ManualResetEvent(false);

new System.Threading.Thread(() =>
{
    while ( true )
    {
        ev.WaitOne();
        MyFile item;
        lock (q)
        {
            item = q.Dequeue();
            if ( q.Count == 0 )
                ev.Reset();
        }
        if ( item == null )
            break;
        ProcessFile(item);
    }
}).Start();
foreach(string url in urls)
{
    var file = FetchFile(url);
    lock (q)
    {
        q.Enqueue(file);
        ev.Set();
    }
}
lock (q)
{
    q.Enqueue(null);
    ev.Set();
}
郁金香雨 2024-11-04 01:18:16

异步实际上并不表示并行。它只是意味着您不会阻塞等待另一个操作。但是,您可以利用异步 I/O 在下载 URL 时不阻塞线程,即,如果您这样做,则不需要与 url 一样多的线程来并行下载它们:

var client = new WebClient();
var syncLock = new object();
TaskEx.WhenAll(urls.Select(url => {
  client.DownloadDataTaskAsync(url).ContinueWith((t) => {
    lock(syncLock) {
      ProcessFile(t.Result);
    }
  });
}));

基本上,我们为每个 url 创建一个异步下载任务然后,当任何任务完成时,我们调用一个使用普通对象作为输出同步锁的延续,以确保 ProcessFile 顺序发生。在最后一个 ProcessFile 延续完成之前,WhenAll 不会返回。

您可以使用 RX 的 ReplaySubject 来避免显式锁定(但当然它会在内部锁定):

var pipeline = new ReplaySubject<byte[]>();
var files = pipeline.ToEnumerable();
var client = new WebClient();
TaskEx.WhenAll(urls
        .Select(download => client.DownloadDataTaskAsync((string) download)
            .ContinueWith(t => pipeline.OnNext(t.Result))
        )
    ).ContinueWith(task => pipeline.OnCompleted(task));
foreach(var file in files) {
    ProcessFile(file);
}

这里我们使用 ReplaySubject 作为文件下载的管道。每次下载都是异步完成的,并将其结果发布到 foreach 阻塞的管道中(即顺序发生)。当所有任务完成后,我们就完成了可观察对象,该可观察对象退出 foreach

Asynchronous does not actually denote parallel. It simply means that your will not block waiting for another operation. But you you take advantage of async I/O to not block threads as you download the URLs, i.e. you don't need as many threads as urls to download them in parallel if you do this:

var client = new WebClient();
var syncLock = new object();
TaskEx.WhenAll(urls.Select(url => {
  client.DownloadDataTaskAsync(url).ContinueWith((t) => {
    lock(syncLock) {
      ProcessFile(t.Result);
    }
  });
}));

Basically we create a async download task per url and then as any task completes, we invoke a continuation that uses a plain object as out synclock for making sure ProcessFile happens sequentially. WhenAll won't return until the last ProcessFile continuation is done.

You could avoid the explicit lock with RX's ReplaySubject (but of course it will lock internally):

var pipeline = new ReplaySubject<byte[]>();
var files = pipeline.ToEnumerable();
var client = new WebClient();
TaskEx.WhenAll(urls
        .Select(download => client.DownloadDataTaskAsync((string) download)
            .ContinueWith(t => pipeline.OnNext(t.Result))
        )
    ).ContinueWith(task => pipeline.OnCompleted(task));
foreach(var file in files) {
    ProcessFile(file);
}

Here we use a ReplaySubject as our pipeline of file downloads. Each download finishes asynchronously and publishes its results into the pipeline which the foreach blocks on (i.e. happens sequentially). When all tasks complete, we complete the observable, which exits the foreach.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文