How to use collections in a multithreaded scenario to speed up a routine
I have an application that uses parallelization to process data.
The main program is written in C#, while one of the routines for analyzing the data lives in an external C++ DLL. This library scans the data and invokes a callback every time a certain signal is found in the data. The signals should be collected, sorted, and then stored to disk.
Here is my first, simple implementation of the method invoked by the callback and of the method that sorts and stores the data:
// collection where found signals are saved
List<MySignal> mySignalList = new List<MySignal>();

// lock object guarding mySignalList
private readonly object locker = new object();

// method invoked by the callback
private void Collect(int type, long time)
{
    lock (locker) { mySignalList.Add(new MySignal(type, time)); }
}

// store signals to disk
private void Store()
{
    // sort the signals
    mySignalList.Sort();

    // file is an object that manages writing data to a FileStream
    file.Write(mySignalList.ToArray());
}
The data is a two-dimensional array (short[][] data) of size 10000 x n, where n varies. I use parallelization in this way:
Parallel.For(0, 10000, (int i) =>
{
    // wrapper for the external C++ dll
    ProcessData(data[i]);
});
Now, for each of the 10000 arrays, I estimate that 0 to 4 callbacks could be fired. I'm facing a bottleneck, and given that my CPU resources are not over-utilized, I suspect that the lock (together with thousands of callbacks) is the problem (am I right, or could it be something else?). I've tried the ConcurrentBag collection, but performance was still poor (in line with other users' findings).
I thought that a possible lock-free solution would be to have multiple collections. A strategy would then be needed to make each thread of the parallel loop work on a single collection. The collections could, for instance, live in a dictionary keyed by thread ID, but I don't know of any .NET facility for this (I would need to know the thread IDs in order to initialize the dictionary before launching the parallel loop). Is this idea feasible and, if so, does some .NET tool exist for it? Alternatively, is there any other idea to speed up the process?
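For illustration, the dictionary-keyed-by-thread-ID idea can actually be expressed without pre-registering thread IDs, using ConcurrentDictionary.GetOrAdd to create each thread's list lazily on first use. This is a hedged sketch (class and method names are mine, and MySignal is replaced by a plain int for brevity), not necessarily the fastest option:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class PerThreadCollect
{
    // one list per thread, created lazily on first use
    static readonly ConcurrentDictionary<int, List<int>> perThread =
        new ConcurrentDictionary<int, List<int>>();

    static void Collect(int value)
    {
        // GetOrAdd creates this thread's list on the first call; afterwards
        // only this thread ever touches its own list, so Add needs no lock
        perThread.GetOrAdd(Thread.CurrentThread.ManagedThreadId,
                           _ => new List<int>()).Add(value);
    }

    public static List<int> RunAndMerge()
    {
        perThread.Clear();
        Parallel.For(0, 10000, i => Collect(i));

        // merge and sort once at the end, on a single thread
        var all = new List<int>();
        foreach (var kv in perThread) all.AddRange(kv.Value);
        all.Sort();
        return all;
    }

    static void Main()
    {
        var all = RunAndMerge();
        Console.WriteLine(all.Count);   // 10000
        Console.WriteLine(all[0]);      // 0
    }
}
```

The only synchronized structure here is the dictionary lookup itself; each per-thread List<int> is touched by exactly one thread during the parallel phase.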
[EDIT]
I followed Reed Copsey's suggestion and used the following solution (according to the VS2010 profiler, locking and adding to the list previously consumed 15% of the time; now it is only 1%):
// master collection where found signals are saved
List<MySignal> mySignalList = new List<MySignal>();

// thread-local storage of data (each thread works on its own List<MySignal>)
ThreadLocal<List<MySignal>> threadLocal;

// analyze data
private void AnalyzeData()
{
    using (threadLocal = new ThreadLocal<List<MySignal>>(
        () => new List<MySignal>()))
    {
        Parallel.For<int>(0, 10000,
            () => 0,
            (i, loopState, localState) =>
            {
                // wrapper for the external C++ dll
                ProcessData(data[i]);
                return 0;
            },
            (localState) =>
            {
                lock (this)
                {
                    // add this thread's local list to the master collection
                    mySignalList.AddRange(threadLocal.Value);
                    threadLocal.Value.Clear();
                }
            });
    }
}

// method invoked by the callback
private void Collect(int type, long time)
{
    threadLocal.Value.Add(new MySignal(type, time));
}
4 Answers
You might want to look at using ThreadLocal<T> to hold your collections. This automatically allocates a separate collection per thread.
That being said, there are overloads of Parallel.For which work with local state and have a collection pass at the end. This, potentially, would allow you to spawn your ProcessData wrapper so that each loop body works on its own collection, and then recombine at the end. This would, potentially, eliminate the need for locking (since each thread works on its own data set) until the recombination phase, which happens once per thread instead of once per task (i.e. 10000 times). This could reduce the number of locks you're taking from ~25000 (0-4 * 10000) down to a few (system and algorithm dependent, but on a quad core system, probably around 10 in my experience).
For details, see my blog post on aggregating data with Parallel.For/ForEach. It demonstrates the overloads and explains how they work in more detail.
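The overload described above can be sketched as follows. This is an illustrative stand-in (the external DLL callback is replaced by an inline `i % 3` condition, and the names are mine), showing the localInit / body / localFinally shape:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class LocalStateAggregation
{
    static readonly object locker = new object();

    // runs the Parallel.For overload with per-thread local lists and
    // returns the merged, sorted master list
    public static List<long> Run()
    {
        var master = new List<long>();
        Parallel.For(0, 10000,
            // localInit: one fresh list per worker
            () => new List<long>(),
            // body: "found signals" go into the worker's local list, no lock
            (i, state, local) =>
            {
                if (i % 3 == 0) local.Add(i);   // stand-in for the DLL callbacks
                return local;
            },
            // localFinally: one lock per worker, not one per signal
            local => { lock (locker) master.AddRange(local); });

        master.Sort();
        return master;
    }

    static void Main()
    {
        Console.WriteLine(Run().Count);   // 3334 (multiples of 3 in [0, 10000))
    }
}
```

The body runs unlocked thousands of times, while the lock in localFinally is taken only once per worker thread.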
You don't say how much of a "bottleneck" you're encountering. But let's look at the locks.
On my machine (quad core, 2.4 GHz), an uncontended lock costs about 70 nanoseconds. I don't know how long it takes to add an item to a list, but I can't imagine it takes more than a few microseconds. Let's say it takes 100 microseconds (I would be very surprised to find that it's even 10 microseconds) to add an item to the list, taking lock contention into account. So if you're adding 40,000 items to the list, that's 4,000,000 microseconds, or 4 seconds. And I would expect one core to be pegged if that were the case.
I haven't used ConcurrentBag, but I've found the performance of BlockingCollection to be very good.
I suspect, though, that your bottleneck is somewhere else. Have you done any profiling?
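The uncontended-lock figure is easy to sanity-check with a Stopwatch. A rough micro-benchmark sketch (absolute numbers vary by machine, JIT warm-up is ignored, so treat the result as a ballpark only):

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

static class LockCost
{
    // times n lock-protected List.Add calls from a single thread
    // (i.e. the uncontended case) and returns nanoseconds per operation
    public static double MeasureNsPerOp(int n)
    {
        var list = new List<int>(n);
        var locker = new object();

        var sw = Stopwatch.StartNew();
        for (int i = 0; i < n; i++)
        {
            lock (locker) { list.Add(i); }
        }
        sw.Stop();

        return sw.Elapsed.TotalMilliseconds * 1_000_000.0 / n;
    }

    static void Main()
    {
        double ns = MeasureNsPerOp(10_000_000);
        Console.WriteLine($"~{ns:F0} ns per lock+Add (uncontended)");
    }
}
```

On typical hardware this lands in the tens of nanoseconds per operation, consistent with the claim that the lock itself cannot account for a multi-second bottleneck at ~40,000 items.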
The basic collections in C# aren't thread safe.
The problem you're having is due to the fact that you're locking the entire collection just to call an add() method. You could create a thread-safe collection that only locks single elements inside the collection, instead of the whole collection.
Let's look at a linked list for example. Implement an add(item (or list)) method that locks the last element, links in the new item(s), and then releases the lock. That way, an add only holds a lock for 3 simple tasks.
Then, when iterating over the list, just do a trylock() on each object. If it's locked, wait for the lock to be freed (that way you're sure that the add() finished). In C# you can do an empty lock() block on the object as a trylock().
So now you can add safely and still iterate over the list at the same time.
Similar solutions can be implemented for the other commands if needed.
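A minimal sketch of this per-node-locking list (my reconstruction of the idea, with my own names; fine-grained locking like this is subtle, so treat it as illustrative rather than production code):

```csharp
using System;
using System.Threading.Tasks;

// Each node doubles as its own lock object; Add locks only the tail
// node, and a reader does an empty lock(node) {} as the "trylock".
class Node
{
    public int Value;
    public Node Next;      // only written while this node is locked
}

class PerNodeLockedList
{
    readonly Node head = new Node();   // sentinel, never holds data
    Node tail;

    public PerNodeLockedList() { tail = head; }

    public void Add(int value)
    {
        var node = new Node { Value = value };
        while (true)
        {
            var t = tail;
            lock (t)                   // lock one node, not the whole list
            {
                if (t.Next == null)    // t is still the real tail
                {
                    t.Next = node;
                    tail = node;
                    return;
                }
            }
            // another thread appended first; retry against the new tail
        }
    }

    public int Count()
    {
        int n = 0;
        for (var cur = head.Next; cur != null; cur = cur.Next)
        {
            lock (cur) { }             // empty lock: wait out an in-flight Add
            n++;
        }
        return n;
    }

    static void Main()
    {
        var list = new PerNodeLockedList();
        Parallel.For(0, 1000, i => list.Add(i));
        Console.WriteLine(list.Count());   // 1000
    }
}
```

The retry loop in Add guards against two threads locking the same stale tail: only the thread that finds `Next == null` actually appends, the other retries against the new tail.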
Any built-in collection solution is going to involve some locking. There may be ways to avoid it, perhaps by segregating the actual data structures being read/written, but you're going to have to lock SOMEWHERE.
Also, understand that Parallel.For() will use the thread pool. While simple to implement, you lose fine-grained control over the creation/destruction of threads, and the thread pool involves some serious overhead when starting up a big parallel task.
From a conceptual standpoint, I would try two things in tandem to speed up this algorithm: