C# 一旦主线程睡眠,所有线程都停止

发布于 2024-08-22 03:17:40 字数 3105 浏览 4 评论 0原文

我有一个运行生产者-消费者模型的类,如下所示:

public class SyncEvents
{
    public bool waiting;

    public SyncEvents()
    {
        waiting = true;
    }
}

public class Producer
{
    private readonly Queue<Delegate> _queue;
    private SyncEvents _sync;
    private Object _waitAck;

    public Producer(Queue<Delegate> q, SyncEvents sync, Object obj)
    {
        _queue = q;
        _sync = sync;
        _waitAck = obj;
    }

    public void ThreadRun()
    {
        lock (_sync)
        {
            while (true)
            {
                Monitor.Wait(_sync, 0);
                if (_queue.Count > 0)
                {
                    _sync.waiting = false;
                }
                else
                {
                    _sync.waiting = true;
                    lock (_waitAck)
                    {
                        Monitor.Pulse(_waitAck);
                    }
                }
                Monitor.Pulse(_sync);
            }
        }
    }

}

public class Consumer
{
    private readonly Queue<Delegate> _queue;
    private SyncEvents _sync;

    private int count = 0;

    public Consumer(Queue<Delegate> q, SyncEvents sync)
    {
        _queue = q;
        _sync = sync;
    }

    public void ThreadRun()
    {
        lock (_sync)
        {
            while (true)
            {
                while (_queue.Count == 0)
                {
                    Monitor.Wait(_sync);
                }

                Delegate query = _queue.Dequeue();
                query.DynamicInvoke(null);

                count++;

                Monitor.Pulse(_sync);
            }
        }
    }
}

/// <summary>
/// Act as a consumer to the queries produced by the DataGridViewCustomCell
/// </summary>
public class QueryThread
{
    private SyncEvents _syncEvents = new SyncEvents();
    private Object waitAck = new Object();
    private Queue<Delegate> _queryQueue = new Queue<Delegate>();

    Producer queryProducer;
    Consumer queryConsumer;

    public QueryThread()
    {
        queryProducer = new Producer(_queryQueue, _syncEvents, waitAck);
        queryConsumer = new Consumer(_queryQueue, _syncEvents);

        Thread producerThread = new Thread(queryProducer.ThreadRun);
        Thread consumerThread = new Thread(queryConsumer.ThreadRun);

        producerThread.IsBackground = true;
        consumerThread.IsBackground = true;

        producerThread.Start();
        consumerThread.Start();
    }

    public bool isQueueEmpty()
    {
        return _syncEvents.waiting;
    }

    public void wait()
    {
        lock (waitAck)
        {
            while (_queryQueue.Count > 0)
            {
                Monitor.Wait(waitAck);
            }
        }
    }

    public void Enqueue(Delegate item)
    {
        _queryQueue.Enqueue(item);
    }
}

代码运行顺利,但 wait() 函数。 在某些情况下,我想等到队列中的所有函数都完成运行,所以我创建了 wait() 函数。

生产者将在适当的时间触发 waitAck 脉冲。

但是,当“Monitor.Wait(waitAck);”行时在 wait() 函数中运行时,所有线程都会停止,包括生产者线程和消费者线程。

为什么会发生这种情况以及如何解决?谢谢!

I have a class running the Producer-Consumer model like this:

public class SyncEvents
{
    public bool waiting;

    public SyncEvents()
    {
        waiting = true;
    }
}

public class Producer
{
    private readonly Queue<Delegate> _queue;
    private SyncEvents _sync;
    private Object _waitAck;

    public Producer(Queue<Delegate> q, SyncEvents sync, Object obj)
    {
        _queue = q;
        _sync = sync;
        _waitAck = obj;
    }

    public void ThreadRun()
    {
        lock (_sync)
        {
            while (true)
            {
                Monitor.Wait(_sync, 0);
                if (_queue.Count > 0)
                {
                    _sync.waiting = false;
                }
                else
                {
                    _sync.waiting = true;
                    lock (_waitAck)
                    {
                        Monitor.Pulse(_waitAck);
                    }
                }
                Monitor.Pulse(_sync);
            }
        }
    }

}

