Thread-safe data buffer for batch inserts of controlled size

Posted on 2024-11-27 20:38:34

I have a simulation that generates data which must be saved to database.

ParallelLoopResult res = Parallel.For(0, 1000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    SaveDataToDatabase(cds);

});

The simulation generates a whole lot of data (up to 1 GB), so it wouldn't be practical to generate all of it first and then save it to the database. It also wouldn't make sense to save the items one by one (the transactions would be too small to be practical). I want to insert them into the database as batch inserts of controlled size (say 100 items per commit).

However, I think my knowledge of parallel computing is less than theoretical. I came up with this (which, as you can see, is very flawed):

DataBuffer buffer = new DataBuffer(...);

ParallelLoopResult res = Parallel.For(0, 10000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    buffer.SaveDataToBuffer(cds, i == r - 1);

});

public class DataBuffer
{
    int count = 0;
    int limit = 100;

    object _locker = new object();

    // Batch currently being filled; null until the first item arrives.
    ConcurrentBag<ComplexDataSet> _lastItemRef;

    ConcurrentQueue<ConcurrentBag<ComplexDataSet>> ComplexDataSetsQueue { get; set; }

    public void SaveDataToBuffer(ComplexDataSet data, bool isfinalcycle)
    {
            lock (_locker)
            {
                if(count >= limit)
                {
                    ConcurrentBag<ComplexDataSet> dequeueRef;
                    if(ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                    {
                        Commit(dequeueRef);
                    }

                    _lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
                    ComplexDataSetsQueue.Enqueue(_lastItemRef);
                    count = 1;
                }
                else
                {
                    // First time
                    if(_lastItemRef == null)
                    {
                        _lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
                        ComplexDataSetsQueue.Enqueue(_lastItemRef);
                        count = 1;
                    }
                    // If buffer isn't full
                    else
                    {
                        _lastItemRef.Add(data);
                        count++;
                    }
                }

                if(isfinalcycle)
                {
                        // Commit everything that hasn't been committed yet
                        ConcurrentBag<ComplexDataSet> dequeueRef;    
                    while (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                    {
                        Commit(dequeueRef);
                    }
                }
            }
    }

    public void Commit(ConcurrentBag<ComplexDataSet> data)
    {
        // Commit data to database..should this be somehow in another thread or something ?
    }
}

As you can see, I'm using a queue to create a buffer and then manually deciding when to commit. However, I have a strong feeling that this isn't a very performant solution to my problem. First, I'm unsure whether I'm doing the locking right. Second, I'm not even sure whether this is fully thread-safe (or thread-safe at all).

Could you please take a look and comment on what I should do differently? Or is there a completely better way of doing this (using some kind of producer-consumer technique or something similar)?

Thanks and best wishes,
D.

Comments (3)

苍白女子 2024-12-04 20:38:34

There is no need to use locks or expensive concurrency-safe data structures. The data is all independent, so introducing locking and sharing will only hurt performance and scalability.

Parallel.For has an overload that lets you specify per-thread data. There you can store a private queue and a private database connection.

Also: Parallel.For internally partitions your range into smaller chunks. It's perfectly efficient to pass it a huge range, so nothing to change there.

Parallel.For(0, 10000000, () => new ThreadState(),
    (i, loopstate, threadstate) =>
{
    ComplexDataSet data = GenerateData(i);

    threadstate.Add(data);

    return threadstate;
}, threadstate => threadstate.Dispose());

sealed class ThreadState : IDisposable
{
    readonly IDisposable db;
    readonly Queue<ComplexDataSet> queue = new Queue<ComplexDataSet>();

    public ThreadState()
    {
        // initialize db with a private MongoDb connection.
    }

    public void Add(ComplexDataSet cds)
    {
        queue.Enqueue(cds);

        if(queue.Count == 100)
        {
            Commit();
        }
    }

    void Commit()
    {
        db.Write(queue);
        queue.Clear();
    }

    public void Dispose()
    {
        try
        {
            if(queue.Count > 0)
            {
                Commit();
            }
        }
        finally
        {
            db.Dispose();
        }
    }
}
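
The thread-local pattern above can be tried without a database. The following is a minimal sketch, not the answer's exact code: `CountingSink`, `LocalBatch`, and `ThreadLocalBatchingDemo` are hypothetical names, the sink only counts what it receives instead of writing to a real connection, and plain `int` values stand in for `ComplexDataSet`. Note that Parallel.For creates the local state once per worker task, so there may be somewhat more local batches than physical threads.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a private database connection; it only counts.
sealed class CountingSink : IDisposable
{
    public static int Items;    // total items written across all sinks
    public static int Batches;  // total Write calls across all sinks

    public void Write(IReadOnlyCollection<int> batch)
    {
        Interlocked.Add(ref Items, batch.Count);
        Interlocked.Increment(ref Batches);
    }

    public void Dispose() { }
}

// Per-worker state: a private buffer plus a private sink, so no locks are needed.
sealed class LocalBatch : IDisposable
{
    const int BatchSize = 100;
    readonly CountingSink sink = new CountingSink();
    readonly List<int> pending = new List<int>(BatchSize);

    public void Add(int item)
    {
        pending.Add(item);
        if (pending.Count == BatchSize) Flush();
    }

    void Flush()
    {
        sink.Write(pending);
        pending.Clear();
    }

    // Called from localFinally: write the last partial batch, then release the sink.
    public void Dispose()
    {
        try { if (pending.Count > 0) Flush(); }
        finally { sink.Dispose(); }
    }
}

static class ThreadLocalBatchingDemo
{
    public static void Run(int total)
    {
        Parallel.For(0, total,
            () => new LocalBatch(),                                     // localInit
            (i, loopState, batch) => { batch.Add(i); return batch; },  // body
            batch => batch.Dispose());                                  // localFinally
    }
}
```

Calling `ThreadLocalBatchingDemo.Run(1050)` should leave `CountingSink.Items` at 1050. The number of batches depends on how the runtime splits the range, because every worker flushes its own partial batch at the end.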

Now, MongoDB (at the time of writing) doesn't support truly concurrent inserts: it holds some expensive locks in the server, so parallel commits won't gain you much (if any) speed. They want to fix this in the future, so you might get a free speed-up one day.

If you need to limit the number of database connections held, a producer/consumer setup is a good alternative. You can use a BlockingCollection queue to do this efficiently without using any locks:

// Specify a maximum of 1000 items in the collection so that we don't
// run out of memory if we get data faster than we can commit it.
// Add() will wait if it is full.

BlockingCollection<ComplexDataSet> commits =
    new BlockingCollection<ComplexDataSet>(1000);

Task consumer = Task.Factory.StartNew(() =>
    {
        // This is the consumer.  It processes the
        // "commits" queue until it signals completion.

        while(!commits.IsCompleted)
        {
            ComplexDataSet cds;

            // Timeout of -1 will wait for an item or IsCompleted == true.

            if(commits.TryTake(out cds, -1))
            {
                // Got at least one item, write it.
                db.Write(cds);

                // Continue dequeuing until the queue is empty, where it will
                // timeout instantly and return false, or until we've dequeued
                // 100 items.

                for(int i = 1; i < 100 && commits.TryTake(out cds, 0); ++i)
                {
                    db.Write(cds);
                }

                // Now that the queue is momentarily empty or we have dequeued
                // 100 items, commit.  Other threads can continue to add items
                // to the queue while this commit is processing.

                db.Commit();
            }
        }
    }, TaskCreationOptions.LongRunning);

try
{
    // This is the producer.

    Parallel.For(0, 1000000, i =>
        {
            ComplexDataSet data = GenerateData(i);
            commits.Add(data);
        });
}
finally // put in a finally to ensure the task closes down.
{
    commits.CompleteAdding(); // sets commits.IsAddingCompleted = true.
    consumer.Wait(); // wait for task to finish committing all the items.
}
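
A runnable variant of the single-consumer setup is sketched below under some assumptions: `SingleWriterDemo` is a hypothetical name, `int` values stand in for `ComplexDataSet`, and the "commit" only records the batch size. It uses `GetConsumingEnumerable()` instead of the `TryTake` loop, which makes the consumer shorter but changes one behaviour: it commits only when a batch is full or the producers are done, not whenever the queue runs empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class SingleWriterDemo
{
    // Returns the size of every batch the consumer "committed", in order.
    public static List<int> Run(int total, int batchSize)
    {
        var committed = new List<int>();  // touched only by the consumer task

        // Bounded to 1000 items so producers block instead of exhausting memory.
        using (var queue = new BlockingCollection<int>(1000))
        {
            Task consumer = Task.Factory.StartNew(() =>
            {
                var batch = new List<int>(batchSize);

                // Blocks while the queue is empty; the loop ends once
                // CompleteAdding() has been called and the queue is drained.
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    batch.Add(item);
                    if (batch.Count == batchSize)
                    {
                        committed.Add(batch.Count);  // hypothetical commit point
                        batch.Clear();
                    }
                }

                // Commit the final partial batch, if any.
                if (batch.Count > 0) committed.Add(batch.Count);
            }, TaskCreationOptions.LongRunning);

            try
            {
                // Producers: generate in parallel, hand off to the single writer.
                Parallel.For(0, total, i => queue.Add(i));
            }
            finally
            {
                queue.CompleteAdding();  // lets the consumer loop finish
                consumer.Wait();         // wait until everything is committed
            }
        }

        return committed;
    }
}
```

With a single consumer, only one database connection is needed, and `SingleWriterDemo.Run(250, 100)` yields batches of 100, 100, and 50 items.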
南风起 2024-12-04 20:38:34

In your example you have 10 000 000 packages of work. Each of these needs to be distributed to a thread.
Assuming you don't have a really large number of CPU cores, this is not optimal. You also have to synchronize your threads on every buffer.SaveDataToBuffer call (by using locks). Additionally, you should be aware that the variable r doesn't necessarily increase by one from a chronological point of view (example: Thread1 executes r with 1,2,3 and Thread2 with 4,5,6. Chronologically, this would lead to roughly the following sequence of r values being passed to SaveDataToBuffer: 1,4,2,5,3,6).

I would make the packages of work larger and then commit each package at once. This also has the benefit that you don't have to lock/synchronize all too often.

Here's an example:

int total = 10000000;
int step = 1000;

Parallel.For(0, total / step, (r, state) =>
{
    int start = r * step;
    int end = start + step;

    ComplexDataSet[] result = new ComplexDataSet[step];

    for (int i = start; i < end; i++)
    {
        result[i - start] = GenerateData(i);
    }

    Commit(result);
});

In this example the whole work is split into 10 000 packages (which are executed in parallel) and every package generates 1000 data items and commits them to the database.
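
The same chunking can also be left to the framework. The sketch below is an alternative, not the approach shown above: it uses `Partitioner.Create(fromInclusive, toExclusive, rangeSize)` from `System.Collections.Concurrent`, which hands each loop iteration a range of at most `step` indices. `RangeChunkingDemo` and its counting `Commit` are hypothetical stand-ins, and `int` values replace `ComplexDataSet`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class RangeChunkingDemo
{
    public static int ChunksCommitted;
    public static int ItemsCommitted;

    public static void Run(int total, int step)
    {
        // Each range is a Tuple<int, int> of (fromInclusive, toExclusive).
        Parallel.ForEach(Partitioner.Create(0, total, step), range =>
        {
            var result = new int[range.Item2 - range.Item1];

            for (int i = range.Item1; i < range.Item2; i++)
            {
                result[i - range.Item1] = i;  // stand-in for GenerateData(i)
            }

            Commit(result);
        });
    }

    // Hypothetical commit that only counts; a real one would open its own connection.
    static void Commit(int[] data)
    {
        Interlocked.Increment(ref ChunksCommitted);
        Interlocked.Add(ref ItemsCommitted, data.Length);
    }
}
```

One practical difference: the last range is simply shorter when `total` is not a multiple of `step`, whereas looping to `total / step` drops the remainder. For example, `RangeChunkingDemo.Run(2500, 1000)` commits three chunks covering all 2500 items.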

With this solution the Commit method might become a bottleneck if it is not designed wisely. It would be best to make it thread-safe without using any locks. This can be accomplished if the threads don't share any objects that need synchronization.

E.g. for a SQL Server backend, that would mean creating a separate SQL connection in the context of every Commit() call:

private void Commit(ComplexDataSet[] data)
{
    using (var connection = new SqlConnection("connection string..."))
    {
        connection.Open();

        // insert your data here...
    }
}
白首有我共你 2024-12-04 20:38:34

Rather than increasing the complexity of the software, consider simplifying it. You can refactor the code into three parts:

  1. Workers that enqueue

    This is the concurrent GenerateData call in Parallel.For, which does some heavy computation and produces a ComplexDataSet.

  2. Actual queue

    A concurrent queue that stores the results from [1], i.e. many ComplexDataSet instances. Here I assume that a single ComplexDataSet instance is fairly light and does not consume many resources. As long as the queue is concurrent, it will support parallel "inserts" and "deletes".

  3. Workers that dequeue

    Code that takes one ComplexDataSet instance from the processing queue [2] and puts it into a concurrent bag (or other storage). Once the bag holds N items, you block, stop dequeueing, flush the content of the bag into the database, and clear it. Finally, you unblock and resume dequeueing.

Here is some metacode (it still compiles, but needs improvements)

[1]

// [1] - Class is responsible for generating complex data sets and 
// adding them to processing queue
class EnqueueWorker
{
    //generate data and add to queue
    internal void ParrallelEnqueue(ConcurrentQueue<ComplexDataSet> resultQueue)
    {
        Parallel.For(1, 10000, (i) =>
        {
            ComplexDataSet cds = GenerateData(i);
            resultQueue.Enqueue(cds);

        });
    }

    //generate data
    ComplexDataSet GenerateData(int i)
    {
        return new ComplexDataSet();
    }
}

[3]

//[3] Takes sets from the processing queue and flushes them to the database 
// once N items have accumulated
class DequeueWorker
{
    //buffer that holds processed dequeued data
    private static ConcurrentBag<ComplexDataSet> buffer;

    //lock to flush the data to the db once in a while
    private static object syncRoot = new object();

    //take item from processing queue and add it to internal buffer storage
    //once buffer is full - flush it to the database
    internal void ParrallelDequeue(ConcurrentQueue<ComplexDataSet> resultQueue)
    {
        buffer = new ConcurrentBag<ComplexDataSet>();
        int N = 100;

        Parallel.For(1, 10000, (i) =>
        {
            //try dequeue
            ComplexDataSet cds = null;

            var spinWait = new SpinWait();

            while (cds == null)
            {
                resultQueue.TryDequeue(out cds);
                spinWait.SpinOnce();
            }

            //add to buffer
            buffer.Add(cds);

            //flush to database if needed
            if (buffer.Count == N)
            {
                lock (syncRoot)
                {
                    IEnumerable<ComplexDataSet> data = buffer.ToArray();

                    // flush data to database

                    buffer = new ConcurrentBag<ComplexDataSet>();
                }
            }

        });
    }        
}

[2] and usage

class ComplexDataSet { }

class Program
{
    //processing queue - [2]
    private static ConcurrentQueue<ComplexDataSet> processingQueue;

    static void Main(string[] args)
    {
        // create new processing queue - single instance for whole app
        processingQueue = new ConcurrentQueue<ComplexDataSet>();

        //enqueue worker
        Task enqueueTask = Task.Factory.StartNew(() =>
            {
                EnqueueWorker enqueueWorker = new EnqueueWorker();
                enqueueWorker.ParrallelEnqueue(processingQueue);
            });

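        //dequeue worker
        Task dequeueTask = Task.Factory.StartNew(() =>
        {
            DequeueWorker dequeueWorker = new DequeueWorker();
            dequeueWorker.ParrallelDequeue(processingQueue);
        });

        // wait for both workers; otherwise Main returns and the process
        // may exit before the background tasks have finished
        Task.WaitAll(enqueueTask, dequeueTask);
    }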
}