如何访问阻塞集合的底层默认并发队列?

发布于 2024-11-18 19:51:15 字数 369 浏览 8 评论 0原文

我有多个生产者和一个消费者。但是,如果队列中有尚未消耗的内容,生产者不应再次将其排队。 (使用默认并发队列的唯一无重复阻塞集合)

if (!myBlockingColl.Contains(item))
    myBlockingColl.Add(item)

但是阻塞集合没有 contains 方法,也不提供任何类似 TryPeek() 的方法。如何访问底层并发队列,以便我可以执行类似

if (!myBlockingColl.myConcurQ.trypeek(item)
  myBlockingColl.Add(item)

In a tail spin 的操作?

I have multiple producers and a single consumer. However if there is something in the queue that is not yet consumed a producer should not queue it again. (unique no duplicates blocking collection that uses the default concurrent queue)

if (!myBlockingColl.Contains(item))
    myBlockingColl.Add(item)

However the blocking collection does not have a contains method nor does it provide any kind of TryPeek() like method. How can I access the underlying concurrent queue so I can do something like

if (!myBlockingColl.myConcurQ.trypeek(item)
  myBlockingColl.Add(item)

In a tail spin?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

以歌曲疗慰 2024-11-25 19:51:15

这是一个有趣的问题。这是我第一次看到有人要求一个忽略重复的阻塞队列。奇怪的是,我在 BCL 中找不到任何像你想要的东西。我说这很奇怪,因为 BlockingCollection 可以接受 IProducerConsumerCollection 作为底层集合,该集合具有 TryAdd 方法,该方法被宣传为能够在以下情况下失败检测到重复项。问题是我没有看到 IProducerConsumerCollection 的具体实现来防止重复。至少我们可以自己写。

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
  // TODO: You will need to fully implement IProducerConsumerCollection.

  private Queue<T> queue = new Queue<T>();

  public bool TryAdd(T item)
  {
    lock (queue)
    {
      if (!queue.Contains(item))
      {
        queue.Enqueue(item);
        return true;
      }
      return false;
    }
  }

  public bool TryTake(out T item)
  {
    lock (queue)
    {
      item = null;
      if (queue.Count > 0)
      {
        item = queue.Dequeue();
      }
      return item != null;
    }
  }
}

现在我们有了不接受重复项的 IProducerConsumerCollection,我们可以像这样使用它:

public class Example
{
  private BlockingCollection<object> queue = new BlockingCollection<object>(new NoDuplicatesConcurrentQueue<object>());

  public Example()
  {
    new Thread(Consume).Start();
  }

  public void Produce(object item)
  {
    bool unique = queue.TryAdd(item);
  }

  private void Consume()
  {
    while (true)
    {
      object item = queue.Take();
    }
  }
}

您可能不喜欢我的 NoDuplicatesConcurrentQueue 实现。如果您认为需要 TPL 集合提供的低锁性能,您当然可以自由地使用 ConcurrentQueue 或其他任何方式来实现。

更新:

今天早上我能够测试代码。有一些好消息和坏消息。好消息是这在技术上是可行的。坏消息是您可能不想这样做,因为 BlockingCollection.TryAdd 会拦截底层 IProducerConsumerCollection.TryAdd 方法的返回值,并在 检测到 false。是的,没错。它不会像您期望的那样返回 false ,而是生成异常。我必须说实话,这既令人惊讶又荒谬。 TryXXX 方法的重点是它们不应该抛出异常。我深感失望。

This is an interesting question. This is the first time I have seen someone ask for a blocking queue that ignores duplicates. Oddly enough I could find nothing like what you want that already exists in the BCL. I say this is odd because BlockingCollection can accept a IProducerConsumerCollection as the underlying collection which has the TryAdd method that is advertised as being able to fail when duplicates are detected. The problem is that I see no concrete implementation of IProducerConsumerCollection that prevents duplicates. At least we can write our own.

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
  // TODO: You will need to fully implement IProducerConsumerCollection.

  private Queue<T> queue = new Queue<T>();

  public bool TryAdd(T item)
  {
    lock (queue)
    {
      if (!queue.Contains(item))
      {
        queue.Enqueue(item);
        return true;
      }
      return false;
    }
  }

  public bool TryTake(out T item)
  {
    lock (queue)
    {
      item = null;
      if (queue.Count > 0)
      {
        item = queue.Dequeue();
      }
      return item != null;
    }
  }
}

Now that we have our IProducerConsumerCollection that does not accept duplicates we can use it like this:

public class Example
{
  private BlockingCollection<object> queue = new BlockingCollection<object>(new NoDuplicatesConcurrentQueue<object>());

  public Example()
  {
    new Thread(Consume).Start();
  }

  public void Produce(object item)
  {
    bool unique = queue.TryAdd(item);
  }

  private void Consume()
  {
    while (true)
    {
      object item = queue.Take();
    }
  }
}

You may not like my implementation of NoDuplicatesConcurrentQueue. You are certainly free to implement your own using ConcurrentQueue or whatever if you think you need the low-lock performance that the TPL collections provide.

Update:

I was able to test the code this morning. There is some good news and bad news. The good news is that this will technically work. The bad news is that you probably will not want to do this because BlockingCollection.TryAdd intercepts the return value from the underlying IProducerConsumerCollection.TryAdd method and throws an exception when false is detected. Yep, that is right. It does not return false like you would expect and instead generates an exception. I have to be honest, this is both surprising and ridiculous. The whole point of the TryXXX methods is that they should not throw exceptions. I am deeply disappointed.

