在 Linq 语句中使用 ConcurrentQueue
如果我有 ConcurrentQueue,是否有首选方法通过 Linq 语句使用它?它没有将所有项目作为序列出队的方法,并且它的枚举器不会删除项目。
我正在进行批量消费,这意味着我想定期处理队列并将其清空,而不是处理它直到队列为空并阻塞直到更多项目排队。 BlockingCollection 似乎不起作用,因为它在到达最后一项时会阻塞,并且我希望该线程执行其他操作,例如清除其他队列。
static ConcurrentQueue<int> MyQueue = new ConcurrentQueue<int>();
void Main()
{
MyQueue.Enqueue(1);MyQueue.Enqueue(2);MyQueue.Enqueue(3);MyQueue.Enqueue(4);MyQueue.Enqueue(5);
var lst = MyQueue.ToLookup(x => x.SomeProperty);
//queue still has all elements
MyQueue.Dump("queue");
}
现在,我已经制作了一个辅助方法
static IEnumerable<T> ReadAndEmptyQueue<T>(this ConcurrentQueue<T> q)
{
T item;
while(q.TryDequeue(out item))
{
yield return item;
}
}
var lk = MyQueue.ReadAndEmptyQueue().ToLookup(x => x.SomeProperty);
MyQueue.Dump(); //size is now zero
是否有更好的方法,或者我做得对吗?
If I have a ConcurrentQueue, is there a preferred way to consume it with a Linq statement? It doesn't have a method to dequeue all the items as a sequence, and it's enumerator doesn't remove items.
I'm doing batch consumption, meaning periodically I want to process the queue and empty it, instead of processing it until it is empty and blocking until more items are enqueued. BlockingCollection doesn't seem like it will work because it will block when it gets to the last item, and I want that thread to do other stuff, like clear other queues.
static ConcurrentQueue<int> MyQueue = new ConcurrentQueue<int>();
void Main()
{
MyQueue.Enqueue(1);MyQueue.Enqueue(2);MyQueue.Enqueue(3);MyQueue.Enqueue(4);MyQueue.Enqueue(5);
var lst = MyQueue.ToLookup(x => x.SomeProperty);
//queue still has all elements
MyQueue.Dump("queue");
}
For now, I've made a helper method
static IEnumerable<T> ReadAndEmptyQueue<T>(this ConcurrentQueue<T> q)
{
T item;
while(q.TryDequeue(out item))
{
yield return item;
}
}
var lk = MyQueue.ReadAndEmptyQueue().ToLookup(x => x.SomeProperty);
MyQueue.Dump(); //size is now zero
Is there a better way, or am I doing it right?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
在我看来,你的做法非常合理。允许消费者以这种方式清空队列是干净而简单的。
我要提到的一件事 - 有时,从设计的角度来看,为每个队列触发一个单独的消费者线程会更容易。如果这样做,每个
BlockingCollection
只能使用GetConsumingEnumerable()
并根据需要进行阻止,因为当队列为空时它们将处于等待状态。这是我更经常采用的方法,因为从同步的角度来看,如果每个集合都有一个或多个专用消费者,而不是消费者在其正在消费的内容之间切换,那么从同步的角度来看,它通常要简单得多。
Your approach is very reasonable, in my opinion. Allowing a consumer to empty the queue in this fashion is clean and simple.
The one thing I'd mention - sometimes, from a design standpoint, it's easier to just fire off a separate consumer thread per queue. If you do that, each
BlockingCollection<T>
can just useGetConsumingEnumerable()
and block as needed, as they'll be in a wait state when the queue is empty.This is the approach I take more frequently, as it's often much simpler from a synchronization standpoint if each collection has one or more dedicated consumers, instead of a consumer switching between what it's consuming.