boost条件变量问题

发布于 2024-11-19 06:21:22 字数 8199 浏览 3 评论 0原文

以下较大程序的最小代码示例将命令从客户端线程发送到 asio io_service 对象。 io_service 对象(在 Ios 类中)正在使用一个线程运行。发送命令后,客户端线程将等待,直到 Ios 对象(通过 Cmd::NotifyFinish())通知它已完成。

该示例在带有 boost 1.46 的 Linux Ubuntu 11.04 上似乎运行正常,但在 Windows 7 boost 1.46 上会触发断言失败。

我怀疑这与 Cmd::NotifyFinish() 中的锁定有关。当我把锁移出嵌套作用域,使 waitConditionVariable_.notify_one() 在锁的作用域内被调用时,程序在 Windows 7 上就不会崩溃。但是,boost::thread 文档指出 notify_one() 不需要在锁内调用。

堆栈跟踪(如下)显示它在调用 notify_one() 时触发断言。就好像 cmd 对象在调用 notify 之前就消失了......

我该如何使这段代码线程安全,不再触发断言?

#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/bind.hpp>
#include <iostream>

// Completion handle shared between one client thread and the service thread:
// the client blocks in Wait() until the service thread calls NotifyFinish().
//
// NOTE(review): as written, NotifyFinish() calls notify_one() after releasing
// waitMutex_. ThreadFn() destroys its Cmd as soon as Wait() returns, so the
// condition variable can be destroyed while notify_one() is still executing.
class Cmd
{
public:
    // Starts with sequence number 0, not finished, and no waiter registered.
    Cmd() :   cnt_(0), waitPred_(false), waiting_(false)
    {
    }
    virtual ~Cmd()
    {
    }
    // Stores the command sequence number CmdSeq in cnt_.
    void BindInfo(int CmdSeq)
    {
        cnt_ = CmdSeq;
    }
    // Marks the command as finished and, if a waiter has registered itself in
    // Wait(), wakes it through the condition variable.
    void NotifyFinish()
    {
        // called by the service thread...
        {
            boost::mutex::scoped_lock lock(waitMutex_);
            waitPred_ = true;
            if (!waiting_)
            {
                // no notification needed: nobody has entered Wait() yet
                return;
            }
        }
        // waitMutex_ is no longer held here. The waiter may already have seen
        // waitPred_ == true, returned from Wait() and destroyed this object,
        // which is the race discussed in the surrounding text.
        waitConditionVariable_.notify_one();
    }
    // Blocks the calling thread until NotifyFinish() has set waitPred_.
    void Wait()
    {
        // called by worker threads
        boost::mutex::scoped_lock lock(waitMutex_);
        waiting_ = true;
        // Loop on the predicate so wake-ups without waitPred_ set keep waiting.
        while (!waitPred_)
            waitConditionVariable_.wait(lock);
    }
    int cnt_;   // sequence number assigned through BindInfo()
private:

    boost::mutex waitMutex_;                           // guards waitPred_ and waiting_
    boost::condition_variable waitConditionVariable_;  // signalled by NotifyFinish()
    bool waitPred_;                                    // true once NotifyFinish() has run
    bool waiting_;                                     // true once a thread has entered Wait()
};