public class Consumer
{
    private readonly Queue<Delegate> _queue;
    private SyncEvents _sync;

    private int count = 0;

    public Consumer(Queue<Delegate> q, SyncEvents sync)
    {
        _queue = q;
        _sync = sync;
    }

    public void ThreadRun()
    {
        lock (_sync)
        {
            while (true)
            {
                while (_queue.Count == 0)
                {
                    Monitor.Wait(_sync);
                }

                Delegate query = _queue.Dequeue();
                query.DynamicInvoke(null);

                count++;

                Monitor.Pulse(_sync);
            }
        }
    }
}

/// <summary>
/// Act as a consumer to the queries produced by the DataGridViewCustomCell
/// </summary>
public class QueryThread
{
    private SyncEvents _syncEvents = new SyncEvents();
    private Object waitAck = new Object();
    private Queue<Delegate> _queryQueue = new Queue<Delegate>();

    Producer queryProducer;
    Consumer queryConsumer;

    public QueryThread()
    {
        queryProducer = new Producer(_queryQueue, _syncEvents, waitAck);
        queryConsumer = new Consumer(_queryQueue, _syncEvents);

        Thread producerThread = new Thread(queryProducer.ThreadRun);
        Thread consumerThread = new Thread(queryConsumer.ThreadRun);

        producerThread.IsBackground = true;
        consumerThread.IsBackground = true;

        producerThread.Start();
        consumerThread.Start();
    }

    public bool isQueueEmpty()
    {
        return _syncEvents.waiting;
    }

    public void wait()
    {
        lock (waitAck)
        {
            while (_queryQueue.Count > 0)
            {
                Monitor.Wait(waitAck);
            }
        }
    }

    public void Enqueue(Delegate item)
    {
        _queryQueue.Enqueue(item);
    }
}

The code run smoothly but the wait() function.
In some case I want to wait until all the function in the queue were finished running so I made the wait() function.

The producer will fire the waitAck pulse at suitable time.

However, when the line "Monitor.Wait(waitAck);" is ran in the wait() function, all thread stop, includeing the producer and consumer thread.

Why would this happen and how can I solve it? thanks!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

自找没趣 2024-08-29 03:17:40

所有线程实际上不太可能停止,尽管我应该指出,为了避免错误唤醒,您可能应该有一个 while 循环而不是 if 语句:

lock (waitAck)
{
    while(queryProducer.secondQueue.Count > 0)
    {
        Monitor.Wait(waitAck);
    }
}

您正在调用 Monitor.Wait 的事实 意味着应该释放 waitAck,因此它不应该阻止消费者线程锁定...

您能否提供有关生产者/消费者线程“停止”方式的更多信息?看起来他们刚刚陷入了僵局吗?

您的制作人使用的是 Notify 还是 NotifyAll?你现在有一个额外的等待线程,所以如果你只使用 Notify 它只会释放一个线程......如果没有你的 Notify 的详细信息,很难看出这是否是一个问题。 code>ProducerConsumer 类。

如果您可以展示一个简短但完整的程序来演示该问题,那将会有所帮助。

编辑:好的,现在您已经发布了代码,我可以看到许多问题:

  • 拥有如此多的公共变量会导致灾难。您的类应该封装它们的功能,以便其他代码不必四处寻找实现的细节。 (例如,这里的调用代码确实不应该访问队列。)

  • 您将项目直接添加到第二个队列,这意味着您无法有效地唤醒生产者将它们添加到第一个队列队列。为什么你甚至有多个队列?

  • 你总是在生产者线程中等待_sync...为什么?首先要通知它什么?一般来说,生产者线程不应该等待,除非你有一个有界缓冲区

  • 您有一个静态变量(_waitAck),每次创建新实例时该变量都会被覆盖。这是一个坏主意。

您还没有显示您的 SyncEvents 类 - 这是否意味着要做一些有趣的事情?

