Thread-safe buffer of data, to work in bulk inserts of controlled size
I have a simulation that generates data which must be saved to a database.
ParallelLoopResult res = Parallel.For(0, 1000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);
    SaveDataToDatabase(cds);
});
The simulation generates a whole lot of data, so it wouldn't be practical to first generate it all and then save it to the database (up to 1 GB of data), and it also wouldn't make sense to save it one record at a time (the transactions would be too small to be practical). I want to insert the records into the database as batch inserts of controlled size (say, 100 per commit).
However, I think my knowledge of parallel computing is less than theoretical. I came up with this (which, as you can see, is very flawed):
DataBuffer buffer = new DataBuffer(...);
ParallelLoopResult res = Parallel.For(0, 10000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);
    buffer.SaveDataToBuffer(cds, r == 10000000 - 1);   // flag the final iteration
});
public class DataBuffer
{
    int count = 0;
    int limit = 100;
    object _locker = new object();
    ConcurrentBag<ComplexDataSet> _lastItemRef;
    ConcurrentQueue<ConcurrentBag<ComplexDataSet>> ComplexDataSetsQueue =
        new ConcurrentQueue<ConcurrentBag<ComplexDataSet>>();

    public void SaveDataToBuffer(ComplexDataSet data, bool isfinalcycle)
    {
        lock (_locker)
        {
            if (count >= limit)
            {
                ConcurrentBag<ComplexDataSet> dequeueRef;
                if (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                {
                    Commit(dequeueRef);
                }

                _lastItemRef = new ConcurrentBag<ComplexDataSet> { data };
                ComplexDataSetsQueue.Enqueue(_lastItemRef);
                count = 1;
            }
            else
            {
                // First time
                if (_lastItemRef == null)
                {
                    _lastItemRef = new ConcurrentBag<ComplexDataSet> { data };
                    ComplexDataSetsQueue.Enqueue(_lastItemRef);
                    count = 1;
                }
                // If buffer isn't full
                else
                {
                    _lastItemRef.Add(data);
                    count++;
                }
            }

            if (isfinalcycle)
            {
                // Commit everything that hasn't been committed yet
                ConcurrentBag<ComplexDataSet> dequeueRef;
                while (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                {
                    Commit(dequeueRef);
                }
            }
        }
    }

    public void Commit(ConcurrentBag<ComplexDataSet> data)
    {
        // Commit data to database.. should this somehow be in another thread or something?
    }
}
As you can see, I'm using a queue to create a buffer and then manually deciding when to commit. However, I have a strong feeling that this isn't a very performant solution to my problem. First, I'm unsure whether I'm doing the locking right. Second, I'm not even sure whether this is fully thread-safe (or thread-safe at all).
Can you please take a look for a moment and comment on what I should do differently? Or is there a completely better way of doing this (using some kind of producer-consumer technique or something)?
Thanks and best wishes,
D.
Comments (3)
There is no need to use locks or expensive concurrency-safe data structures. The data is all independent, so introducing locking and sharing will only hurt performance and scalability.
Parallel.For has an overload that lets you specify per-thread data. In this you can store a private queue and a private database connection.

Also: Parallel.For internally partitions your range into smaller chunks. It's perfectly efficient to pass it a huge range, so there is nothing to change there.

Now, MongoDb currently doesn't support truly concurrent inserts -- it holds some expensive locks in the server, so parallel commits won't gain you much (if any) speed. They want to fix this in the future, so you might get a free speed-up one day.
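A minimal sketch of that per-thread-data overload in this setting, assuming the question's GenerateData and a hypothetical Commit helper that accepts a whole batch (a private connection could live in the same per-thread state):

using System.Collections.Generic;
using System.Threading.Tasks;

Parallel.For(0, 10000000,
    () => new List<ComplexDataSet>(100),        // localInit: a private batch per worker thread
    (r, loopState, batch) =>
    {
        batch.Add(GenerateData(r));
        if (batch.Count >= 100)
        {
            Commit(batch);                      // commit a full batch; no shared state, no locks
            batch.Clear();
        }
        return batch;
    },
    batch =>                                    // localFinally: flush whatever this thread has left
    {
        if (batch.Count > 0)
        {
            Commit(batch);
        }
    });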
If you need to limit the number of database connections held, a producer/consumer setup is a good alternative. You can use a BlockingCollection queue to do this efficiently without using any locks:
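A minimal sketch of such a setup, again assuming the question's GenerateData and a hypothetical batch-taking Commit; a single consumer means a single database connection:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

var queue = new BlockingCollection<ComplexDataSet>(boundedCapacity: 10000);

// Single consumer: drains the queue and commits in batches of 100.
var consumer = Task.Factory.StartNew(() =>
{
    var batch = new List<ComplexDataSet>(100);
    foreach (var item in queue.GetConsumingEnumerable())
    {
        batch.Add(item);
        if (batch.Count >= 100)
        {
            Commit(batch);
            batch.Clear();
        }
    }
    if (batch.Count > 0) Commit(batch);   // flush the final partial batch
});

// Producers: generate in parallel; Add blocks when the queue is full (backpressure).
Parallel.For(0, 10000000, r => queue.Add(GenerateData(r)));
queue.CompleteAdding();   // lets GetConsumingEnumerable finish
consumer.Wait();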
In your example you have 10 000 000 packages of work. Each of these needs to be distributed to a thread.
Assuming you don't have a really large number of CPU cores, this is not optimal. You also have to synchronize your threads on every buffer.SaveDataToBuffer call (by using locks). Additionally, you should be aware that the variable r isn't necessarily increased by one in chronological order (example: Thread1 executes r with 1, 2, 3 and Thread2 with 4, 5, 6; chronologically this would lead to the sequence of r values 1, 4, 2, 5, 3, 6 (approximately) being passed to SaveDataToBuffer).

I would make the packages of work larger and then commit each package at once. This also has the benefit that you don't have to lock/synchronize all too often.
Here's an example:
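A sketch consistent with the description that follows, assuming the question's options and GenerateData, plus a hypothetical Commit overload that takes a whole batch:

using System.Collections.Generic;
using System.Threading.Tasks;

const int packages = 10000;          // work packages, executed in parallel
const int itemsPerPackage = 1000;    // data items generated and committed per package

Parallel.For(0, packages, options, (p, state) =>
{
    var items = new List<ComplexDataSet>(itemsPerPackage);
    for (int i = 0; i < itemsPerPackage; i++)
    {
        items.Add(GenerateData(p * itemsPerPackage + i));
    }
    Commit(items);   // one commit per package; no shared state between threads
});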
In this example the whole work is split into 10 000 packages (which are executed in parallel) and every package generates 1000 data items and commits them to the database.
With this solution the Commit method might become a bottleneck if it is not wisely designed. It would be best to make it thread-safe without using any locks, which can be accomplished if you don't use common objects between threads that would need synchronization. E.g. for a SQL Server backend, that would mean creating your own SQL connection in the context of every Commit() call:
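A minimal sketch of such a per-call connection (placed, say, in the question's DataBuffer class); connectionString and InsertItem are assumed placeholders:

using System.Collections.Generic;
using System.Data.SqlClient;

public void Commit(IEnumerable<ComplexDataSet> data)
{
    // A fresh connection per call: nothing is shared between threads, so no locks are needed.
    using (var connection = new SqlConnection(connectionString))   // connectionString: assumed field
    {
        connection.Open();
        using (var transaction = connection.BeginTransaction())
        {
            foreach (var item in data)
            {
                InsertItem(connection, transaction, item);   // hypothetical single-row insert helper
            }
            transaction.Commit();   // one database transaction per batch
        }
    }
}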
Instead of increasing the complexity of the software, rather consider simplification. You can refactor the code into three parts:

[1] Workers that enqueue
This is a concurrent GenerateData in Parallel.For that does some heavy computation and produces a ComplexDataSet.

[2] Actual queue
A concurrent queue that stores the results from [1] -- that is, many ComplexDataSets. Here I assume that one instance of ComplexDataSet is actually not really resource-consuming and fairly light. As long as the queue is concurrent, it supports parallel "inserts" and "deletes".

[3] Workers that dequeue
Code that takes one instance of ComplexDataSet from the processing queue [2] and puts it into a concurrent bag (or other storage). Once the bag has N items, you block, stop dequeuing, flush the contents of the bag into the database, and clear it. Finally, you unblock and resume dequeuing.
Here is some metacode for [1], [3], and [2] with usage (it still compiles, but needs improvements):
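A sketch in the spirit of those three parts; the names, the back-off, and the FlushToDatabase helper are illustrative assumptions, and the "bag" is a plain List since only one worker touches it:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// [2] Actual queue: concurrent, so it supports parallel inserts and deletes.
var processingQueue = new ConcurrentQueue<ComplexDataSet>();
const int batchSize = 100;   // N from the description; the question uses 100
var producersDone = new ManualResetEventSlim(false);

// [3] Worker that dequeues: collects N items, flushes them to the database, clears, resumes.
var dequeuer = Task.Factory.StartNew(() =>
{
    var bag = new List<ComplexDataSet>(batchSize);
    while (!producersDone.IsSet || !processingQueue.IsEmpty)
    {
        ComplexDataSet item;
        if (processingQueue.TryDequeue(out item))
        {
            bag.Add(item);
            if (bag.Count >= batchSize)
            {
                FlushToDatabase(bag);   // assumed helper that writes one batch
                bag.Clear();
            }
        }
        else
        {
            Thread.Sleep(10);   // crude back-off while the queue is momentarily empty
        }
    }
    if (bag.Count > 0) FlushToDatabase(bag);   // final partial batch
});

// [1] Workers that enqueue: heavy computation in parallel, results go into the queue.
Parallel.For(0, 10000000, r => processingQueue.Enqueue(GenerateData(r)));

// Usage: signal completion and wait for the flusher to drain the queue.
producersDone.Set();
dequeuer.Wait();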