使用 ConcurrentQueue 时,尝试在并行循环时出队

发布于 2024-09-05 03:39:18 字数 448 浏览 7 评论 0原文

我在 .NET 4 应用程序中使用并行数据结构,并且在处理时添加了一个 ConcurrentQueue。

我想做类似的事情:

personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ... );

当我进行数据库调用来保存数据时,所以我限制并发线程数。

但是,我预计 ForAll 不会出列,而且我担心只是

ForAll(i => {
    personqueue.personqueue.TryDequeue(...);
    ...
});

这样做,因为不能保证我会弹出正确的队列。

那么,我如何以并行方式迭代集合和出列。

或者,使用 PLINQ 并行执行此处理会更好吗?

I am using the parallel data structures in my .NET 4 application and I have a ConcurrentQueue that gets added to while I am processing through it.

I want to do something like:

personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ... );

as I make database calls to save the data, so I am limiting the number of concurrent threads.

But, I expect that the ForAll isn't going to dequeue, and I am concerned about just doing

ForAll(i => {
    personqueue.personqueue.TryDequeue(...);
    ...
});

as there is no guarantee that I am popping off the correct one.

So, how can I iterate through the collection and dequeue, in a parallel fashion.

Or, would it be better to use PLINQ to do this processing, in parallel?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

嗳卜坏 2024-09-12 03:39:18

好吧,我不是 100% 确定你想在这里存档什么。您是否试图将所有项目出队直到什么都没有剩下?或者只是一次性将大量项目出队?

第一个可能意外的行为始于以下语句:

 theQueue.AsParallel()

对于 ConcurrentQueue,您将获得一个“快照”枚举器。因此,当您迭代并发堆栈时,您仅迭代快照,而不迭代“实时”队列。

一般来说,我认为迭代您在迭代过程中更改的内容并不是一个好主意。

因此,另一个解决方案如下所示:

        // this way it's more clear, that we only deque for theQueue.Count items
        // However after this, the queue is probably not empty
        // or maybe the queue is also empty earlier   
        Parallel.For(0, theQueue.Count,
                     new ParallelOptions() {MaxDegreeOfParallelism = 20},
                     () => { 
                         theQueue.TryDequeue(); //and stuff
                     });

这可以避免在迭代某些内容时对其进行操作。但是,在该语句之后,队列仍然可以包含在 for 循环期间添加的数据。

为了让队列暂时清空,您可能需要做更多的工作。这是一个非常丑陋的解决方案。当队列仍有项目时,创建新任务。每个任务开始时都会尽可能长时间地从队列中出队。最后,我们等待所有任务结束。为了限制并行性,我们永远不会创建超过 20 个任务。

        // Probably a kitty died because of this ugly code ;)
        // However, this code tries to get the queue empty in a very aggressive way
        Action consumeFromQueue = () =>
                                      {
                                          while (tt.TryDequeue())
                                          {
                                              ; // do your stuff
                                          }
                                      };
        var allRunningTasks = new Task[MaxParallism];
        for(int i=0;i<MaxParallism && tt.Count>0;i++)
        {
            allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);  
        }
        Task.WaitAll(allRunningTasks);

Well I'm not 100% sure what you try to archive here. Are you trying to just dequeue all items until nothing is left? Or just dequeue lots of items in one go?

The first probably unexpected behavior starts with this statement:

 theQueue.AsParallel()

For a ConcurrentQueue, you get a 'Snapshot'-Enumerator. So when you iterate over a concurrent stack, you only iterate over the snapshot, no the 'live' queue.

In general I think it's not a good idea to iterate over something you're changing during the iteration.

So another solution would look like this:

        // this way it's more clear, that we only deque for theQueue.Count items
        // However after this, the queue is probably not empty
        // or maybe the queue is also empty earlier   
        Parallel.For(0, theQueue.Count,
                     new ParallelOptions() {MaxDegreeOfParallelism = 20},
                     () => { 
                         theQueue.TryDequeue(); //and stuff
                     });

This avoids manipulation something while iterating over it. However, after that statement, the queue can still contain data, which was added during the for-loop.

To get the queue empty for moment in time you probably need a little more work. Here's an really ugly solution. While the queue has still items, create new tasks. Each task start do dequeue from the queue as long as it can. At the end, we wait for all tasks to end. To limit the parallelism, we never create more than 20-tasks.

        // Probably a kitty died because of this ugly code ;)
        // However, this code tries to get the queue empty in a very aggressive way
        Action consumeFromQueue = () =>
                                      {
                                          while (tt.TryDequeue())
                                          {
                                              ; // do your stuff
                                          }
                                      };
        var allRunningTasks = new Task[MaxParallism];
        for(int i=0;i<MaxParallism && tt.Count>0;i++)
        {
            allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);  
        }
        Task.WaitAll(allRunningTasks);
安稳善良 2024-09-12 03:39:18

如果您的目标是高吞吐量的真实站点,并且不需要立即进行数据库更新,那么您最好选择非常保守的解决方案,而不是额外的层库。

制作固定大小的数组(估计大小 - 比如说 1000 个项目或 N 秒的请求)和互锁索引,以便请求将数据放入槽中并返回。当一个块被填满时(继续检查计数),创建另一个块并生成异步委托来处理刚刚被填满的块并将其发送到 SQL。根据数据的结构,委托可以将所有数据打包到以逗号分隔的数组中,甚至可能是一个简单的 XML(当然要测试该数据的性能)并将它们发送到 SQL 存储过程,这应该能够最好地处理它们记录根据记录 - 从未持有过大锁。如果它变得很重,你可以将你的块分成几个更小的块。关键是您最大限度地减少了对 SQL 的请求数量,始终保持一定程度的分离,甚至不必为线程池付出代价 - 您可能根本不需要使用超过 2 个异步线程。

这将比摆弄 Parallel-s 快得多。

If you are aiming at a high throughout real site and you don't have to do immediate DB updates , you'll be much better of going for very conservative solution rather than extra layers libraries.

Make fixed size array (guestimate size - say 1000 items or N seconds worth of requests) and interlocked index so that requests just put data into slots and return. When one block gets filled (keep checking the count), make another one and spawn async delegate to process and send to SQL the block that just got filled. Depending on the structure of your data that delegate can pack all data into comma-separated arrays, maybe even a simple XML (got to test perf of that one of course) and send them to SQL sproc which should give it's best to process them record by record - never holding a big lock. It if gets heavy, you can split your block into several smaller blocks. The key thing is that you minimized the number of requests to SQL, always kept one degree of separation and didn't even have to pay the price for a thread pool - you probably won't need to use more that 2 async threads at all.

That's going to be a lot faster that fiddling with Parallel-s.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文