// Wraps an io_service whose run() loop is executed by the member thread_ and
// to which client threads post commands through RunCmd().
class Ios
{
public:
    // thread_ is the last member declared, so the thread running Start() is
    // only created after ios_, timer_ and cnt_ have been constructed.
    Ios() : timer_(ios_), cnt_(0), thread_(boost::bind(&Ios::Start, this))
    {
    }
    // Thread entry point: arms a 5-second timer, then runs the io_service.
    void Start()
    {
        timer_.expires_from_now(boost::posix_time::seconds(5));
        timer_.async_wait(boost::bind(&Ios::TimerHandler, this, _1));
        ios_.run();
    }
    // Queues RunCmdAsyn(C) on the io_service. C is bound by reference
    // (boost::ref), so the caller must keep it alive until the handler has
    // finished using it.
    void RunCmd(Cmd& C)
    {
        ios_.post(boost::bind(&Ios::RunCmdAsyn, this, boost::ref(C)));
    }

private:
    // Posted handler: gives C the next sequence number and signals completion.
    void RunCmdAsyn(Cmd& C)
    {
        C.BindInfo(cnt_++);
        C.NotifyFinish();
    }
    // Timer handler: without an error, prints the command count and re-arms
    // the 5-second timer; with an error, terminates the process.
    void TimerHandler(const boost::system::error_code& Ec)
    {
        if (!Ec)
        {
            std::cout << cnt_ << "\n";
            timer_.expires_from_now(boost::posix_time::seconds(5));
            timer_.async_wait(boost::bind(&Ios::TimerHandler, this, _1));
        }
        else
            exit(0);
    }

    boost::asio::io_service ios_;
    boost::asio::deadline_timer timer_;  // periodic 5-second reporting timer
    int cnt_;                            // number of commands handled so far
    boost::thread thread_;               // runs Start(); must stay declared last
};

static Ios ios;

// Worker-thread body: forever submits a fresh Cmd to the global ios object and
// blocks until it is reported finished.
void ThreadFn()
{
    while (1)
    {
        // c lives for one iteration only: it is destroyed as soon as Wait()
        // returns, while RunCmd() handed a reference to it to the service thread.
        Cmd c;
        ios.RunCmd(c);
        c.Wait();
        //std::cout << c.cnt_ << "\n";
    }
}

// Starts five worker threads running ThreadFn() and joins them.
int main()
{
    std::cout << "Starting\n";
    boost::thread_group threads;
    const int num = 5;

    for (int i = 0; i < num; i++)
    {
        // Worker threads
        threads.create_thread(ThreadFn);
    }
    // ThreadFn() loops forever, so this join is not expected to return.
    threads.join_all();

}

堆栈跟踪

msvcp100d.dll!std::_Debug_message(const wchar_t * message, const wchar_t * file, unsigned int line)  Line 15    C++
iosthread.exe!std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >::_Compat(const std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > & _Right)  Line 238 + 0x17 bytes   C++
iosthread.exe!std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >::operator==(const std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > & _Right)  Line 203 C++
iosthread.exe!std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >::operator!=(const std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > & _Right)  Line 208 + 0xc bytes C++
iosthread.exe!std::_Debug_range2<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > >(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, const wchar_t * _File, unsigned int _Line, std::random_access_iterator_tag __formal)  Line 715 + 0xc bytes  C++
iosthread.exe!std::_Debug_range<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > >(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, const wchar_t * _File, unsigned int _Line)  Line 728 + 0x6c bytes    C++
iosthread.exe!std::find_if<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >,bool (__cdecl*)(boost::intrusive_ptr<boost::detail::basic_cv_list_entry> const &)>(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, bool (const boost::intrusive_ptr<boost::detail::basic_cv_list_entry> &)* _Pred)  Line 92 + 0x54 bytes    C++
iosthread.exe!std::remove_if<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >,bool (__cdecl*)(boost::intrusive_ptr<boost::detail::basic_cv_list_entry> const &)>(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, bool (const boost::intrusive_ptr<boost::detail::basic_cv_list_entry> &)* _Pred)  Line 1848 + 0x58 bytes    C++
iosthread.exe!boost::detail::basic_condition_variable::notify_one()  Line 267 + 0xb4 bytes  C++
iosthread.exe!Cmd::NotifyFinish()  Line 41  C++

The following minimal code sample of a larger program sends commands from client threads to an asio io_service object. The io_service object (in the Ios class) is being run with one thread. When the command is sent the client thread waits until it is notified by the Ios object (via Cmd::NotifyFinish()) that it is completed.

This sample seems to run on Linux Ubuntu 11.04 with boost 1.46 fine but on Windows 7 boost 1.46 it asserts.

