Parallel.Foreach 产生太多线程

发布于 2024-08-17 11:21:27 字数 2041 浏览 10 评论 0原文

问题

虽然我在这里讨论的代码是用 F# 编写的,但它是基于 .NET 4 框架的,并不具体依赖于 F# 的任何特殊性(至少看起来是这样!)。

我的磁盘上有一些数据,我应该从网络更新,将最新版本保存到磁盘:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

问题是要loadAndSaveAndUpdate我的所有数据,我必须执行该函数<很多次:

{1 .. 5000} |> loadAndSaveAndUpdate

每个步骤都会执行

  • 一些磁盘 IO、
  • 一些数据处理、
  • 一些网络 IO(可能存在大量延迟)、
  • 更多数据处理
  • 和一些磁盘 IO。

在某种程度上,并行完成这件事不是很好吗?不幸的是,我的阅读和解析功能都不是“异步工作流程就绪”的。

我提出的第一个(不是很好)解决

方案 任务

我做的第一件事是设置一个 Task[] 并启动它们:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

然后我按 CTRL+ESC 只是为了看看如何它使用了很多线程。 15, 17, ..., 35, ..., 170, ... 直到杀死应用程序!出了点问题。

并行

我做了几乎相同的事情,但是使用 Parallel.ForEach(...) ,结果是相同的:很多很多很多线程。

一个可行的解决方案......有点然后

我决定只启动 n 个线程,Task.WaitAll(其中),然后是其他 n ,直到没有更多任务可用。

这是可行的,但问题是,当它完成处理时,比如说,n-1 个任务,它会等待,等待,等待该死的最后一个任务,由于大量的网络延迟而坚持阻塞。这可不好啊!

那么,您将如何解决这个问题?我很乐意查看不同的解决方案,包括异步工作流程(以及在本例中如何调整我的非异步函数)、并行扩展、奇怪的并行模式等。

谢谢。

The problem

Although the code about which I will talk here I wrote in F#, it is based on the .NET 4 framework, not specifically depending on any particularity of F# (at least it seems so!).

I have some pieces of data on my disk that I should update from the network, saving the latest version to the disk:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

The problem is that to loadAndSaveAndUpdate all my data, I would have to execute the function many times:

{1 .. 5000} |> loadAndSaveAndUpdate

Each step would do

  • some disk IO,
  • some data crunching,
  • some network IO (with possibility of lots of latency),
  • more data crunching,
  • and some disk IO.

Wouldn't it be nice to have this done in parallel, to some degree? Unfortunately, none of my reading and parsing functions are "async-workflows-ready".

The first (not very good) solutions I came up with

Tasks

The first thing I've done was to set up a Task[] and start them all:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

Then I hit CTRL+ESC just to see how many threads it was using. 15, 17, ..., 35, ..., 170, ... until killed the application! Something was going wrong.

Parallel

I did almost the same thing but using Parallel.ForEach(...) and the results were the same: lots and lots and lots of threads.

A solution that works... kind of

Then I decided to start only n threads, Task.WaitAll(of them), then other n, until there were no more tasks available.

This works, but the problem is that when it has finished processing, say, n-1 tasks, it will wait, wait, wait for the damn last Task that insist on blocking due to lots of network latency. This is not good!

So, how would you attack this problem? I'd appreciate to view different solutions, involving either Async Workflows (and in this case how to adapt my non-async functions), Parallel Extensions, weird parallel patterns, etc.

Thanks.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

杀手六號 2024-08-24 11:21:27

ParallelOptions.MaxDegreeOfParallelism 限制通过并行方法调用运行的并发操作

ParallelOptions.MaxDegreeOfParallelism limits the number of concurrent operations run by Parallel method calls

且行且努力 2024-08-24 11:21:27

使用“异步”将使您能够在各种 I/O 调用处于“海上”状态时执行 I/O 密集型工作,而无需消耗线程,因此这是我的第一个建议。将代码转换为异步应该很简单,通常是

  • 将每个函数体包装在 async{...} 中,在必要时添加 return
  • 创建异步版本通过 Async.FromBeginEnd 将任何 I/O 原语尚未存在于库中
  • let r = Foo() 形式的调用切换为 let! r = AsyncFoo()
  • 使用 Async.Parallel 将 5000 个异步对象转换为并行运行的单个异步对象。

有各种教程可以实现此目的; 此处就是这样一个网络广播。

Using 'async's will enable you to do the I/O-bound work without burning threads while the various I/O calls are 'at sea', so that would be my first suggestion. It should be straightforward to convert the code to async, usually along the lines of

  • wrap each function body in async{...}, add return where necessary
  • create Async versions of any I/O primitives that aren't already in the library via Async.FromBeginEnd
  • Switch calls of the form let r = Foo() to let! r = AsyncFoo()
  • Use Async.Parallel to convert the 5000 async objects into a single Async that runs in parallel

There are various tutorials for doing this; one such webcast is here.

玩世 2024-08-24 11:21:27

您确定您的个人任务能够及时完成吗?我相信 Parallel.ForEach 和 Task 类都已经使用了 .NET 线程池。任务通常应该是短暂的工作项,在这种情况下,线程池只会产生少量的实际线程,但是如果您的任务没有取得进展并且有其他任务排队,那么使用的线程数将稳步增加到最大值(默认为 250/processor< /a> 在.NET 2.0 SP1中,但在不同版本的框架下有所不同)。还值得注意的是(至少在 .NET 2.0 SP1 中)新线程的创建被限制为每秒 2 个新线程,因此达到您所看到的线程数表明任务无法在短时间内完成。时间(因此将责任归咎于 Parallel.ForEach 可能并不完全准确)。

我认为 Brian 关于使用异步工作流程的建议是一个很好的建议,特别是如果长期任务的来源是 IO,因为异步会将您的线程返回到线程池直到IO完成。另一种选择是简单地接受您的任务无法快速完成的事实,并允许生成许多线程(可以通过使用 System.Threading.ThreadPool.SetMaxThreads 在某种程度上进行控制) - 取决于您的任务在这种情况下,使用大量线程可能没什么大不了的。

Are you sure that your individual tasks are completing in a timely manner? I believe that both Parallel.ForEach and the Task class already use the .NET threadpool. Tasks should generally be short-lived work items, in which case the threadpool will only spawn a small number of actual threads, but if your tasks are not making progress and there are other tasks queued then the number of threads used will steadily increase up to the maximum (which by default is 250/processor in .NET 2.0 SP1, but is different under different versions of the framework). It's also worth noting that (at least in .NET 2.0 SP1) new thread creation is throttled to 2 new threads per second, so getting up to the number of threads you're seeing indicates that the tasks are not completing in a short amount of time (so it may not be completely accurate to pin the blame on Parallel.ForEach).

I think that Brian's suggestion to use async workflows is a good one, particularly if the source of the long-lived tasks is IO, since async will return your threads to the threadpool until the IO completes. Another option is to simply accept that your tasks aren't completing quickly and allow the spawning of many threads (which can be controlled to some extent by using System.Threading.ThreadPool.SetMaxThreads) - depending on your situation it may not be a big deal that you're using a lot of threads.

滥情空心 2024-08-24 11:21:27

您始终可以使用ThreadPool

http://msdn.microsoft.com/en-us/ Library/system.threading.threadpool.aspx

基本上:

  1. 创建一个线程池
  2. 设置最大线程数
  3. 使用QueueUserWorkItem(WaitCallback)对所有任务进行排队

You could always use a ThreadPool.

http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx

basically:

  1. Create a thread pool
  2. Set the max number of threads
  3. Queue all the tasks using QueueUserWorkItem(WaitCallback)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文