使用Qt的并发队列死锁

发布于 2024-11-30 09:55:53 字数 2337 浏览 0 评论 0原文

我正在尝试使用 Qt 的并发线程结构创建一个并发队列。

#ifndef CONCURRENTQUEUE_H
#define CONCURRENTQUEUE_H
#include <QMutex>
#include <QWaitCondition>
#include <queue>

template<typename Data>
class ConcurrentQueue
{
private:
    std::queue<Data> the_queue;
    QMutex the_mutex;
    QWaitCondition the_condition_variable;
    bool closed;

public:

    void setClosed(bool state)
    {
        QMutexLocker locker(&the_mutex);
        closed = state;    
    }

    bool getClosed()
    {
        QMutexLocker locker(&the_mutex);
        return closed;    
    }

    void push(Data const& data)
    {
        QMutexLocker locker(&the_mutex);
        the_queue.push(data);
        the_condition_variable.wakeOne();    
    }

    bool empty()
    {
        QMutexLocker locker(&the_mutex);
        return the_queue.empty();    
    }

    bool try_pop(Data& popped_value)
    {
        QMutexLocker locker(&the_mutex);
        if(the_queue.empty())
        {
            return false;
        }
        popped_value = the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        QMutexLocker locker(&the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(&the_mutex);
        }
        popped_value = the_queue.front();
        the_queue.pop();
        the_condition_variable.wakeOne();
    }

    //created to allow for a limited queue size
    void wait_and_push(Data const& data, const int max_size)
    {
        QMutexLocker locker(&the_mutex);
        while(the_queue.size() >= max_size)
        {
            the_condition_variable.wait(&the_mutex);
        }
        the_queue.push(data);
        the_condition_variable.wakeOne();
    }


};

#endif // CONCURRENTQUEUE_H

我的生产者线程使用 wait_and_push 方法将数据推送到队列中,并且我一直在尝试让我的消费者使用 try_pop 从队列中读取数据,

 while(!tiles->empty() || !tiles->getClosed())
{
             if(!tiles->try_pop(tile))
                    continue;
//do stuff with the tile
}

但是,有时会出现死锁。生产者将关闭的布尔值设置为消费者线程的标志,表明已完成加载队列。我的消费者仅以此作为了解队列是否正在加载、仍在进行或尚未启动的一种方式。

生产者使用“wait_and_push”而不是使用正常推送的原因是因为我希望能够使该线程阻塞,直到某些项目被处理为止,以避免消耗太多内存并进行不必要的磁盘 I/O。

谁能指出我出了什么问题吗?

I am trying to create a concurrent queue with Qt's concurrent threading constructs.

#ifndef CONCURRENTQUEUE_H
#define CONCURRENTQUEUE_H
#include <QMutex>
#include <QWaitCondition>
#include <queue>

template<typename Data>
class ConcurrentQueue
{
private:
    std::queue<Data> the_queue;
    QMutex the_mutex;
    QWaitCondition the_condition_variable;
    bool closed;

public:

    void setClosed(bool state)
    {
        QMutexLocker locker(&the_mutex);
        closed = state;    
    }

    bool getClosed()
    {
        QMutexLocker locker(&the_mutex);
        return closed;    
    }

    void push(Data const& data)
    {
        QMutexLocker locker(&the_mutex);
        the_queue.push(data);
        the_condition_variable.wakeOne();    
    }

    bool empty()
    {
        QMutexLocker locker(&the_mutex);
        return the_queue.empty();    
    }

    bool try_pop(Data& popped_value)
    {
        QMutexLocker locker(&the_mutex);
        if(the_queue.empty())
        {
            return false;
        }
        popped_value = the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        QMutexLocker locker(&the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(&the_mutex);
        }
        popped_value = the_queue.front();
        the_queue.pop();
        the_condition_variable.wakeOne();
    }

    //created to allow for a limited queue size
    void wait_and_push(Data const& data, const int max_size)
    {
        QMutexLocker locker(&the_mutex);
        while(the_queue.size() >= max_size)
        {
            the_condition_variable.wait(&the_mutex);
        }
        the_queue.push(data);
        the_condition_variable.wakeOne();
    }


};

#endif // CONCURRENTQUEUE_H

I have my producer thread using the wait_and_push method to push data into the queue, and I have been trying to get my consumer to read from the queue using try_pop

 while(!tiles->empty() || !tiles->getClosed())
{
             if(!tiles->try_pop(tile))
                    continue;
//do stuff with the tile
}

However, this deadlocks sometimes. The producer sets the closed boolean as a flag to the consumer threads that it is finished loading the queue. My consumer only has that as a way to know whether teh queue is being loaded, still in progress, or hasnt been started.

The reason the producer has a "wait_and_push" isntead of using the normal push is because I wanted to be able to make that thread block until some items had been processed to avoid eating up so much memory, and doing unnecessary disk I/O.

Can anyone point me to what is going wrong?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

仄言 2024-12-07 09:55:53

您忘记添加

the_condition_variable.wakeOne();

try_pop

如果有多个生产者/消费者访问您的队列,您应该为生产者和消费者设置单独的 QWaitCondition,否则wakeOne 可能无法唤醒正确的线程。

编辑:

如果有多个生产者/消费者,那么您应该有一个 notFullCondvar 和一个 notEmptyCondvar

  • try_pop 方法唤醒 notFullCondvar
  • wait_and_pop 方法等待 notEmptyCondvar,但唤醒 notFullCondvar
  • push 方法唤醒 notEmptyCondvar
  • wait_and_push 方法等待 notFullCondvar,但唤醒 notEmptyCondvar

我希望这是有道理的。

You forgot to add

the_condition_variable.wakeOne();

in try_pop.

If there will be multiple producers / consumers accessing your queue, you should have a separate QWaitCondition for producers and consumers, otherwise wakeOne might not wake the correct thread.

EDIT:

If there will be multiple producers / consumers, then you should have a notFullCondvar, and an notEmptyCondvar.

  • The try_pop method wakes the notFullCondvar.
  • The wait_and_pop method waits on the notEmptyCondvar, but wakes the notFullCondvar.
  • The push method wakes the notEmptyCondvar.
  • The wait_and_push method waits on the notFullCondvar, but wakes the notEmptyCondvar.

I hope this makes sense.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文