I suspect it is something to do with the lock in Cmd::NotifyFinish(). When I move the lock out of the nested scope so that when waitConditionVariable_.notify_one() is called in the lock's scope it doesn't crash on Windows 7. However, the boost::thread documentation states that notify_one() doesn't need to be called within the lock.

The stack trace (below) shows it is asserting when notify_one() is called. It is as though the cmd object has disappeared before notify is called...

How do I make this thread safe and not assert?

#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/bind.hpp>
#include <iostream>

// Completion handle shared between one client thread and the service thread:
// the client blocks in Wait() until the service thread calls NotifyFinish().
//
// NOTE(review): as written, NotifyFinish() calls notify_one() after releasing
// waitMutex_. ThreadFn() destroys its Cmd as soon as Wait() returns, so the
// condition variable can be destroyed while notify_one() is still executing.
class Cmd
{
public:
    // Starts with sequence number 0, not finished, and no waiter registered.
    Cmd() :   cnt_(0), waitPred_(false), waiting_(false)
    {
    }
    virtual ~Cmd()
    {
    }
    // Stores the command sequence number CmdSeq in cnt_.
    void BindInfo(int CmdSeq)
    {
        cnt_ = CmdSeq;
    }
    // Marks the command as finished and, if a waiter has registered itself in
    // Wait(), wakes it through the condition variable.
    void NotifyFinish()
    {
        // called by the service thread...
        {
            boost::mutex::scoped_lock lock(waitMutex_);
            waitPred_ = true;
            if (!waiting_)
            {
                // no notification needed: nobody has entered Wait() yet
                return;
            }
        }
        // waitMutex_ is no longer held here. The waiter may already have seen
        // waitPred_ == true, returned from Wait() and destroyed this object,
        // which is the race discussed in the surrounding text.
        waitConditionVariable_.notify_one();
    }
    // Blocks the calling thread until NotifyFinish() has set waitPred_.
    void Wait()
    {
        // called by worker threads
        boost::mutex::scoped_lock lock(waitMutex_);
        waiting_ = true;
        // Loop on the predicate so wake-ups without waitPred_ set keep waiting.
        while (!waitPred_)
            waitConditionVariable_.wait(lock);
    }
    int cnt_;   // sequence number assigned through BindInfo()
private:

    boost::mutex waitMutex_;                           // guards waitPred_ and waiting_
    boost::condition_variable waitConditionVariable_;  // signalled by NotifyFinish()
    bool waitPred_;                                    // true once NotifyFinish() has run
    bool waiting_;                                     // true once a thread has entered Wait()
};


// Wraps an io_service whose run() loop is executed by the member thread_ and
// to which client threads post commands through RunCmd().
class Ios
{
public:
    // thread_ is the last member declared, so the thread running Start() is
    // only created after ios_, timer_ and cnt_ have been constructed.
    Ios() : timer_(ios_), cnt_(0), thread_(boost::bind(&Ios::Start, this))
    {
    }
    // Thread entry point: arms a 5-second timer, then runs the io_service.
    void Start()
    {
        timer_.expires_from_now(boost::posix_time::seconds(5));
        timer_.async_wait(boost::bind(&Ios::TimerHandler, this, _1));
        ios_.run();
    }
    // Queues RunCmdAsyn(C) on the io_service. C is bound by reference
    // (boost::ref), so the caller must keep it alive until the handler has
    // finished using it.
    void RunCmd(Cmd& C)
    {
        ios_.post(boost::bind(&Ios::RunCmdAsyn, this, boost::ref(C)));
    }

private:
    // Posted handler: gives C the next sequence number and signals completion.
    void RunCmdAsyn(Cmd& C)
    {
        C.BindInfo(cnt_++);
        C.NotifyFinish();
    }
    // Timer handler: without an error, prints the command count and re-arms
    // the 5-second timer; with an error, terminates the process.
    void TimerHandler(const boost::system::error_code& Ec)
    {
        if (!Ec)
        {
            std::cout << cnt_ << "\n";
            timer_.expires_from_now(boost::posix_time::seconds(5));
            timer_.async_wait(boost::bind(&Ios::TimerHandler, this, _1));
        }
        else
            exit(0);
    }