老实说,你的设计似乎很奇怪 - 你最好从头开始。尝试将整个生产者/消费者队列封装在一个类中,该类具有 ProduceConsume 方法,以及 WaitForEmpty (或类似的方法)那)。我想你会发现这样同步逻辑容易多了。

It seems very unlikely that all the threads will actually stop, although I should point out that to avoid false wake-ups you should probably have a while loop instead of an if statement:

lock (waitAck)
{
    while(queryProducer.secondQueue.Count > 0)
    {
        Monitor.Wait(waitAck);
    }
}

The fact that you're calling Monitor.Wait means that waitAck should be released so it shouldn't prevent the consumer threads from locking...

Could you give more information about the way in which the producer/consumer threads are "stopping"? Does it look like they've just deadlocked?

Is your producer using Notify or NotifyAll? You've got an extra waiting thread now, so if you only use Notify it's only going to release a single thread... it's hard to see whether or not that's a problem without the details of your Producer and Consumer classes.

If you could show a short but complete program to demonstrate the problem, that would help.

EDIT: Okay, now you've posted the code I can see a number of issues:

  • Having so many public variables is a recipe for disaster. Your classes should encapsulate their functionality so that other code doesn't have to go poking around for implementation bits and pieces. (For example, your calling code here really shouldn't have access to the queue.)

  • You're adding items directly to the second queue, which means you can't efficiently wake up the producer to add them to the first queue. Why do you even have multiple queues?

  • You're always waiting on _sync in the producer thread... why? What's going to notify it to start with? Generally speaking the producer thread shouldn't have to wait, unless you have a bounded buffer

  • You have a static variable (_waitAck) which is being overwritten every time you create a new instance. That's a bad idea.

You also haven't shown your SyncEvents class - is that meant to be doing anything interesting?

To be honest, it seems like you've got quite a strange design - you may well be best starting again from scratch. Try to encapsulate the whole producer/consumer queue in a single class, which has Produce and Consume methods, as well as WaitForEmpty (or something like that). I think you'll find the synchronization logic a lot easier that way.

我做我的改变 2024-08-29 03:17:40

这是我对您的代码的看法:

public class ProducerConsumer
{
    private ManualResetEvent _ready;
    private Queue<Delegate> _queue; 
    private Thread _consumerService;
    private static Object _sync = new Object();

    public ProducerConsumer(Queue<Delegate> queue)
    {
        lock (_sync)
        {
            // Note: I would recommend that you don't even
            // bother with taking in a queue.  You should be able
            // to just instantiate a new Queue<Delegate>()
            // and use it when you Enqueue.  There is nothing that
            // you really need to pass into the constructor.
            _queue = queue;
            _ready = new ManualResetEvent(false);
            _consumerService = new Thread(Run);
            _consumerService.IsBackground = true;
            _consumerService.Start();
        }
    }

    public override void Enqueue(Delegate value)
    {
        lock (_sync)
        {
            _queue.Enqueue(value);
            _ready.Set();
        }
    }

    // The consumer blocks until the producer puts something in the queue.
    private void Run()
    {
        Delegate query;
        try
        {
            while (true)
            {
                _ready.WaitOne();
                lock (_sync)
                {
                    if (_queue.Count > 0)
                    {
                        query = _queue.Dequeue();
                        query.DynamicInvoke(null);
                    }
                    else
                    {
                        _ready.Reset();
                        continue;
                    }
                }
            }
        }
        catch (ThreadInterruptedException)
        {
            _queue.Clear();
            return;
        }
    }


    protected override void Dispose(bool disposing)
    {
        lock (_sync)
        {
            if (_consumerService != null)
            {
                _consumerService.Interrupt();
            }
        }
        base.Dispose(disposing);
    }


}

我不太确定您想要通过等待功能实现什么...我假设您正在尝试对可以的项目数量进行某种类型的限制排队。在这种情况下,当队列中有太多项目时,只需抛出异常或返回失败信号,调用 Enqueue 的客户端将不断重试,直到队列可以容纳更多项目。采取乐观的态度会为你省去很多麻烦,而且它只会帮助你摆脱很多复杂的逻辑。

