PFX ConcurrentQueue - 有没有办法从队列中删除特定项目

发布于 2024-07-15 17:35:52 字数 1358 浏览 10 评论 0原文

我有一个应用程序,它有一个具有 ID 属性的项目的 ConcurrentQueue 和每个项目的任务的 ConcurrentQueue,队列项目看起来像:

class QueueItem {
  public int ID { get; set; }
  public ConcurrentQueue<WorkItem> workItemQueue { get; set; }
}

队列本身看起来像:

ConcurrentQueue<QueueItem> itemQueue;

我有一个线程在 itemQueue 上执行 foreach,将一个每个队列中的项目并对其进行工作:

foreach(var queueItem in itemQueue) {
  WorkItem workItem;
  if (queueItem.workItemQueue.TryDequeue(out workItem))
    doWork(workItem);
  else
    // no more workItems for this queueItem
}

我正在使用 ConcurrentQueues,因为我有一个单独的线程可能将queueItems添加到itemQueue,并将workItems添加到每个workItemQueue。

当我在队列项目中没有更多的工作项目时,我的问题就出现了 - 我想从项目队列中删除该队列项目 - 就像......

  if (queueItem.workItemQueue.TryDequeue(out workItem))
    doWork(workItem);
  else
    itemQueue.TryRemove(queueItem);

但我找不到一种方法可以轻松做到这一点。 我想出的方法是使每个 QueueItem 出队,然后在 workItemQueue 中仍有 WorkItems 时将其入队:

for (int i = 0; i < itemQueue.Count; i++) {
  QueueItem item;
  itemQueue.TryDequeue(out queueItem);
  if (queueItem.workItemQueue.TryDequeue(out workItem)) {
    itemQueue.Enqueue(queueItem);
    doWork(workItem);
  }
  else
    break;
}

是否有更好的方法来使用 PFX ConcurrentQueue 完成我想要的任务,或者这是执行此操作的合理方法,我应该使用自定义并发队列/列表实现还是我遗漏了一些东西?

I have an app that has a ConcurrentQueue of items that have an ID property and a ConcurrentQueue of tasks for each item, the queue items look like:

class QueueItem {
  public int ID { get; set; }
  public ConcurrentQueue<WorkItem> workItemQueue { get; set; }
}

and the queue itself looks like:

ConcurrentQueue<QueueItem> itemQueue;

I have one thread doing a foreach over the itemQueue, deQueueing an item from each queue and doing work on it:

foreach(var queueItem in itemQueue) {
  WorkItem workItem;
  if (queueItem.workItemQueue.TryDequeue(out workItem))
    doWork(workItem);
  else
    // no more workItems for this queueItem
}

I'm using ConcurrentQueues because I have a separate thread potentially adding queueItems to the itemQueue, and adding workItems to each workItemQueue.

My problem comes when I have no more workItems in a queueItem - I'd like to remove that queueItem from the itemQueue - something like...

  if (queueItem.workItemQueue.TryDequeue(out workItem))
    doWork(workItem);
  else
    itemQueue.TryRemove(queueItem);

...but I can't find a way to do that easily. The way i've come up with is to dequeue each QueueItem and then Enqueue it if there's still WorkItems in the workItemQueue:

for (int i = 0; i < itemQueue.Count; i++) {
  QueueItem item;
  itemQueue.TryDequeue(out queueItem);
  if (queueItem.workItemQueue.TryDequeue(out workItem)) {
    itemQueue.Enqueue(queueItem);
    doWork(workItem);
  }
  else
    break;
}

Is there a better way to accomplish what I want using the PFX ConcurrentQueue, or is this a reasonable way to do this, should I use a custom concurrent queue/list implementation or am I missing something?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

昨迟人 2024-07-22 17:35:52

一般来说,没有有效的方法从队列中删除特定项目。 它们通常具有 O(1) 队列和出列,但 O(n) 删除,这就是您的实现所做的。

一种替代结构称为 LinkedHashMap。 查看 Java 实现 如果你感兴趣。

它本质上是一个哈希表和一个链表,允许 O(1) 排队、出队和删除。

这尚未在 .Net 中实现,但网络上有一些实现。

现在的问题是,为什么 itemQueue 是一个队列? 从您的代码示例中,您永远不会将任何内容放入队列或出列(除了解决“删除”问题)。 我怀疑如果使用更合适的数据结构,您的问题可以简化。 您能举例说明哪些其他代码片段访问 itemQueue 吗?