    boost::asio::io_service ios_;
    boost::asio::deadline_timer timer_;  // periodic 5-second reporting timer
    int cnt_;                            // number of commands handled so far
    boost::thread thread_;               // runs Start(); must stay declared last
};

static Ios ios;

// Worker-thread body: forever submits a fresh Cmd to the global ios object and
// blocks until it is reported finished.
void ThreadFn()
{
    while (1)
    {
        // c lives for one iteration only: it is destroyed as soon as Wait()
        // returns, while RunCmd() handed a reference to it to the service thread.
        Cmd c;
        ios.RunCmd(c);
        c.Wait();
        //std::cout << c.cnt_ << "\n";
    }
}

// Starts five worker threads running ThreadFn() and joins them.
int main()
{
    std::cout << "Starting\n";
    boost::thread_group threads;
    const int num = 5;

    for (int i = 0; i < num; i++)
    {
        // Worker threads
        threads.create_thread(ThreadFn);
    }
    // ThreadFn() loops forever, so this join is not expected to return.
    threads.join_all();

}

stack trace

msvcp100d.dll!std::_Debug_message(const wchar_t * message, const wchar_t * file, unsigned int line)  Line 15    C++
iosthread.exe!std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >::_Compat(const std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > & _Right)  Line 238 + 0x17 bytes   C++
iosthread.exe!std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >::operator==(const std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > & _Right)  Line 203 C++
iosthread.exe!std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >::operator!=(const std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > & _Right)  Line 208 + 0xc bytes C++
iosthread.exe!std::_Debug_range2<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > >(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, const wchar_t * _File, unsigned int _Line, std::random_access_iterator_tag __formal)  Line 715 + 0xc bytes  C++
iosthread.exe!std::_Debug_range<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > >(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, const wchar_t * _File, unsigned int _Line)  Line 728 + 0x6c bytes    C++
iosthread.exe!std::find_if<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >,bool (__cdecl*)(boost::intrusive_ptr<boost::detail::basic_cv_list_entry> const &)>(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, bool (const boost::intrusive_ptr<boost::detail::basic_cv_list_entry> &)* _Pred)  Line 92 + 0x54 bytes    C++
iosthread.exe!std::remove_if<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >,bool (__cdecl*)(boost::intrusive_ptr<boost::detail::basic_cv_list_entry> const &)>(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, bool (const boost::intrusive_ptr<boost::detail::basic_cv_list_entry> &)* _Pred)  Line 1848 + 0x58 bytes    C++
iosthread.exe!boost::detail::basic_condition_variable::notify_one()  Line 267 + 0xb4 bytes  C++
iosthread.exe!Cmd::NotifyFinish()  Line 41  C++

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2)

如痴如狂 2024-11-26 06:21:22

问题在于条件变量是由客户端线程创建的 Cmd 对象的成员,并在等待完成时被该客户端线程销毁。

因此,您有一个竞争条件,其中:

  • “服务线程”调用 boost::condition_variable::notify_one();
  • 这会解除正在等待该条件变量的客户端线程的阻塞;
  • 随后客户端线程可能销毁该条件变量,而服务线程在 notify_one 调用中仍在使用它。

因此,我认为您观察到的“就好像 cmd 对象在调用 notify 之前就消失了”几乎正是实际发生的情况。只不过 Cmd 对象并不是在调用 notify_one() 之前消失的,而是在 notify_one() 执行期间消失的。您提到的另一点——“boost::thread 文档指出 notify_one() 不需要在锁内调用”——是正确的,但这并不意味着条件变量可以在 notify_one() 返回之前被销毁。

