并发集合和独特元素

发布于 2024-12-07 19:01:55 字数 71 浏览 5 评论 0原文

我有一个包含重复元素的并发 BlockingCollection 。如何修改它以添加或获取不同的元素?

I have a concurrent BlockingCollection with repeated elements. How can modify it to add or get distinct elements?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

BlockingCollection 的默认后备存储是 ConcurrentQueue。正如其他人指出的那样,使用它来添加不同的项目相当困难。

不过,您可以创建自己的实现 IProducerConsumerCollection 的集合类型,并将其传递给 BlockingCollection 构造函数。

想象一个 ConcurrentDictionary,其中包含当前队列中项目的键。要添加项目,请首先在字典上调用 TryAdd,如果该项目不在字典中,则将其添加,并将其添加到队列中。 Take(和 TryTake)从队列中获取下一个项目,将其从字典中删除,然后返回。

我希望有一个并发的 HashTable,但由于没有并发的,所以您必须使用 ConcurrentDictionary

The default backing store for BlockingCollection is a ConcurrentQueue. As somebody else pointed out, it's rather difficult to add distinct items using that.

However, you could create your own collection type that implements IProducerConsumerCollection, and pass that to the BlockingCollection constructor.

Imagine a ConcurrentDictionary that contains the keys of the items that are currently in the queue. To add an item, you call TryAdd on the dictionary first, and if the item isn't in the dictionary you add it, and also add it to the queue. Take (and TryTake) get the next item from the queue, remove it from the dictionary, and return.

I'd prefer if there was a concurrent HashTable, but since there isn't one, you'll have to do with ConcurrentDictionary.

天赋异禀 2024-12-14 19:01:55

这是 IProducerConsumerCollection< 的实现;T> 具有队列行为的集合,也拒绝重复的项目:

public class ConcurrentQueueNoDuplicates<T> : IProducerConsumerCollection<T>
{
    private readonly Queue<T> _queue = new();
    private readonly HashSet<T> _set;
    private object Locker => _queue;

    public ConcurrentQueueNoDuplicates(IEqualityComparer<T> comparer = default)
    {
        _set = new(comparer);
    }

    public bool TryAdd(T item)
    {
        lock (Locker)
        {
            if (!_set.Add(item))
                throw new DuplicateKeyException();
            _queue.Enqueue(item); return true;
        }
    }

    public bool TryTake(out T item)
    {
        lock (Locker)
        {
            if (_queue.Count == 0)
                throw new InvalidOperationException();
            item = _queue.Dequeue();
            bool removed = _set.Remove(item);
            Debug.Assert(removed);
            return true;
        }
    }

    public int Count { get { lock (Locker) return _queue.Count; } }
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public T[] ToArray() { lock (Locker) return _queue.ToArray(); }
    public IEnumerator<T> GetEnumerator() => ToArray().AsEnumerable().GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public void CopyTo(T[] array, int index) => throw new NotSupportedException();
    public void CopyTo(Array array, int index) => throw new NotSupportedException();
}

public class DuplicateKeyException : InvalidOperationException { }

使用示例:

BlockingCollection<Item> queue = new(new ConcurrentQueueNoDuplicates<Item>());

//...

try { queue.Add(item); }
catch (DuplicateKeyException) { Console.WriteLine($"The {item} was rejected."); }

警告:调用如果项目重复,queue.TryAdd(item); 不会出现返回 false 的预期行为。任何添加重复项的尝试都必然会导致 DuplicateKeyException。不要尝试通过返回 false 来“修复”上述 ConcurrentQueueNoDuplicates.TryAdd 实现或 TryTakeBlockingCollection将通过抛出不同的异常 (InvalidOperationException) 做出反应,并且最重要的是其内部状态将被损坏。目前(.NET 7)一个错误,该错误会将 BlockingCollection 的有效容量减少 1,其底层存储具有 TryAdd 实现,该实现返回false。该错误已修复< /a> 对于 .NET 8,这将防止损坏,但不会改变错误抛出行为。

Here is an implementation of a IProducerConsumerCollection<T> collection with the behavior of a queue, that also rejects duplicate items:

public class ConcurrentQueueNoDuplicates<T> : IProducerConsumerCollection<T>
{
    private readonly Queue<T> _queue = new();
    private readonly HashSet<T> _set;
    private object Locker => _queue;

    public ConcurrentQueueNoDuplicates(IEqualityComparer<T> comparer = default)
    {
        _set = new(comparer);
    }

    public bool TryAdd(T item)
    {
        lock (Locker)
        {
            if (!_set.Add(item))
                throw new DuplicateKeyException();
            _queue.Enqueue(item); return true;
        }
    }

    public bool TryTake(out T item)
    {
        lock (Locker)
        {
            if (_queue.Count == 0)
                throw new InvalidOperationException();
            item = _queue.Dequeue();
            bool removed = _set.Remove(item);
            Debug.Assert(removed);
            return true;
        }
    }

    public int Count { get { lock (Locker) return _queue.Count; } }
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public T[] ToArray() { lock (Locker) return _queue.ToArray(); }
    public IEnumerator<T> GetEnumerator() => ToArray().AsEnumerable().GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public void CopyTo(T[] array, int index) => throw new NotSupportedException();
    public void CopyTo(Array array, int index) => throw new NotSupportedException();
}

public class DuplicateKeyException : InvalidOperationException { }

Usage example:

BlockingCollection<Item> queue = new(new ConcurrentQueueNoDuplicates<Item>());

//...

try { queue.Add(item); }
catch (DuplicateKeyException) { Console.WriteLine(
quot;The {item} was rejected."); }

Caution: Calling queue.TryAdd(item); is not having the expected behavior of returning false if the item is a duplicate. Any attempt to add a duplicate item results invariably in a DuplicateKeyException. Do not attempt to "fix" the above ConcurrentQueueNoDuplicates<T>.TryAdd implementation, or the TryTake, by returning false. The BlockingCollection<T> will react by throwing a different exception (InvalidOperationException), and on top of that its internal state will become corrupted. There is currently (.NET 7) a bug that reduces by one the effective capacity of a BlockingCollection<T> whose underlying storage has a TryAdd implementation that returns false. The bug has been fixed for .NET 8, which will prevent the corruption, but it won't change the error-throwing behavior.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文