Best way to implement a consumer queue that you can remove items from in sequence (.NET 6)
New poster here, so I hope this makes sense...
I need to create a collection that I can remove items from in sequence (basically stock market time series data).
The data producer is multi-threaded and doesn't guarantee that the data will come in sequence.
I've looked all around for a solution, but the only thing I can come up with is to create my own custom dictionary, using ConcurrentDictionary and implementing the IProducerConsumerCollection<T> interface so it can be used with BlockingCollection.
The code I have below does work, but produces the error
System.InvalidOperationException: The underlying collection was modified from outside of the BlockingCollection
when using the GetConsumingEnumerable() foreach loop and the next key in the sequence is not present in the dictionary. In this instance I would like to wait for a specified amount of time and then attempt to take the item from the queue again.
My question is:
- What's the best way to handle the error when there is no key present? At the moment it seems handling the error would require exiting the loop. Perhaps using GetConsumingEnumerable() is not the right way to consume, and a while loop would work better?
Code is below - any help/ideas much appreciated.
IProducerConsumerCollection implementation:
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;

public abstract class BlockingDictionary<TKey, TValue> : IProducerConsumerCollection<KeyValuePair<TKey, TValue>> where TKey : notnull
{
    protected ConcurrentDictionary<TKey, TValue> _dictionary = new ConcurrentDictionary<TKey, TValue>();

    int ICollection.Count => _dictionary.Count;
    bool ICollection.IsSynchronized => false;
    object ICollection.SyncRoot => throw new NotSupportedException();

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        if (array == null)
        {
            throw new ArgumentNullException(nameof(array));
        }
        _dictionary.ToList().CopyTo(array, index);
    }

    void ICollection.CopyTo(Array array, int index)
    {
        if (array == null)
        {
            throw new ArgumentNullException(nameof(array));
        }
        ((ICollection)_dictionary.ToList()).CopyTo(array, index);
    }

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return ((IEnumerable<KeyValuePair<TKey, TValue>>)_dictionary).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<KeyValuePair<TKey, TValue>>)this).GetEnumerator();
    }

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        return _dictionary.ToList().ToArray();
    }

    bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return _dictionary.TryAdd(item.Key, item.Value);
    }

    // Default behavior: take whichever entry enumerates first (no ordering guarantee).
    public virtual bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        item = this.FirstOrDefault();
        TValue? value;
        return _dictionary.TryRemove(item.Key, out value);
    }
}
Time sequence queue implementation (inherits the above):
public class TimeSequenceQueue<T> : BlockingDictionary<DateTime, T>
{
    private DateTime _previousTime;
    private DateTime _nextTime;
    private readonly int _intervalSeconds;

    public TimeSequenceQueue(DateTime startTime, int intervalSeconds)
    {
        _intervalSeconds = intervalSeconds;
        _previousTime = startTime;
        _nextTime = startTime;
    }

    // Only succeeds when the entry for the next expected timestamp has arrived.
    public override bool TryTake([MaybeNullWhen(false)] out KeyValuePair<DateTime, T> item)
    {
        item = _dictionary.SingleOrDefault(x => x.Key == _nextTime);
        T? value = default(T);
        if (item.Value == null)
            return false;
        bool result = _dictionary.TryRemove(item.Key, out value);
        if (result)
        {
            // Advance the expected timestamp by one interval.
            _previousTime = _nextTime;
            _nextTime = _nextTime.AddSeconds(_intervalSeconds);
        }
        return result;
    }
}
Usage:
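// startTime and intervalSeconds stand for the values required by the constructor above.
BlockingCollection<KeyValuePair<DateTime, object>> _queue =
    new BlockingCollection<KeyValuePair<DateTime, object>>(
        new TimeSequenceQueue<object>(startTime, intervalSeconds));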
Consuming loop - started in new thread:
foreach (var item in _queue.GetConsumingEnumerable())
{
    // feed downstream
}
Comments (1)
I will try to answer this question generally, without paying too much attention to the specifics of your problem. So let's say that you are consuming a BlockingCollection<T> with a foreach loop over GetConsumingEnumerable(), as in the question, and you want to avoid waiting indefinitely for an item to arrive. You want to wake up every 5 seconds and do something, before waiting/sleeping again.
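Here is how you could do it: replace the foreach with a while loop that runs until the collection's IsCompleted property becomes true, and on each iteration call the TryTake overload that accepts a timeout.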
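A minimal sketch of such a polling loop follows. The BlockingCollection<string> and the producer task are hypothetical stand-ins, not code from the question. The sketch relies only on BlockingCollection<T>.TryTake(out T, TimeSpan) and BlockingCollection<T>.IsCompleted:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical collection and producer, for illustration only.
var collection = new BlockingCollection<string>();

var producer = Task.Run(async () =>
{
    collection.Add("first");
    // Pause longer than the consumer's timeout so the loop wakes up empty-handed once.
    await Task.Delay(TimeSpan.FromSeconds(6));
    collection.Add("second");
    // After CompleteAdding, IsCompleted becomes true once the collection is drained.
    collection.CompleteAdding();
});

while (!collection.IsCompleted)
{
    // Wait up to 5 seconds for an item instead of blocking indefinitely.
    bool consumed = collection.TryTake(out string? item, TimeSpan.FromSeconds(5));
    if (consumed)
    {
        // Do something with the consumed item.
        Console.WriteLine($"Consumed: {item}");
    }
    else
    {
        // No item was taken; do something before trying again.
        Console.WriteLine("Nothing taken, doing periodic work before waiting again");
    }
}

await producer;
```

TryTake returns false rather than throwing when no item arrives in time, so the loop body decides what to do on each wake-up.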
The above pattern imitates the actual source code of the BlockingCollection<T>.GetConsumingEnumerable method. If you want to get fancy, you could incorporate this functionality in a custom extension method for the BlockingCollection<T> class.
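One possible shape for such an extension method is sketched below. The name ConsumeWithTimeout, the tuple layout, and the demo producer are assumptions made for illustration, not the answer's original code. Each iteration yields either a consumed item or a "nothing arrived within the timeout" signal, so the caller can keep using foreach:

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Usage example with a hypothetical producer.
var collection = new BlockingCollection<string>();

var producer = Task.Run(async () =>
{
    collection.Add("first");
    await Task.Delay(TimeSpan.FromSeconds(6)); // longer than the 5-second timeout
    collection.Add("second");
    collection.CompleteAdding();
});

var received = new List<string>();
foreach (var (consumed, item) in collection.ConsumeWithTimeout(TimeSpan.FromSeconds(5)))
{
    if (consumed)
    {
        received.Add(item!); // item is only meaningful when consumed is true
        Console.WriteLine($"Consumed: {item}");
    }
    else
    {
        Console.WriteLine("Nothing taken, doing periodic work before waiting again");
    }
}

await producer;

public static class BlockingCollectionExtensions
{
    // Hypothetical helper: like GetConsumingEnumerable, but it also yields
    // (false, default) whenever no item could be taken within the timeout.
    public static IEnumerable<(bool Consumed, T? Item)> ConsumeWithTimeout<T>(
        this BlockingCollection<T> source, TimeSpan timeout)
    {
        while (!source.IsCompleted)
        {
            bool consumed = source.TryTake(out var item, timeout);
            yield return (consumed, item);
        }
    }
}
```

Yielding a tuple keeps the timeout handling at the call site, at the cost of the caller having to check the Consumed flag before touching Item.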