您需要管理 Cmd 对象的生命周期,确保服务线程在该对象被销毁之前已经用完它——在调用 notify_one() 时持有 Cmd 对象中的互斥锁是实现这一点的一种方法(正如您已经注意到的)。或者,您可以把条件变量从 Cmd 对象中移出,使其生命周期独立于 Cmd 对象(也许 shared_ptr<> 可以帮上忙)。

另外请注意,我认为 Cmd 类的 waiting_ 成员是多余的——即使条件变量上没有等待者,也可以调用 notify_one() 或 notify_all(),它内部已经替您做了这项检查(我不认为它会造成什么危害,只是 Cmd 类不需要这种额外的复杂性)。

The problem is that the condition variable is a member of the Cmd object that is created by the client thread and is destroyed by that client thread when the wait is completed.

So you have a race condition where:

  • boost::condition_variable::notify_one() is called on the 'service thread'
  • that unblocks the client thread that's waiting on that condition variable
  • the client thread can then destroy the condition variable that the service thread is still working with in its call to notify_one.

So your observation that it's "as though the cmd object has disappeared before notify is called" is pretty much exactly what's happened, I think. Except that the Cmd object didn't disappear before notify_one() is called, it disappeared while notify_one() was doing its work. Your other note that "the boost::thread documentation states that notify_one() doesn't need to be called within the lock" is true, but that doesn't mean that the condition variable can be destroyed before notify_one() has returned.

You need to manage the lifetime of the Cmd object so that the service thread is done using it before it gets destroyed - holding the mutex that's in the Cmd object while notify_one() is called is one way to do that (as you've noticed). Or you can pull the condition variable out of the Cmd object so that its lifetime is independent of the Cmd object (maybe shared_ptr<> can help with that).

Also, note that I believe that the waiting_ member of the Cmd class is superfluous - you can call notify_one() or notify_all() when there are no waiters on a condition variable - it's already doing the checking for that for you (I don't think it's hurting anything, just that it's complexity that doesn't need to be in the Cmd class).

死开点丶别碍眼 2024-11-26 06:21:22
// Original worker loop as posted in the question: a new Cmd is constructed and
// destroyed on every iteration.
void ThreadFn()
{
    while (1)
    {
        Cmd c;
        ios.RunCmd(c);
         c.Wait();
        //std::cout << c.cnt_ << "\n";
    }
}

既然这个循环是无限的,为什么不直接把 Cmd c; 放到 while(1) 的作用域之外,让每次迭代都重用同一个“c”?

// Proposed variant: one Cmd per worker thread, reused by every iteration, so
// the object is never destroyed while the loop runs.
//
// NOTE(review): Cmd as defined in this file sets waitPred_ to true in
// NotifyFinish() and never clears it. With a reused Cmd, Wait() stops blocking
// once the first command has completed; Cmd would need a way to reset its
// state before this variant waits for each command.
void ThreadFn()
{
    Cmd c;

    while (1)
    {   
        ios.RunCmd(c);
        c.Wait();
        //std::cout << c.cnt_ << "\n";
    }
}
// Original worker loop as posted in the question: a new Cmd is constructed and
// destroyed on every iteration.
void ThreadFn()
{
    while (1)
    {
        Cmd c;
        ios.RunCmd(c);
         c.Wait();
        //std::cout << c.cnt_ << "\n";
    }
}

Since this loop is infinite why not just put Cmd c; outside the scope of the while(1) so it reuses 'c' each iteration of while(1) ?

// Proposed variant: one Cmd per worker thread, reused by every iteration, so
// the object is never destroyed while the loop runs.
//
// NOTE(review): Cmd as defined in this file sets waitPred_ to true in
// NotifyFinish() and never clears it. With a reused Cmd, Wait() stops blocking
// once the first command has completed; Cmd would need a way to reset its
// state before this variant waits for each command.
void ThreadFn()
{
    Cmd c;

    while (1)
    {   
        ios.RunCmd(c);
        c.Wait();
        //std::cout << c.cnt_ << "\n";
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文