实施消费者队列的最佳方法,您可以从顺序删除项目(.NET 6)

发布于 2025-01-27 16:14:09 字数 3759 浏览 4 评论 0原文

这里的新海报,所以我希望这是有道理的...

我需要创建一个可以按顺序删除项目的集合(基本上是股票市场时间序列数据)。 数据生产者是多线程的,并且不能保证数据将按顺序出现。

我全方位地寻找了一个解决方案,但是我唯一能想到的就是使用contrentDictionary创建自己的自定义字典并实现iProducerConsumer接口,以便可以与BlockingCollection一起使用。

我下面的代码确实有效,但会产生错误

system.invalidoperationException:基础集合是 从阻塞票房的外部修改

,序列中的下一个键在字典中不存在。在这种情况下,我想等待指定的时间 然后尝试再次从队列中获取物品。

我的问题是:

  • 当没有钥匙时,处理错误的最佳方法是什么。目前,似乎要处理错误将需要退出循环。也许使用getConsumingEnumerable()不是正确的消费方法,而段落循环会更好?

代码如下 - 任何帮助/想法都非常感谢。

iProducerConsumer实现:

public abstract class BlockingDictionary<TKey, TValue> : IProducerConsumerCollection<KeyValuePair<TKey, TValue>> where TKey : notnull
{
    protected ConcurrentDictionary<TKey, TValue> _dictionary = new ConcurrentDictionary<TKey, TValue>();

    int ICollection.Count => _dictionary.Count;

    bool ICollection.IsSynchronized => false;

    object ICollection.SyncRoot => throw new NotSupportedException();

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        if (array == null)
        {
            throw new ArgumentNullException("array");
        }
        _dictionary.ToList().CopyTo(array, index);
    }

    void ICollection.CopyTo(Array array, int index)
    {

        if (array == null)
        {
            throw new ArgumentNullException("array");
        }
        ((ICollection)_dictionary.ToList()).CopyTo(array, index);
    }

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return ((IEnumerable<KeyValuePair<TKey, TValue>>)_dictionary).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<KeyValuePair<TKey, TValue>>)this).GetEnumerator();
    }

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        return _dictionary.ToList().ToArray();
    }

    bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return _dictionary.TryAdd(item.Key, item.Value);
    }

    public virtual bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        item = this.FirstOrDefault();
        TValue? value;

        return _dictionary.TryRemove(item.Key, out value);
    }
}

时间序列队列实现(上面继承)

public class TimeSequenceQueue<T> : BlockingDictionary<DateTime, T>
{
    private DateTime _previousTime;
    private DateTime _nextTime;
    private readonly int _intervalSeconds;

    public TimeSequenceQueue(DateTime startTime, int intervalSeconds)
    {
        _intervalSeconds = intervalSeconds;
        _previousTime = startTime;
        _nextTime = startTime;
    }

    public override bool TryTake([MaybeNullWhen(false)] out KeyValuePair<DateTime, T> item)
    {
        item = _dictionary.SingleOrDefault(x => x.Key == _nextTime);
        T? value = default(T);

        if (item.Value == null)
            return false;

        bool result = _dictionary.TryRemove(item.Key, out value);

        if (result)
        {
            _previousTime = _nextTime;
            _nextTime = _nextTime.AddSeconds(_intervalSeconds);
        }

        return result;
    }
}

用法:

BlockingCollection<KeyValuePair<DateTime, object>> _queue = new BlockingCollection<KeyValuePair<DateTime, object>>(new TimeSequenceQueue<object>());

消费循环 - 开始在新线程中:

foreach (var item in _queue.GetConsumingEnumerable())
{
    // feed downstream
}

new poster here so I hope this makes sense ...

I need to create a collection that I can remove items from in sequence (basically stock market time series data).
The data producer is multi-threaded and doesn't guarantee that the data will come in sequence.

I've looked all around for a solution but the only thing I can come up with is to create my own custom dictionary, using ConcurrentDictionary and implementing the IProducerConsumer interface so it can be used with with BlockingCollection.

The code I have below does work, but produces an error

System.InvalidOperationException: The underlying collection was
modified from outside of the BlockingCollection

when using the GetConsumingEnumerable() for loop, and the next key in the sequence is not present in the dictionary. In this instance I would like to wait for a specified amount of time
and then attempt to take the item from the queue again.

My questions is:

  • What's the best way to handle the error when there is no key present. At the moment it seems handling the error would require exiting the loop. Perhaps using GetConsumingEnumerable() is not the right way to consume and a while loop would work better?

Code is below - any help/ideas much appreciated.

IProducerConsumer implementation:

public abstract class BlockingDictionary<TKey, TValue> : IProducerConsumerCollection<KeyValuePair<TKey, TValue>> where TKey : notnull
{
    protected ConcurrentDictionary<TKey, TValue> _dictionary = new ConcurrentDictionary<TKey, TValue>();

    int ICollection.Count => _dictionary.Count;

    bool ICollection.IsSynchronized => false;