如果您真的想在那里等待,那么我也许可以帮助您找到更好的方法。让我知道您在等待过程中想要实现什么目标,我会帮助您。

注意:我从我的一个项目中获取了这段代码,对其进行了一些修改并将其发布在这里...可能存在一些轻微的语法错误,但逻辑应该是正确的。

更新:根据您的评论,我做了一些修改:我向该类添加了另一个 ManualResetEvent ,因此当您调用 BlockQueue() 时,它会为您提供您可以等待该事件并设置一个标志以阻止 Enqueue 函数对更多元素进行排队。一旦队列中的所有查询都得到服务,该标志就会设置为 true,并设置 _wait 事件,以便等待该事件的任何人都会收到信号。

public class ProducerConsumer
{
    private bool _canEnqueue;
    private ManualResetEvent _ready;
    private Queue<Delegate> _queue; 
    private Thread _consumerService;

    private static Object _sync = new Object();
    private static ManualResetEvent _wait = new ManualResetEvent(false);

    public ProducerConsumer()
    {
        lock (_sync)
        {
            _queue = new Queue<Delegate> _queue;
            _canEnqueue = true;
            _ready = new ManualResetEvent(false);
            _consumerService = new Thread(Run);
            _consumerService.IsBackground = true;
            _consumerService.Start();
        }
    }

    public bool Enqueue(Delegate value)
    {
        lock (_sync)
        {
            // Don't allow anybody to enqueue
            if( _canEnqueue )
            {
                _queue.Enqueue(value);
                _ready.Set();
                return true;
            }
        }
        // Whoever is calling Enqueue should try again later.
        return false;
    }

    // The consumer blocks until the producer puts something in the queue.
    private void Run()
    {
        try
        {
            while (true)
            {
                // Wait for a query to be enqueued
                _ready.WaitOne();

                // Process the query
                lock (_sync)
                {
                    if (_queue.Count > 0)
                    {
                        Delegate query = _queue.Dequeue();
                        query.DynamicInvoke(null);
                    }
                    else
                    {
                        _canEnqueue = true;
                        _ready.Reset();
                        _wait.Set();
                        continue;
                    }
                }
            }
        }
        catch (ThreadInterruptedException)
        {
            _queue.Clear();
            return;
        }
    }

    // Block your queue from enqueuing, return null
    // if the queue is already empty.
    public ManualResetEvent BlockQueue()
    {
        lock(_sync)
        {
            if( _queue.Count > 0 )
            {
                _canEnqueue = false;
                _wait.Reset();
            }
            else
            {
                // You need to tell the caller that they can't
                // block your queue while it's empty. The caller
                // should check if the result is null before calling
                // WaitOne().
                return null;
            }
        }
        return _wait;
    }

    protected override void Dispose(bool disposing)
    {
        lock (_sync)
        {
            if (_consumerService != null)
            {
                _consumerService.Interrupt();
                // Set wait when you're disposing the queue
                // so that nobody is left with a lingering wait.
                _wait.Set();
            }
        }
        base.Dispose(disposing);
    }
}

Here is my take on your code:

public class ProducerConsumer
{
    private ManualResetEvent _ready;
    private Queue<Delegate> _queue; 
    private Thread _consumerService;
    private static Object _sync = new Object();

    public ProducerConsumer(Queue<Delegate> queue)
    {
        lock (_sync)
        {
            // Note: I would recommend that you don't even
            // bother with taking in a queue.  You should be able
            // to just instantiate a new Queue<Delegate>()
            // and use it when you Enqueue.  There is nothing that
            // you really need to pass into the constructor.
            _queue = queue;
            _ready = new ManualResetEvent(false);
            _consumerService = new Thread(Run);
            _consumerService.IsBackground = true;
            _consumerService.Start();
        }
    }

    public override void Enqueue(Delegate value)
    {
        lock (_sync)
        {
            _queue.Enqueue(value);
            _ready.Set();
        }
    }

