有界队列场景

发布于 2024-08-20 05:34:14 字数 556 浏览 11 评论 0原文

我需要实现一个生产者/消费者有界队列,多个消费者针对一个生产者。

我有一个推送函数,可以将一个项目添加到队列中,然后检查 maxsize。如果我们达到了它,则返回 false,在其他情况下返回 true。

在下面的代码中_vector是一个List,onSignal基本上以异步方式消费一个项目。

您发现这段代码有问题吗?

public bool Push(T message)
{
    bool canEnqueue = true;

    lock (_vector)
    {
        _vector.Add(message);
        if (_vector.Count >= _maxSize)
        {
            canEnqueue = false;
        }
    }

    var onSignal = SignalEvent;
    if (onSignal != null)
    {
        onSignal();
    }

    return canEnqueue;
}

I need to implement a producer/consumer bounded queue, multiple consumers against a single producer.

I have a push function that adds an item to the queue and then checks for maxsize. If we have reached it return false, in every other case return true.

In the following code _vector is a List<T>, onSignal basically consumes an item in an asynchronous way.

Do you see issues with this code?

public bool Push(T message)
{
    bool canEnqueue = true;

    lock (_vector)
    {
        _vector.Add(message);
        if (_vector.Count >= _maxSize)
        {
            canEnqueue = false;
        }
    }

    var onSignal = SignalEvent;
    if (onSignal != null)
    {
        onSignal();
    }

    return canEnqueue;
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

冰火雁神 2024-08-27 05:34:14

我看到的最大问题是使用 List 来实现队列;这样做会存在性能问题,因为删除第一项涉及复制所有数据。

额外的想法;即使您没有添加数据,您也会发出信号,并且事件的使用本身可能会出现线程问题(存在一些边缘情况,即使您在 null 测试之前捕获值 - 而且它可能比使用Monitor进行信号传输的开销更大。

我会切换到不会出现此问题的 Queue - 或者更好地使用预先滚动的示例;例如 在 .NET 中创建阻塞队列?,这正是您所要做的讨论并支持任意数量的生产者和消费者。它使用阻塞方法,但“尝试”方法是:

public bool TryEnqueue(T item)
{
    lock (queue)
    {
        if (queue.Count >= maxSize) { return false; }
        queue.Enqueue(item);
        if (queue.Count == 1)
        {
            // wake up any blocked dequeue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}

最后 - 你不“推送”到堆栈,而不是队列吗?

The biggest problem I see there is the use of List<T> to implement a queue; there are performance issues doing this, as removing the first item involves copying all the data.

Additional thoughts; you're raising the signal even if you didn't add data, and the use of events itself may have issue with threading (there are some edge cases, even when you capture the value before the null test - plus it is possibly more overhead than using the Monitor to do the signalling).

I would switch to a Queue<T> which won't have this problem - or better use a pre-rolled example; for example Creating a blocking Queue in .NET?, which does exactly what you discuss, and supports any number of both producers and consumers. It uses the blocking approach, but a "try" approach would be:

public bool TryEnqueue(T item)
{
    lock (queue)
    {
        if (queue.Count >= maxSize) { return false; }
        queue.Enqueue(item);
        if (queue.Count == 1)
        {
            // wake up any blocked dequeue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}

Finally - don't you "push" to a stack, not a queue?

榆西 2024-08-27 05:34:14

我知道你说的是单生产者、多消费者,但无论如何值得一提:如果你的队列几乎已满(比如 25 个插槽中的 24 个),那么如果两个线程同时 Push ,你最终会超出限制。如果将来有可能有多个生产者,您应该考虑将 Push 设为阻塞调用,并让它等待“可用”AutoResetEvent在项目出队后或在项目入队但仍有可用槽位后发出信号。

我看到的唯一其他潜在问题是 SignalEvent。您没有向我们展示其实施情况。如果将其声明为公共事件 SignalEventDelegate SignalEvent,那么就可以了,因为编译器会自动添加一个 SynchronizedAttribute。但是,如果 SignalEvent 使用具有 add/remove 语法的后备委托,那么您将需要为事件本身提供自己的锁定,否则消费者可能会太晚脱离事件,但之后仍然会收到一些信号。

编辑:实际上,无论如何这是可能的;更重要的是,如果您在没有适当锁定的情况下使用了属性样式的添加/删除委托,那么当您尝试执行该委托时,该委托实际上可能处于无效状态。即使使用同步事件,消费者也需要准备好在取消订阅后接收(并丢弃)通知。

除此之外,我没有看到任何问题 - 尽管这并不意味着没有任何问题,但这只是意味着我没有注意到任何问题。

I know you said single-producer, multiple-consumer, but it's worth mentioning anyway: if your queue is almost full (say 24 out of 25 slots), then if two threads Push at the same time, you will end up exceeding the limit. If there's even a chance you might have multiple producers at some point in the future, you should consider making Push a blocking call, and have it wait for an "available" AutoResetEvent which is signaled after either an item is dequeued or after an item is enqueued while there are still slots available.

The only other potential issue I see is the SignalEvent. You don't show us the implementation of that. If it's declared as public event SignalEventDelegate SignalEvent, then you will be OK because the compiler automatically adds a SynchronizedAttribute. However, if SignalEvent uses a backing delegate with add/remove syntax, then you will need to provide your own locking for the event itself, otherwise it will be possible for a consumer to detach from the event just a little too late and still receive a couple of signals afterward.

Edit: Actually, that is possible regardless; more importantly, if you've used a property-style add/remove delegate without the appropriate locking, it is actually possible for the delegate to be in an invalid state when you try to execute it. Even with a synchronized event, consumers need to be prepared to receive (and discard) notifications after they've unsubscribed.

Other than that I see no issues - although that doesn't mean that there aren't any, it just means I haven't noticed any.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文