仙女山的月亮 2024-11-25 19:51:15

除了 Update 之后 Brian Gideon 提到的警告之外,他的解决方案还存在以下性能问题:

  • O(n) 对队列的操作 (queue.Contains(item) )) 对性能产生严重影响,因为队列增长
  • 锁限制了并发性(他确实提到了)

以下代码通过

  • 使用哈希集执行 O(1) 查找
  • 改进了 Brian 的解决方案结合 2 个数据结构的 System.Collections.Concurrent 命名

空间由于没有 ConcurrentHashSet,我使用 ConcurrentDictionary,忽略这些值。

在这种罕见的情况下,幸运的是,可以简单地从多个更简单的数据结构中组合出更复杂的并发数据结构,而无需添加锁。两个并发数据结构上的操作顺序在这里很重要。

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
    private readonly ConcurrentDictionary<T, bool> existingElements = new ConcurrentDictionary<T, bool>();
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public bool TryAdd(T item)
    {
        if (existingElements.TryAdd(item, false))
        {
            queue.Enqueue(item);
            return true;
        }
        return false;
    }

    public bool TryTake(out T item)
    {
        if (queue.TryDequeue(out item))
        {
            bool _;
            existingElements.TryRemove(item, out _);
            return true;
        }
        return false;
    }
    ...
}

注意看待这个问题的另一种方式:您需要一个保留插入顺序的集合。

In addition to the caveat Brian Gideon mentioned after Update, his solution suffers from these performance issues:

  • O(n) operations on the queue (queue.Contains(item)) have a severe impact on performance as the queue grows
  • locks limit concurrency (which he does mention)

The following code improves on Brian's solution by

  • using a hash set to do O(1) lookups
  • combining 2 data structures from the System.Collections.Concurrent namespace

N.B. As there is no ConcurrentHashSet, I'm using a ConcurrentDictionary, ignoring the values.

In this rare case it is luckily possible to simply compose a more complex concurrent data structure out of multiple simpler ones, without adding locks. The order of operations on the 2 concurrent data structures is important here.

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
    private readonly ConcurrentDictionary<T, bool> existingElements = new ConcurrentDictionary<T, bool>();
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public bool TryAdd(T item)
    {
        if (existingElements.TryAdd(item, false))
        {
            queue.Enqueue(item);
            return true;
        }
        return false;
    }

    public bool TryTake(out T item)
    {
        if (queue.TryDequeue(out item))
        {
            bool _;
            existingElements.TryRemove(item, out _);
            return true;
        }
        return false;
    }
    ...
}

N.B. Another way at looking at this problem: You want a set that preserves the insertion order.

怂人 2024-11-25 19:51:15

我建议使用锁来实现您的操作,这样您就不会以破坏项目的方式读写项目,从而使它们成为原子的。例如,对于任何 IEnumerable:

object bcLocker = new object();

// ...

lock (bcLocker)
{
    bool foundTheItem = false;
    foreach (someClass nextItem in myBlockingColl)
    {
        if (nextItem.Equals(item))
        {
            foundTheItem = true;
            break;
        }
    }
    if (foundTheItem == false)
    {
        // Add here
    }
}

I would suggest implementing your operations with lock so that you don't read and write the item in a way that corrupts it, making them atomic. For example, with any IEnumerable:

object bcLocker = new object();

// ...

lock (bcLocker)
{
    bool foundTheItem = false;
    foreach (someClass nextItem in myBlockingColl)
    {
        if (nextItem.Equals(item))
        {
            foundTheItem = true;
            break;
        }
    }
    if (foundTheItem == false)
    {
        // Add here
    }
}
无名指的心愿 2024-11-25 19:51:15

如何访问阻塞集合的底层默认并发队列?

默认情况下,BlockingCollectionConcurrentQueue 支持。换句话说,如果您没有显式指定其后备存储,它将在幕后创建一个 ConcurrentQueue。由于您希望直接访问底层存储,因此可以手动创建一个 ConcurrentQueue 并将其传递给 BlockingCollection 的构造函数:

ConcurrentQueue<Item> queue = new();
BlockingCollection<Item> collection = new(queue);

不幸的是ConcurrentQueue 集合没有 TryPeek 带有输入参数的方法,因此您想要做的事情是不可能的:

if (!queue.TryPeek(item)) // Compile error, missing out keyword
    collection.Add(item);

另请注意,队列现在由集合拥有。如果您尝试直接更改它(通过发出 EnqueueTryDequeue 命令),collection 将引发异常。

How to access the underlying default concurrent queue of a blocking collection?

The BlockingCollection<T> is backed by a ConcurrentQueue<T> by default. In other words if you don't specify explicitly its backing storage, it will create a ConcurrentQueue<T> behind the scenes. Since you want to have direct access to the underlying storage, you can create manually a ConcurrentQueue<T> and pass it to the constructor of the BlockingCollection<T>:

ConcurrentQueue<Item> queue = new();
BlockingCollection<Item> collection = new(queue);

Unfortunately the ConcurrentQueue<T> collection doesn't have a TryPeek method with an input parameter, so what you intend to do is not possible:

if (!queue.TryPeek(item)) // Compile error, missing out keyword
    collection.Add(item);

Also be aware that the queue is now owned by the collection. If you attempt to mutate it directly (by issuing Enqueue or TryDequeue commands), the collection will throw exceptions.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文