    object ICollection.SyncRoot => throw new NotSupportedException();

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        if (array == null)
        {
            throw new ArgumentNullException("array");
        }
        _dictionary.ToList().CopyTo(array, index);
    }

    void ICollection.CopyTo(Array array, int index)
    {

        if (array == null)
        {
            throw new ArgumentNullException("array");
        }
        ((ICollection)_dictionary.ToList()).CopyTo(array, index);
    }

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return ((IEnumerable<KeyValuePair<TKey, TValue>>)_dictionary).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<KeyValuePair<TKey, TValue>>)this).GetEnumerator();
    }

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        return _dictionary.ToList().ToArray();
    }

    bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return _dictionary.TryAdd(item.Key, item.Value);
    }

    public virtual bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        item = this.FirstOrDefault();
        TValue? value;

        return _dictionary.TryRemove(item.Key, out value);
    }
}

Time Sequence queue implementation (inherits above)

public class TimeSequenceQueue<T> : BlockingDictionary<DateTime, T>
{
    private DateTime _previousTime;
    private DateTime _nextTime;
    private readonly int _intervalSeconds;

    public TimeSequenceQueue(DateTime startTime, int intervalSeconds)
    {
        _intervalSeconds = intervalSeconds;
        _previousTime = startTime;
        _nextTime = startTime;
    }

    public override bool TryTake([MaybeNullWhen(false)] out KeyValuePair<DateTime, T> item)
    {
        item = _dictionary.SingleOrDefault(x => x.Key == _nextTime);
        T? value = default(T);

        if (item.Value == null)
            return false;

        bool result = _dictionary.TryRemove(item.Key, out value);

        if (result)
        {
            _previousTime = _nextTime;
            _nextTime = _nextTime.AddSeconds(_intervalSeconds);
        }

        return result;
    }
}

Usage:

BlockingCollection<KeyValuePair<DateTime, object>> _queue = new BlockingCollection<KeyValuePair<DateTime, object>>(new TimeSequenceQueue<object>());

Consuming loop - started in new thread:

foreach (var item in _queue.GetConsumingEnumerable())
{
    // feed downstream
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

黯然#的苍凉 2025-02-03 16:14:09

使用getConsumingEnumerable()进行循环,并且该序列中的下一个键在字典中不存在[...],我想等待指定的时间,然后等待尝试再次从队列中获取物品。

我将尝试一般回答这个问题,而不会过分关注您的问题的细节。假设您正在消费
a blockingcollection&lt&lt; t&lt; t&gt;代码>这样:

foreach (var item in collection.GetConsumingEnumerable())
{
    // Do something with the consumed item.
}

...您想避免无限期等待物品到达。您想每5秒醒来一次,然后再等待/再次睡觉。
这是您可以做到的:

while (!collection.IsCompleted)
{
    bool consumed = collection.TryTake(out var item, TimeSpan.FromSeconds(5));
    if (consumed)
    {
        // Do something with the consumed item.
    }
    else
    {
        // Do something before trying again to take an item.
    }
}

上面的模式模仿实际源代码

如果您想幻想,则可以将此功能合并到blockingCollection&lt; t&gt;类的自定义扩展方法中,例如:

public static IEnumerable<(bool Consumed, T Item)> GetConsumingEnumerable<T>(
    this BlockingCollection<T> source, TimeSpan timeout)
{
    while (!source.IsCompleted)
    {
        bool consumed = source.TryTake(out var item, timeout);
        yield return (consumed, item);
    }
}

用法示例:

foreach (var (consumed, item) in collection.GetConsumingEnumerable(
    TimeSpan.FromSeconds(5)))
{
    // Do something depending on whether an item was consumed or not.
}

When using the GetConsumingEnumerable() for loop, and the next key in the sequence is not present in the dictionary [...] I would like to wait for a specified amount of time and then attempt to take the item from the queue again.

I will try to answer this question generally, without paying too much attention to the specifics of your problem. So let's say that you are consuming
a BlockingCollection<T> like this:

foreach (var item in collection.GetConsumingEnumerable())
{
    // Do something with the consumed item.
}

...and you want to avoid waiting indefinitely for an item to arrive. You want to wake up every 5 seconds and do something, before waiting/sleeping again.
Here is how you could do it:

while (!collection.IsCompleted)
{
    bool consumed = collection.TryTake(out var item, TimeSpan.FromSeconds(5));
    if (consumed)
    {
        // Do something with the consumed item.
    }
    else
    {
        // Do something before trying again to take an item.
    }
}

The above pattern imitates the actual source code of the BlockingCollection<T>.GetConsumingEnumerable method.

If you want to get fancy you could incorporate this functionality in a custom extension method for the BlockingCollection<T> class, like this:

public static IEnumerable<(bool Consumed, T Item)> GetConsumingEnumerable<T>(
    this BlockingCollection<T> source, TimeSpan timeout)
{
    while (!source.IsCompleted)
    {
        bool consumed = source.TryTake(out var item, timeout);
        yield return (consumed, item);
    }
}

Usage example:

foreach (var (consumed, item) in collection.GetConsumingEnumerable(
    TimeSpan.FromSeconds(5)))
{
    // Do something depending on whether an item was consumed or not.
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文