    // The consumer blocks until the producer puts something in the queue.
    private void Run()
    {
        Delegate query;
        try
        {
            while (true)
            {
                _ready.WaitOne();
                lock (_sync)
                {
                    if (_queue.Count > 0)
                    {
                        query = _queue.Dequeue();
                        query.DynamicInvoke(null);
                    }
                    else
                    {
                        _ready.Reset();
                        continue;
                    }
                }
            }
        }
        catch (ThreadInterruptedException)
        {
            _queue.Clear();
            return;
        }
    }


    protected override void Dispose(bool disposing)
    {
        lock (_sync)
        {
            if (_consumerService != null)
            {
                _consumerService.Interrupt();
            }
        }
        base.Dispose(disposing);
    }


}

I'm not exactly sure what you're trying to achieve with the wait function... I'm assuming you're trying to put some type of a limit to the number of items that can be queued. In that case simply throw an exception or return a failure signal when you have too many items in the queue, the client that is calling Enqueue will keep retrying until the queue can take more items. Taking an optimistic approach will save you a LOT of headaches and it simply helps you get rid of a lot of complex logic.

If you REALLY want to have the wait in there, then I can probably help you figure out a better approach. Let me know what are you trying to achieve with the wait and I'll help you out.

Note: I took this code from one of my projects, modified it a little and posted it here... there might be some minor syntax errors, but the logic should be correct.

UPDATE: Based on your comments I made some modifications: I added another ManualResetEvent to the class, so when you call BlockQueue() it gives you an event which you can wait on and sets a flag to stop the Enqueue function from queuing more elements. Once all the queries in the queue are serviced, the flag is set to true and the _wait event is set so whoever is waiting on it gets the signal.

public class ProducerConsumer
{
    private bool _canEnqueue;
    private ManualResetEvent _ready;
    private Queue<Delegate> _queue; 
    private Thread _consumerService;

    private static Object _sync = new Object();
    private static ManualResetEvent _wait = new ManualResetEvent(false);

    public ProducerConsumer()
    {
        lock (_sync)
        {
            _queue = new Queue<Delegate> _queue;
            _canEnqueue = true;
            _ready = new ManualResetEvent(false);
            _consumerService = new Thread(Run);
            _consumerService.IsBackground = true;
            _consumerService.Start();
        }
    }

    public bool Enqueue(Delegate value)
    {
        lock (_sync)
        {
            // Don't allow anybody to enqueue
            if( _canEnqueue )
            {
                _queue.Enqueue(value);
                _ready.Set();
                return true;
            }
        }
        // Whoever is calling Enqueue should try again later.
        return false;
    }

    // The consumer blocks until the producer puts something in the queue.
    private void Run()
    {
        try
        {
            while (true)
            {
                // Wait for a query to be enqueued
                _ready.WaitOne();

                // Process the query
                lock (_sync)
                {
                    if (_queue.Count > 0)
                    {
                        Delegate query = _queue.Dequeue();
                        query.DynamicInvoke(null);
                    }
                    else
                    {
                        _canEnqueue = true;
                        _ready.Reset();
                        _wait.Set();
                        continue;
                    }
                }
            }
        }
        catch (ThreadInterruptedException)
        {
            _queue.Clear();
            return;
        }
    }

    // Block your queue from enqueuing, return null
    // if the queue is already empty.
    public ManualResetEvent BlockQueue()
    {
        lock(_sync)
        {
            if( _queue.Count > 0 )
            {
                _canEnqueue = false;
                _wait.Reset();
            }
            else
            {
                // You need to tell the caller that they can't
                // block your queue while it's empty. The caller
                // should check if the result is null before calling
                // WaitOne().
                return null;
            }
        }
        return _wait;
    }

    protected override void Dispose(bool disposing)
    {
        lock (_sync)
        {
            if (_consumerService != null)
            {
                _consumerService.Interrupt();
                // Set wait when you're disposing the queue
                // so that nobody is left with a lingering wait.
                _wait.Set();
            }
        }
        base.Dispose(disposing);
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文