In general, there is no efficient ways to remove specific items from queues. They generally have O(1) queue and dequeues, but O(n) removes, which is what your implementation does.

One alternative structure is something called a LinkedHashMap. Have a look at the Java implementation if you are interested.

It is essentially a Hash table and a linked list, which allows O(1) queue, dequeue and remove.

This isn't implemented in .Net yet, but there are a few implementations floating around the web.

Now, the question is, why is itemQueue a queue? From your code samples, you never enqueue or dequeue anything from it (except to navigate around the Remove problem). I have a suspicion that your problem could be simplified if a more suitable data structure is used. Could you give examples on what other pieces of code access itemQueue?

沙沙粒小 2024-07-22 17:35:52

这可能并不适合所有人,但以下是我想出的从并发队列中删除项目的解决方案,因为这是第一个谷歌结果,我想我会留下我的解决方案。

我所做的就是暂时用空队列替换工作队列,将原始队列转换为列表并删除项目,然后从修改后的列表中创建一个新队列并将其放回原处。

在代码中(抱歉,这是 VB.net 而不是 C#):

Dim found As Boolean = False
//'Steal the queue for a second, wrap the rest in a try-finally block to make sure we give it back
Dim theCommandQueue = Interlocked.Exchange(_commandQueue, New ConcurrentQueue(Of Command))
Try
    Dim cmdList = theCommandQueue.ToList()
    For Each item In cmdList
        If item Is whateverYouAreLookingFor Then
            cmdList.Remove(item)
            found = True
        End If
    Next
    //'If we found the item(s) we were looking for, create a new queue from the modified list.
    If found Then
        theCommandQueue = New ConcurrentQueue(Of Command)(cmdList)
    End If
Finally
    //'always put the queue back where we found it
    Interlocked.Exchange(_commandQueue, theCommandQueue)
End Try

旁白:这是我的第一个答案,因此请随意提出一些编辑建议和/或编辑我的答案。

This may not work for everyone, but the following is the solution I came up with for removing an item from a concurrent queue, since this is the first google result, I thought I would leave my solution behind.

What I did was temporarily replace the working queue with an empty, convert the original to a list and remove the item(s), then create a new queue from the modified list and put it back.

In code (sorry this is VB.net rather C#):

Dim found As Boolean = False
//'Steal the queue for a second, wrap the rest in a try-finally block to make sure we give it back
Dim theCommandQueue = Interlocked.Exchange(_commandQueue, New ConcurrentQueue(Of Command))
Try
    Dim cmdList = theCommandQueue.ToList()
    For Each item In cmdList
        If item Is whateverYouAreLookingFor Then
            cmdList.Remove(item)
            found = True
        End If
    Next
    //'If we found the item(s) we were looking for, create a new queue from the modified list.
    If found Then
        theCommandQueue = New ConcurrentQueue(Of Command)(cmdList)
    End If
Finally
    //'always put the queue back where we found it
    Interlocked.Exchange(_commandQueue, theCommandQueue)
End Try

Aside: This is my first answer, so feel free to put up some editing advice and/or edit my answer.

懒猫 2024-07-22 17:35:52

当您想要以先进先出的方式处理项目时,队列意味着后进先出的堆栈。 还有一个并发字典和一个并发包。 确保队列确实是您想要的。 我认为我不会在并发队列上执行 foreach 操作。

您可能想要的是工作项的单个队列(让它们使用公共接口并在接口上创建一个队列,该接口应该公开继承的类型,以后如果需要的话可以将其重新转换为该继承类型)。 如果工作项属于父项,则可以使用一个属性来保存父项的密钥(考虑密钥的 GUID),并且可以将父项保留在并发字典中并根据需要引用/删除。

如果您必须按照现有方式进行操作,请考虑添加一个标志。 然后,您可以将 itemqueue 中的项目标记为“已关闭”或其他任何内容,这样当它出列时,它将被忽略。

Queues are meant when you want to handle items in a FIFO style, Stacks for LIFO. There is also a concurrentdictionary and a concurrentbag. Make sure that a queue is actually what you want. I don't think I would ever do a foreach on a concurrentqueue.

What you likely want is a single queue for your work items (have them use a common interface and make a queue on the interface, the interface should expose the inherited type to which it can be recast later if needed). If the workitems belong to a parent, then a property can be used which will hold a key to the parent (consider a GUID for the key), and the parent can be kept in a concurrentdictionary and referenced/removed as needed.

If you must do it the way you have it, consider adding a flag. you can then mark the item in the itemqueue as 'closed' or whatever, so that when it is dequeued, it will get ignored.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文