如何访问阻塞集合的底层默认并发队列?
我有多个生产者和一个消费者。但是,如果队列中有尚未消耗的内容,生产者不应再次将其排队。 (使用默认并发队列的唯一无重复阻塞集合)
if (!myBlockingColl.Contains(item))
myBlockingColl.Add(item)
但是阻塞集合没有 contains 方法,也不提供任何类似 TryPeek()
的方法。如何访问底层并发队列,以便我可以执行类似
if (!myBlockingColl.myConcurQ.trypeek(item)
myBlockingColl.Add(item)
In a tail spin 的操作?
I have multiple producers and a single consumer. However if there is something in the queue that is not yet consumed a producer should not queue it again. (unique no duplicates blocking collection that uses the default concurrent queue)
if (!myBlockingColl.Contains(item))
myBlockingColl.Add(item)
However the blocking collection does not have a contains method nor does it provide any kind of TryPeek()
like method. How can I access the underlying concurrent queue so I can do something like
if (!myBlockingColl.myConcurQ.trypeek(item)
myBlockingColl.Add(item)
In a tail spin?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(4)
这是一个有趣的问题。这是我第一次看到有人要求一个忽略重复的阻塞队列。奇怪的是,我在 BCL 中找不到任何像你想要的东西。我说这很奇怪,因为
BlockingCollection
可以接受IProducerConsumerCollection
作为底层集合,该集合具有TryAdd
方法,该方法被宣传为能够在以下情况下失败检测到重复项。问题是我没有看到 IProducerConsumerCollection 的具体实现来防止重复。至少我们可以自己写。现在我们有了不接受重复项的
IProducerConsumerCollection
,我们可以像这样使用它:您可能不喜欢我的
NoDuplicatesConcurrentQueue
实现。如果您认为需要 TPL 集合提供的低锁性能,您当然可以自由地使用 ConcurrentQueue 或其他任何方式来实现。更新:
今天早上我能够测试代码。有一些好消息和坏消息。好消息是这在技术上是可行的。坏消息是您可能不想这样做,因为
BlockingCollection.TryAdd
会拦截底层IProducerConsumerCollection.TryAdd
方法的返回值,并在检测到 false
。是的,没错。它不会像您期望的那样返回false
,而是生成异常。我必须说实话,这既令人惊讶又荒谬。 TryXXX 方法的重点是它们不应该抛出异常。我深感失望。This is an interesting question. This is the first time I have seen someone ask for a blocking queue that ignores duplicates. Oddly enough I could find nothing like what you want that already exists in the BCL. I say this is odd because
BlockingCollection
can accept aIProducerConsumerCollection
as the underlying collection which has theTryAdd
method that is advertised as being able to fail when duplicates are detected. The problem is that I see no concrete implementation ofIProducerConsumerCollection
that prevents duplicates. At least we can write our own.Now that we have our
IProducerConsumerCollection
that does not accept duplicates we can use it like this:You may not like my implementation of
NoDuplicatesConcurrentQueue
. You are certainly free to implement your own usingConcurrentQueue
or whatever if you think you need the low-lock performance that the TPL collections provide.Update:
I was able to test the code this morning. There is some good news and bad news. The good news is that this will technically work. The bad news is that you probably will not want to do this because
BlockingCollection.TryAdd
intercepts the return value from the underlyingIProducerConsumerCollection.TryAdd
method and throws an exception whenfalse
is detected. Yep, that is right. It does not returnfalse
like you would expect and instead generates an exception. I have to be honest, this is both surprising and ridiculous. The whole point of the TryXXX methods is that they should not throw exceptions. I am deeply disappointed.除了 Update 之后 Brian Gideon 提到的警告之外,他的解决方案还存在以下性能问题:
queue.Contains(item) )
) 对性能产生严重影响,因为队列增长以下代码通过
System.Collections.Concurrent
命名空间由于没有
ConcurrentHashSet
,我使用ConcurrentDictionary
,忽略这些值。在这种罕见的情况下,幸运的是,可以简单地从多个更简单的数据结构中组合出更复杂的并发数据结构,而无需添加锁。两个并发数据结构上的操作顺序在这里很重要。
注意看待这个问题的另一种方式:您需要一个保留插入顺序的集合。
In addition to the caveat Brian Gideon mentioned after Update, his solution suffers from these performance issues:
queue.Contains(item)
) have a severe impact on performance as the queue growsThe following code improves on Brian's solution by
System.Collections.Concurrent
namespaceN.B. As there is no
ConcurrentHashSet
, I'm using aConcurrentDictionary
, ignoring the values.In this rare case it is luckily possible to simply compose a more complex concurrent data structure out of multiple simpler ones, without adding locks. The order of operations on the 2 concurrent data structures is important here.
N.B. Another way at looking at this problem: You want a set that preserves the insertion order.
我建议使用锁来实现您的操作,这样您就不会以破坏项目的方式读写项目,从而使它们成为原子的。例如,对于任何 IEnumerable:
I would suggest implementing your operations with lock so that you don't read and write the item in a way that corrupts it, making them atomic. For example, with any IEnumerable:
默认情况下,
BlockingCollection
由ConcurrentQueue
支持。换句话说,如果您没有显式指定其后备存储,它将在幕后创建一个ConcurrentQueue
。由于您希望直接访问底层存储,因此可以手动创建一个ConcurrentQueue
并将其传递给BlockingCollection
的构造函数:不幸的是
ConcurrentQueue
集合没有TryPeek
带有输入参数的方法,因此您想要做的事情是不可能的:另请注意,
队列
现在由集合
拥有。如果您尝试直接更改它(通过发出Enqueue
或TryDequeue
命令),collection
将引发异常。The
BlockingCollection<T>
is backed by aConcurrentQueue<T>
by default. In other words if you don't specify explicitly its backing storage, it will create aConcurrentQueue<T>
behind the scenes. Since you want to have direct access to the underlying storage, you can create manually aConcurrentQueue<T>
and pass it to the constructor of theBlockingCollection<T>
:Unfortunately the
ConcurrentQueue<T>
collection doesn't have aTryPeek
method with an input parameter, so what you intend to do is not possible:Also be aware that the
queue
is now owned by thecollection
. If you attempt to mutate it directly (by issuingEnqueue
orTryDequeue
commands), thecollection
will throw exceptions.