Processing queue members
I have a thread which fills a queue, and another thread which processes that queue. My problem is that the first thread fills the queue much faster than the second can drain it, so my program keeps using more and more RAM. What is the optimum solution to this problem?

Edit: Sorry, I forgot to add something. I can't limit my queue or throttle my producer thread. The producer can't wait, because it is capturing network packets and I must not miss any packet. I have to process these packets faster than the producer enqueues them.
5 Answers
Well, assuming that the order in which items are processed is not important, you can run two (or more) threads processing the queue. Unless there's some sort of contention between them, that should speed up processing. This is known as a multi-consumer model.

Another possibility is to have your producer thread monitor the size of the queue and refuse to add entries until it drops below some threshold. Standard C# queues don't provide a way to stop the capacity from expanding (even a growth factor of 1.0 will not inhibit growth).
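The multi-consumer model can be sketched as follows. This is a Java analog (the thread count, item count, and `-1` sentinel are arbitrary demo choices; `LinkedBlockingQueue` stands in for whatever thread-safe queue the application uses):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiConsumer {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        int consumers = 4, items = 1000;

        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        Integer item = queue.take();  // blocks until an item arrives
                        if (item < 0) break;          // sentinel: stop this consumer
                        processed.incrementAndGet();  // stand-in for real processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (int i = 0; i < items; i++) queue.put(i);      // producer
        for (int c = 0; c < consumers; c++) queue.put(-1); // one sentinel per consumer

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("processed=" + processed.get());
    }
}
```

Because the queue is FIFO, all real items are consumed before the sentinels, so every item is processed exactly once regardless of how the consumers interleave.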
You could define a maximum queue size (say 2000) which, once hit, causes the queue to accept more items only after it has drained down to a lower size (say 1000).

I'd recommend using an EventWaitHandle or a ManualResetEvent in order not to busy-wait: http://msdn.microsoft.com/en-us/library/system.threading.manualresetevent.aspx
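The high/low watermark scheme can be sketched like this. It is a Java analog that uses a monitor (`wait`/`notifyAll`) in place of a ManualResetEvent, and the thresholds are scaled down from the answer's 2000/1000 to 8/4 for the demo:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Queue with high/low watermarks: the producer blocks once the backlog
 *  reaches 'high' and is released only after it drains below 'low'. */
public class HysteresisQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int high, low;
    private boolean gateOpen = true;   // closed between hitting 'high' and draining to 'low'

    public HysteresisQueue(int high, int low) { this.high = high; this.low = low; }

    public synchronized void put(T item) throws InterruptedException {
        while (!gateOpen) wait();              // no busy-waiting: sleep until signalled
        items.add(item);
        if (items.size() >= high) gateOpen = false;
        notifyAll();                           // wake any consumer waiting on empty
    }

    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) wait();
        T item = items.remove();
        if (!gateOpen && items.size() <= low) gateOpen = true;  // reopen the gate
        notifyAll();                           // wake any producer waiting on the gate
        return item;
    }

    public synchronized int size() { return items.size(); }

    public static void main(String[] args) throws InterruptedException {
        HysteresisQueue<Integer> q = new HysteresisQueue<>(8, 4);
        Thread producer = new Thread(() -> {
            try { for (int i = 0; i < 100; i++) q.put(i); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        producer.start();
        int maxSeen = 0, taken = 0;
        for (int i = 0; i < 100; i++) {
            maxSeen = Math.max(maxSeen, q.size());
            q.take();
            taken++;
        }
        producer.join();
        System.out.println("taken=" + taken + " maxWithinHigh=" + (maxSeen <= 8));
    }
}
```

The two thresholds give hysteresis: the producer is not woken for every single item removed, only once a meaningful amount of space has been freed.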
Unless you are already doing so, use BlockingCollection<T> as your queue and pass some reasonable limit to the boundedCapacity parameter of the constructor (which is then reflected in the BoundedCapacity property). Your producer will block on Add if adding would make the queue too large, and will resume after the consumer has removed some elements from the queue. According to the MSDN documentation for BlockingCollection<T>.Add:

If a bounded capacity was specified when this instance of BlockingCollection<T> was initialized, a call to Add may block until space is available to store the provided item.
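Java's bounded ArrayBlockingQueue behaves analogously to a BlockingCollection<T> constructed with a boundedCapacity: `put` blocks when the queue is full, just as Add does. A minimal sketch (the capacity of 5 and item count of 20 are arbitrary demo values):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedDemo {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue: the analog of BlockingCollection<T> with boundedCapacity = 5.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    queue.put(i);   // blocks while the queue already holds 5 items
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < 20; i++) {
            sum += queue.take();    // consuming frees a slot, unblocking the producer
        }
        producer.join();
        System.out.println("sum=" + sum + " remaining=" + queue.size());
    }
}
```

The producer never gets more than 5 items ahead of the consumer, which is exactly the memory bound the question is asking for.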
Another method is to new() X inter-thread comms instances at startup, put them on a queue and never create any more. Thread A pops objects off this pool queue, fills them with data and queues them to thread B. Thread B gets the objects, processes them and then returns them to the pool queue.

This provides flow control: if thread A tries to post too fast, the pool will dry up and A will have to wait on the pool queue until B returns objects. It has the potential to improve performance, since there are no mallocs and frees after the initial pool filling; the lock time on a queue push/pop will be less than that of a memory-manager call. There is no need for complex bounded queues: any old producer-consumer queue class will do. The pool can be used for inter-thread comms throughout a full app with many threads/thread pools, flow-controlling them all. Shutdown problems can be mitigated: if the pool queue is created by the main thread at startup, before any forms etc., and never freed, it is often possible to avoid explicit background-thread shutdowns on app close, a pain that would be nice to just forget about. Object leaks and/or double releases are easily detected by monitoring the pool level ('detected', not 'fixed' :).

The inevitable downsides: all the inter-thread comms instance memory stays permanently allocated even if the app is completely idle. An object popped off the pool will be full of 'garbage' from its previous use. If the 'slowest' thread gets an object before releasing one, it is possible for the app to deadlock with the pool empty and all objects queued to the slowest thread. A very heavy burst of load may cause the app to throttle itself 'early', where a simpler 'new/queue/dispose' mechanism would just allocate more instances and so cope better with the burst of work.

Rgds,
Martin
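The pool scheme above can be sketched with two blocking queues, one acting as the pool and one as the work queue. This is a Java sketch; the `Packet` class, the pool size of 4, and the item count of 50 are hypothetical demo choices:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoolFlowControl {
    // A reusable inter-thread comms instance; the field is illustrative.
    static class Packet { int seq; }

    public static void main(String[] args) throws InterruptedException {
        int poolSize = 4, total = 50;
        BlockingQueue<Packet> pool = new ArrayBlockingQueue<>(poolSize);
        BlockingQueue<Packet> work = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) pool.put(new Packet()); // allocate once, at startup

        Thread producer = new Thread(() -> {      // "thread A"
            try {
                for (int i = 0; i < total; i++) {
                    Packet p = pool.take(); // flow control: blocks until B returns a buffer
                    p.seq = i;              // overwrite the previous use's 'garbage'
                    work.put(p);
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        producer.start();

        long sum = 0;                              // "thread B" (here, the main thread)
        for (int i = 0; i < total; i++) {
            Packet p = work.take();
            sum += p.seq;                          // stand-in for real processing
            pool.put(p);                           // return the buffer to the pool
        }
        producer.join();
        System.out.println("sum=" + sum + " poolLevel=" + pool.size());
    }
}
```

After a clean run the pool level is back at its initial value; a lower final level would indicate a leaked buffer, which is the monitoring trick the answer mentions.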
The simplest possible solution would be for the producer thread to check whether the queue has reached a certain limit of pending items, and if so, sleep before pushing more work.

Other solutions depend on the actual problem you are trying to solve: is the processing more I/O-bound or CPU-bound, etc.? That may even let you design a solution which doesn't need a queue at all. For example, the producer thread could generate, say, 10 items and call another consumer "method" which processes them in parallel, and so on.
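The batch-and-process-in-parallel idea can be sketched with an executor: the producer builds a batch of 10 items, hands the whole batch to a worker pool, and only then produces the next batch. This is a Java sketch; the batch count, pool size, and the doubling "work" are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class BatchProcess {
    public static void main(String[] args) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        int batchSize = 10, batches = 5;
        long total = 0;

        for (int b = 0; b < batches; b++) {
            // Produce one batch of items...
            List<Callable<Integer>> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                final int item = b * batchSize + i;
                batch.add(() -> item * 2);   // stand-in for real per-item processing
            }
            // ...then process it in parallel before producing the next one.
            // invokeAll blocks until every task in the batch has completed,
            // so the producer is naturally paced by the consumers.
            for (Future<Integer> f : workers.invokeAll(batch)) total += f.get();
        }
        workers.shutdown();
        System.out.println("total=" + total);
    }
}
```

Because `invokeAll` blocks until the batch is done, there is no unbounded queue at all: at most one batch of items exists at a time.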