Thread-safe fixed-size circular buffer with sequential IDs

Published 2025-02-11 11:33:04 · Word count 1787 · Views 2 · Comments 0

I need a queue with these capabilities:

  • fixed-size (i.e. circular buffer)
  • queue items have ids (like a primary key), which are sequential
  • thread-safe (used from multiple ASP.NET Core requests)

To avoid locking, I tried a ConcurrentQueue but found race conditions. So I'm trying a custom approach.

public interface IQueueItem
{
    long Id { get; set; }
}

public class CircularBuffer<T> : LinkedList<T> where T : class, IQueueItem
{
    public CircularBuffer(int capacity) => _capacity = capacity;
    private readonly int _capacity;

    private long _counter = 0;
    private readonly object _lock = new();

    public void Enqueue(T item)
    {
        lock (_lock) {         // works but feels "heavy"
            _counter++;
            item.Id = _counter;
            if (Count == _capacity) RemoveFirst();
            AddLast(item);
        }
    }
}

And to test:

public class Item : IQueueItem
{
    public long Id { get; set; }
    //...
}

public class Program
{
    public static void Main()
    {
        var q = new CircularBuffer<Item>(10);
        Parallel.For(0, 15, i => q.Enqueue(new Item()));
        Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
    }
}

Which gives correct output (is ordered even though enqueued by competing threads, and has fixed size with oldest items dequeued):

6, 7, 8, 9, 10, 11, 12, 13, 14, 15

In reality, I have web requests that read (i.e. enumerate) that queue.

The problem: if one thread is enumerating the queue while another thread is adding to it, I will have errors. (I could use a ToList() before the read, but for a large queue that will suck up all the server's memory as this could be done many times a second by multiple requests). How can I deal with that scenario? I used a linked list, but I'm flexible to use any structure.
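The errors come from the standard enumerator-invalidation guard: LinkedList&lt;T&gt;, like most BCL collections, versions itself and throws InvalidOperationException ("Collection was modified...") if any mutation happens while an enumerator is live. A minimal single-threaded sketch of the same failure (no actual race needed to trigger it):

```csharp
using System;
using System.Collections.Generic;

var list = new LinkedList<int>(new[] { 1, 2, 3 });
bool threw = false;
try
{
    // Mutating while an enumerator is live invalidates it, exactly as a
    // concurrent Enqueue would invalidate a web request's enumeration.
    foreach (var x in list)
        list.AddLast(x);
}
catch (InvalidOperationException)
{
    threw = true;
    Console.WriteLine("enumeration invalidated by the mutation");
}
```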

(Also, that seems to be a really heavy lock section; is there a more performant way?)

UPDATE
As asked in comments below: I expect the queue to have from a few hundred to a few tens of thousands of items, but the items themselves are small (just a few primitive data types). I expect an enqueue every second. Reads from web requests are less frequent, say a few times per minute (but they can occur concurrently with the server writing to the queue).

述情 2025-02-18 11:33:04

Based on the metrics that you provided in the question, you have plenty of options. The anticipated usage of the CircularBuffer<T> is not really that heavy. Wrapping a lock-protected Queue<T> should work pretty well. The cost of copying the contents of the queue into an array on each enumeration (copying 10,000 elements a few times per minute) is unlikely to be noticeable. Modern machines can do such things in the blink of an eye. You'd have to enumerate the collection hundreds of times per second for this to start (slightly) becoming an issue.

In my original answer (revision 3) I had proposed using an ImmutableQueue<T> as underlying storage. After closer inspection I noticed that this class is not exactly pay-for-play. The first time it is enumerated it invokes the internal BackwardsReversed property (source code), which is quite costly. My performance tests confirmed that it's a worse solution than the simple lock-protected Queue<T> that is shown in lonix's answer, regarding both CPU-time and allocations.

Below is a lower-level implementation of a similar idea, which exploits the fact that we need only a subset of the functionality of the ImmutableQueue<T> class. The items are stored in a singly-linked list structure that can be enumerated without additional cost:

public class ConcurrentCircularBuffer<T> : IEnumerable<T>
{
    private readonly object _locker = new();
    private readonly int _capacity;
    private Node _head;
    private Node _tail;
    private int _count = 0;

    private class Node
    {
        public readonly T Item;
        public Node Next;
        public Node(T item) => Item = item;
    }

    public ConcurrentCircularBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    public int Count => Volatile.Read(ref _count);

    public void Enqueue(T item)
    {
        Node node = new(item);
        lock (_locker)
        {
            if (_head is null) _head = node;
            if (_tail is not null) _tail.Next = node;
            _tail = node;
            if (_count < _capacity) _count++; else _head = _head.Next;
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        Node node; int count;
        lock (_locker) { node = _head; count = _count; }
        for (int i = 0; i < count && node is not null; i++, node = node.Next)
            yield return node.Item;
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

The main advantage of this approach over the lock-protected Queue<T> is that it minimizes the contention. The work done while holding the lock is minuscule.

An alternative implementation of the ConcurrentCircularBuffer<T> class, based on two array buffers and having different pros and cons, can be found in the 5th revision of this answer.
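For completeness, a quick usage sketch of the buffer above (the class is repeated verbatim so the sketch compiles on its own; it is run single-threaded here so the result is deterministic, but under concurrent writers an enumeration still won't throw, it just sees a snapshot):

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

var buffer = new ConcurrentCircularBuffer<int>(10);
for (int i = 1; i <= 15; i++) buffer.Enqueue(i);

// Capacity is 10, so items 1-5 were evicted; enumeration snapshots
// head and count under the lock, then walks the list lock-free.
Console.WriteLine(string.Join(", ", buffer));
// 6, 7, 8, 9, 10, 11, 12, 13, 14, 15

public class ConcurrentCircularBuffer<T> : IEnumerable<T>
{
    private readonly object _locker = new();
    private readonly int _capacity;
    private Node _head;
    private Node _tail;
    private int _count = 0;

    private class Node
    {
        public readonly T Item;
        public Node Next;
        public Node(T item) => Item = item;
    }

    public ConcurrentCircularBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    public int Count => Volatile.Read(ref _count);

    public void Enqueue(T item)
    {
        Node node = new(item);
        lock (_locker)
        {
            if (_head is null) _head = node;
            if (_tail is not null) _tail.Next = node;
            _tail = node;
            if (_count < _capacity) _count++; else _head = _head.Next;
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        Node node; int count;
        lock (_locker) { node = _head; count = _count; }
        for (int i = 0; i < count && node is not null; i++, node = node.Next)
            yield return node.Item;
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```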

‘画卷フ 2025-02-18 11:33:04

Here is another implementation, using a Queue<T> with locking.

public interface IQueueItem
{
    long Id { get; set; }
}

public class CircularBuffer<T> : IEnumerable<T> where T : class, IQueueItem
{
    private readonly int _capacity;
    private readonly Queue<T> _queue;
    private long _lastId = 0;
    private readonly object _lock = new();

    public CircularBuffer(int capacity) {
        _capacity = capacity;
        _queue = new Queue<T>(capacity);
    }

    public void Enqueue(T item)
    {
        lock (_lock) {
            if (_queue.Count == _capacity)   // evict oldest so the size never exceeds capacity
                _queue.Dequeue();
            item.Id = ++_lastId;
            _queue.Enqueue(item);
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (_lock) {
            var copy = _queue.ToArray();
            return ((IEnumerable<T>)copy).GetEnumerator();
        }
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

}

And to test:

public class Item : IQueueItem
{
    private long _id;

    public long Id
    {
        get => Volatile.Read(ref _id);
        set => Volatile.Write(ref _id, value);
    }
}

public class Program
{
    public static void Main()
    {
        var q = new CircularBuffer<Item>(10);
        Parallel.For(0, 15, i => q.Enqueue(new Item()));
        Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
    }
}

Result:

6, 7, 8, 9, 10, 11, 12, 13, 14, 15

诠释孤独 2025-02-18 11:33:04

Since ConcurrentQueue is ruled out in this question, you can try a fixed array.

IQueueItem[] items = new IQueueItem[SIZE];
long id = 0;

Enqueue is simple.

void Enqueue(IQueueItem item)
{
    long id2 = Interlocked.Increment(ref id);
    item.Id = id2 - 1;
    items[id2 % SIZE] = item;
}

To output the data, you just need to copy the array to a new one and then sort it (of course, this can be optimized).

var arr = new IQueueItem[SIZE];
Array.Copy(items, arr, SIZE);
return arr.Where(a => a != null).OrderBy(a => a.Id);

Because of concurrent insertions there may be gaps in the array; you can take the sequence only until a gap is found.

var e = arr.Where(a => a != null).OrderBy(a => a.Id);
var firstId = e.First().Id;
return e.TakeWhile((a, index) => a.Id - index == firstId);
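Putting the fragments above together into one self-contained sketch (the class name LockFreeRing, its constructor, and the Snapshot method name are mine for illustration, not from the answer):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

var ring = new LockFreeRing(10);
for (int i = 0; i < 15; i++) ring.Enqueue(new Item());

// 15 writes into a 10-slot ring: ids 0-4 were overwritten.
Console.WriteLine(string.Join(", ", ring.Snapshot().Select(a => a.Id)));
// 5, 6, 7, 8, 9, 10, 11, 12, 13, 14

public interface IQueueItem { long Id { get; set; } }

public class Item : IQueueItem { public long Id { get; set; } }

public class LockFreeRing
{
    private readonly IQueueItem[] _items;
    private readonly int _size;
    private long _id = 0;

    public LockFreeRing(int size)
    {
        _size = size;
        _items = new IQueueItem[size];
    }

    public void Enqueue(IQueueItem item)
    {
        // Interlocked gives each item a unique slot and a sequential id
        // without taking a lock.
        long id2 = Interlocked.Increment(ref _id);
        item.Id = id2 - 1;
        _items[id2 % _size] = item;
    }

    public IEnumerable<IQueueItem> Snapshot()
    {
        var arr = new IQueueItem[_size];
        Array.Copy(_items, arr, _size);
        var e = arr.Where(a => a != null).OrderBy(a => a.Id).ToList();
        if (e.Count == 0) return e;
        long firstId = e[0].Id;
        // Concurrent writers may leave gaps; stop at the first one.
        return e.TakeWhile((a, index) => a.Id - index == firstId);
    }
}
```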