How to use collections in a multithreaded scenario to speed up a routine

Published 2024-10-18 08:08:53

I have an application that makes use of parallelization for processing data.

The main program is in C#, while one of the routines for analyzing data is in an external C++ DLL. This library scans the data and calls a callback every time a certain signal is found within it. The data should be collected, sorted and then stored to disk.

Here is my first simple implementation of the method invoked by the callback and of the method for sorting and storing the data:

// collection where saving found signals
List<MySignal> mySignalList = new List<MySignal>();

// method invoked by the callback
private void Collect(int type, long time)
{
    lock(locker) { mySignalList.Add(new MySignal(type, time)); }
}

// store signals to disk
private void Store()
{
    // sort the signals
    mySignalList.Sort();
    // file is an object that manages the writing of data to a FileStream
    file.Write(mySignalList.ToArray());
}

Data is made up of a two-dimensional array (short[][] data) of size 10000 × n, with n variable. I use parallelization in this way:

Parallel.For(0, 10000, (int i) =>
{
    // wrapper for the external c++ dll
    ProcessData(data[i]);
});

Now, for each of the 10000 arrays, I estimate that 0 to 4 callbacks could be fired. I'm facing a bottleneck and, given that my CPU resources are not over-utilized, I suppose that the lock (together with thousands of callbacks) is the problem (am I right, or could there be something else?). I've tried the ConcurrentBag collection, but performance was still worse (in line with other users' findings).

I thought that a possible solution for lock-free code would be to have multiple collections. Then a strategy would be necessary to make each thread of the parallel process work on a single collection. Collections could, for instance, be inside a dictionary with thread ID as key, but I do not know of any .NET facility for this (I would need to know the thread IDs to initialize the dictionary before launching the parallelization). Could this idea be feasible and, if so, does some .NET tool exist for this? Or alternatively, any other idea to speed up the process?
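For what it's worth, the dictionary-keyed-by-thread-ID idea can be sketched without knowing the thread IDs up front, by creating each thread's list lazily with ConcurrentDictionary.GetOrAdd. This is only an illustration under my own assumptions: a List<long> stands in for List<MySignal>, and Collect(i) stands in for the real callback.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class PerThreadCollect
{
    // one list per managed thread id, created lazily on first use;
    // after the parallel loop no thread is still writing, so merging is safe
    static readonly ConcurrentDictionary<int, List<long>> perThread =
        new ConcurrentDictionary<int, List<long>>();

    // stand-in for the Collect callback from the question
    public static void Collect(long time)
    {
        List<long> list = perThread.GetOrAdd(
            Thread.CurrentThread.ManagedThreadId, _ => new List<long>());
        list.Add(time);   // no lock: this thread owns this list
    }

    public static int Run(int n)
    {
        perThread.Clear();
        Parallel.For(0, n, i => Collect(i));
        // merge the per-thread lists once, after the loop has finished
        return perThread.Values.Sum(l => l.Count);
    }

    static void Main()
    {
        Console.WriteLine(PerThreadCollect.Run(10000)); // prints 10000
    }
}
```

Each iteration adds exactly one item to the current thread's list, so the merged count equals the iteration count without any lock in the hot path.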

[EDIT]
I followed Reed Copsey's suggestion and used the following solution (according to the VS2010 profiler, the burden of locking and adding to the list previously took 15% of the resources, while now it takes only 1%):

// master collection where saving found signals
List<MySignal> mySignalList = new List<MySignal>();
// thread-local storage of data (each thread is working on its own List<MySignal>)
ThreadLocal<List<MySignal>> threadLocal;

// analyze data
private void AnalyzeData()
{
    using(threadLocal = new ThreadLocal<List<MySignal>>(() =>
        { return new List<MySignal>(); }))
    {
        Parallel.For<int>(0, 10000,
        () =>
        { return 0; },
        (i, loopState, localState) =>
        {
            // wrapper for the external c++ dll
            ProcessData(data[i]);
            return 0;
        },
        (localState) =>
        {
            lock(locker)
            {
                // add this thread's local list to the master collection
                mySignalList.AddRange(threadLocal.Value);
                threadLocal.Value.Clear();
            }
        });
    }
}

// method invoked by the callback
private void Collect(int type, long time)
{
    threadLocal.Value.Add(new MySignal(type, time));
}


Comments (4)

善良天后 2024-10-25 08:08:53

I thought that a possible solution for lock-free code would be to have multiple collections. Then a strategy would be necessary to make each thread of the parallel process work on a single collection. Collections could, for instance, be inside a dictionary with thread ID as key, but I do not know of any .NET facility for this (I would need to know the thread IDs to initialize the dictionary before launching the parallelization). Could this idea be feasible and, if so, does some .NET tool exist for this? Or alternatively, any other idea to speed up the process?

You might want to look at using ThreadLocal<T> to hold your collections. This automatically allocates a separate collection per thread.

That being said, there are overloads of Parallel.For which work with local state and have a final collection pass. This would potentially allow you to run your ProcessData wrapper so that each loop body works on its own collection, and then recombine at the end. This could eliminate the need for locking (since each thread is working on its own data set) until the recombination phase, which happens once per thread (instead of once per task, i.e. 10000 times). That could reduce the number of locks you're taking from ~25000 (0-4 × 10000) down to a few (system- and algorithm-dependent, but on a quad-core system probably around 10, in my experience).

For details, see my blog post on aggregating data with Parallel.For/ForEach. It demonstrates the overloads and explains how they work in more detail.
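The localInit/body/localFinally pattern described above can be sketched as follows. This is my own minimal example, not code from the blog post: local.Add(i) stands in for the ProcessData call and its callback, and the lock in localFinally is taken once per worker thread rather than once per iteration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

class LocalStateAggregation
{
    // runs the loop with per-thread lists and merges them at the end
    public static List<int> Run(int n)
    {
        var master = new List<int>();
        object locker = new object();

        Parallel.For(0, n,
            () => new List<int>(),          // localInit: fresh list per thread
            (i, state, local) =>            // body: lock-free, thread-local
            {
                local.Add(i);               // stand-in for ProcessData + callback
                return local;
            },
            local =>                        // localFinally: once per thread
            {
                lock (locker) master.AddRange(local);
            });

        return master;
    }

    static void Main()
    {
        Console.WriteLine(Run(10000).Count); // prints 10000
    }
}
```

Because localFinally runs once per worker thread, the total number of lock acquisitions is on the order of the thread count, matching the ~10 locks the answer estimates for a quad-core machine.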

等往事风中吹 2024-10-25 08:08:53

You don't say how much of a "bottleneck" you're encountering. But let's look at the locks.

On my machine (quad core, 2.4 GHz), a lock costs about 70 nanoseconds if it's not contended. I don't know how long it takes to add an item to a list, but I can't imagine that it takes more than a few microseconds. But let's say it takes 100 microseconds (I would be very surprised to find that it's even 10 microseconds) to add an item to the list, taking lock contention into account. So if you're adding 40,000 items to the list, that's 4,000,000 microseconds, or 4 seconds. And I would expect one core to be pegged if this were the case.

I haven't used ConcurrentBag, but I've found the performance of BlockingCollection to be very good.

I suspect, though, that your bottleneck is somewhere else. Have you done any profiling?
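The ~70 ns figure can be checked with a crude micro-benchmark along these lines. This is a sketch of my own, not the answerer's code; absolute numbers will vary by machine and runtime, and a real profiler (as the answer suggests) is the right tool for finding the actual bottleneck.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

class LockCost
{
    // measures the average cost of an uncontended lock() plus List.Add
    public static double NsPerLockedAdd(int n)
    {
        var list = new List<int>(n);
        object locker = new object();
        var sw = Stopwatch.StartNew();
        for (int i = 0; i < n; i++)
        {
            lock (locker) { list.Add(i); }
        }
        sw.Stop();
        return sw.Elapsed.TotalMilliseconds * 1000000.0 / n;
    }

    static void Main()
    {
        // single-threaded, so the lock is never contended;
        // this typically comes out well under a microsecond per operation
        Console.WriteLine("ns per locked Add: " + NsPerLockedAdd(1000000));
    }
}
```

If the per-operation cost really is tens of nanoseconds, 40,000 locked adds account for only a few milliseconds, which supports the answer's suspicion that the bottleneck lies elsewhere.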

千笙结 2024-10-25 08:08:53

The basic collections in C# aren't thread safe.

The problem you're having is due to the fact that you're locking the entire collection just to call an add() method.

You could create a thread-safe collection that only locks single elements inside the collection, instead of the whole collection.

Let's look at a linked list, for example.

Implement an add(item (or list)) method that does the following:

  1. Lock the collection.
  2. A = get the last item.
  3. Set the last-item reference to the new item (or to the last item in the new list).
  4. Lock the last item (A).
  5. Unlock the collection.
  6. Add the new item/list to the end of A.
  7. Unlock the locked item.

This locks the whole collection for just 3 simple tasks when adding.

Then, when iterating over the list, just do a trylock() on each object. If it's locked, wait for the lock to be released (that way you're sure that the add() has finished).
In C#, you can use an empty lock() block on the object as such a trylock().
So now you can add safely and still iterate over the list at the same time.

Similar solutions can be implemented for the other commands if needed.
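A minimal sketch of this per-node locking idea in C# might look like the following. It is my own simplification, not the answer's exact algorithm: instead of a separate collection lock, it locks only the current tail node while linking, and retries if another thread appended first. Node, LockedList and the retry loop are all assumptions.

```csharp
using System;
using System.Threading.Tasks;

class LockedList
{
    class Node
    {
        public int Value;
        public Node Next;   // null marks the current tail
    }

    // sentinel head so the list is never empty
    readonly Node head = new Node();
    Node tail;

    public LockedList() { tail = head; }

    // append by locking only the tail node, not the whole list
    public void Add(int value)
    {
        var node = new Node { Value = value };
        while (true)
        {
            var last = tail;
            lock (last)
            {
                if (last.Next == null)   // still really the tail?
                {
                    last.Next = node;
                    tail = node;
                    return;
                }
            }
            // another thread appended first; retry with the new tail
        }
    }

    public int Count()
    {
        int n = 0;
        for (var cur = head.Next; cur != null; cur = cur.Next) n++;
        return n;
    }

    static void Main()
    {
        var list = new LockedList();
        Parallel.For(0, 10000, i => list.Add(i));
        Console.WriteLine(list.Count()); // prints 10000
    }
}
```

Contention here is limited to threads racing for the same tail node at the same instant, rather than every add serializing on one collection-wide lock.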

最美的太阳 2024-10-25 08:08:53

Any built-in solution for a collection is going to involve some locking. There may be ways to avoid it, perhaps by segregating the actual data constructs being read/written, but you're going to have to lock SOMEWHERE.

Also, understand that Parallel.For() will use the thread pool. While simple to implement, you lose fine-grained control over creation/destruction of threads, and the thread pool involves some serious overhead when starting up a big parallel task.

From a conceptual standpoint, I would try two things in tandem to speed up this algorithm:

  • Create threads yourself, using the Thread class. This frees you from the scheduling slowdowns of the thread pool; a thread starts processing (or waiting for CPU time) when you tell it to start, instead of the thread pool feeding requests for threads into its internal workings at its own pace. You should be aware of the number of threads you have going at once; the rule of thumb is that the benefits of multithreading are overcome by the overhead when you have more than twice the number of active threads as "execution units" available to execute threads. However, you should be able to architect a system that takes this into account relatively simply.
  • Segregate the collection of results by creating a dictionary of result collections. Each result collection is keyed to some token carried by the thread doing the processing and passed to the callback. The dictionary can have multiple elements read at one time without locking, and since each thread writes to a different collection within the dictionary, there shouldn't be a need to lock those lists (and even if you did lock them, you wouldn't block other threads). The result is that the only collection that has to be locked in a way that blocks threads is the main dictionary, when a new collection for a new thread is added to it. That shouldn't have to happen often if you're smart about recycling tokens.
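The two suggestions above could be sketched together like this: manually created Thread objects, and a dictionary of result lists keyed by a per-worker token that is allocated before the threads start, so the dictionary itself is never written to concurrently. All names here are hypothetical, and the stride partition of the 10000 indices is my own choice.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class TokenPartition
{
    public static int Run(int threads, int n)
    {
        // one result list per worker token, created before any thread starts;
        // afterwards the dictionary is only read, which needs no lock
        var results = new Dictionary<int, List<int>>();
        for (int t = 0; t < threads; t++) results[t] = new List<int>();

        var workers = new Thread[threads];
        for (int t = 0; t < threads; t++)
        {
            int token = t;   // captured per worker
            workers[t] = new Thread(() =>
            {
                // each thread writes only to its own list: no locks at all
                for (int i = token; i < n; i += threads)
                    results[token].Add(i);
            });
            workers[t].Start();
        }
        foreach (var w in workers) w.Join();

        return results.Values.Sum(l => l.Count);
    }

    static void Main()
    {
        Console.WriteLine(Run(Environment.ProcessorCount, 10000)); // prints 10000
    }
}
```

Because every index lands in exactly one worker's stride, the merged count equals the input size, and no lock is taken anywhere in the processing phase.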