流数据BlockingCollection

发布于 2024-12-29 09:07:57 字数 898 浏览 2 评论 0原文

在 Stephen Toub 的书的第 88 页

http://www.microsoft.com/ download/en/details.aspx?id=19222

代码

private BlockingCollection<T> _streamingData = new BlockingCollection<T>();
// Parallel.ForEach
Parallel.ForEach(_streamingData.GetConsumingEnumerable(),
item => Process(item));
// PLINQ
var q = from item in _streamingData.GetConsumingEnumerable().AsParallel()
...
select item;

这是斯蒂芬随后提到的

“当 将调用 GetConsumingEnumerable 的结果作为数据源传递给 Parallel.ForEach,即所使用的线程 当集合变空时,循环有可能阻塞。并且被阻塞的线程可能不会被 Parallel.ForEach 释放回 ThreadPool 以供退休或其他用途。因此,代码如下所示 如上所述,如果有一段时间集合为空,则进程中的线程数可能会稳定 成长;”

我不明白为什么线程计数会增长?

如果集合为空,那么阻塞集合不会请求任何进一步的线程吗?

因此,您不需要执行 WithDegreeOfParallelism 来限制 BlockingCollection 上使用的线程数

On page 88 of Stephen Toub's book

http://www.microsoft.com/download/en/details.aspx?id=19222

There is the code

private BlockingCollection<T> _streamingData = new BlockingCollection<T>();
// Parallel.ForEach
Parallel.ForEach(_streamingData.GetConsumingEnumerable(),
item => Process(item));
// PLINQ
var q = from item in _streamingData.GetConsumingEnumerable().AsParallel()
...
select item;

Stephen then mentions

"when
passing the result of calling GetConsumingEnumerable as the data source to Parallel.ForEach, the threads used by
the loop have the potential to block when the collection becomes empty. And a blocked thread may not be released by Parallel.ForEach back to the ThreadPool for retirement or other uses. As such, with the code as shown
above, if there are any periods of time where the collection is empty, the thread count in the process may steadily
grow;"

I do not understand why the thread count would grow?

If the collection is empty then wouldn't the blockingcollection not request any further threads?

Hence you do not need to do WithDegreeOfParallelism to limit the number of threads used on the BlockingCollection

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

甜心小果奶 2025-01-05 09:07:57

线程池有一个爬山算法,用于估计适当的线程数。只要添加线程增加吞吐量,线程池就会创建更多线程。它会假设发生了一些阻塞或 IO,并尝试通过检查系统中的处理器数量来使 CPU 饱和。

这就是为什么在线程池线程上执行 IO 和阻塞操作可能很危险的原因。

这是上述行为的一个完整的工作示例:

        BlockingCollection<string> _streamingData = new BlockingCollection<string>();

        Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 100; i++)
                {
                    _streamingData.Add(i.ToString());
                    Thread.Sleep(100);
                }
            });

        new Thread(() =>
            {
                while (true)
                {
                    Thread.Sleep(1000);
                    Console.WriteLine("Thread count: " + Process.GetCurrentProcess().Threads.Count);
                }
            }).Start();

        Parallel.ForEach(_streamingData.GetConsumingEnumerable(), item =>
            {
            });

我不知道为什么线程数不断攀升,尽管它没有增加吞吐量。根据我解释的模型,它不会增长。但我不知道我的模型是否真的正确。

也许线程池有一个额外的启发式方法,如果它没有看到任何进展(以每秒完成的任务来衡量),它就会生成线程。这是有道理的,因为这可能会防止应用程序中出现大量死锁。如果重要任务因等待现有任务退出并使线程可用而无法运行,则可能会发生死锁。这是线程池的一个众所周知的问题。

The thread pool has a hill climbing algorithm that it uses to estimate the appropriate number of threads. As long as adding threads increases throughput, the thread pool will create more threads. It will assume that some blocking or IO happens and try to saturate the CPU by going over the count of processors in the system.

That is why doing IO and blocking stuff on thread pool threads can be dangerous.

Here is a fully working example of said behavior:

        BlockingCollection<string> _streamingData = new BlockingCollection<string>();

        Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 100; i++)
                {
                    _streamingData.Add(i.ToString());
                    Thread.Sleep(100);
                }
            });

        new Thread(() =>
            {
                while (true)
                {
                    Thread.Sleep(1000);
                    Console.WriteLine("Thread count: " + Process.GetCurrentProcess().Threads.Count);
                }
            }).Start();

        Parallel.ForEach(_streamingData.GetConsumingEnumerable(), item =>
            {
            });

I do not know why the thread count keeps climbing although it does not increase throughput. According to the model that I explained it would not grow. But I do not know if my model is actually correct.

Maybe the thread-pool has an additional heuristic that makes it spawn threads if it sees no progress at all (measured in tasks completed per second). That would make sense because that would likely prevent a lot of deadlocks in applications. Deadlocks can happen if important tasks cannot run because they are waiting for existing tasks to exit and make threads available. This is a well-known problem with the thread pool.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文