boost 互斥体抛出(奇怪?)异常

发布于 2024-12-12 07:00:18 字数 6460 浏览 0 评论 0原文

我正在使用从该网站获得的阻塞队列示例,认为它非常好。 该阻塞队列使用 boost::mutex。 有时会抛出异常:

terminate called after throwing an instance of 'boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >'

what():错误的文件描述符

这是阻塞队列代码:

#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition_variable.hpp>
#include <exception>
#include <list>
#include <stdio.h>

struct BlockingQueueTerminate
  : std::exception
{};

namespace tools {
  template<class T>
  class BlockingQueue
  {
  private:
    boost::mutex mtx_;
    boost::condition_variable cnd_;
    std::list<T> q_;
    unsigned blocked_;
    bool stop_;

  public:
    BlockingQueue()
      : blocked_()
      , stop_()
    {}

    ~BlockingQueue()
    {
      this->stop(true);
    }

    void stop(bool wait)
    {
      // tell threads blocked on BlockingQueue::pull() to leave
      boost::mutex::scoped_lock lock(mtx_);
      stop_ = true;
      cnd_.notify_all();

      if(wait) // wait till all threads blocked on the queue leave BlockingQueue::pull()
    while(blocked_)
      cnd_.wait(lock);
    }

    void put(T t)
    {
      boost::mutex::scoped_lock lock(mtx_);  // The exception is thrown here !
      q_.push_back(t);
      cnd_.notify_one();
    }

  T pull()
    {
      boost::mutex::scoped_lock lock(mtx_);
      ++blocked_;
      while(!stop_ && q_.empty())
    cnd_.wait(lock);
      --blocked_;

      if(stop_) {
    cnd_.notify_all(); // tell stop() this thread has left
    throw BlockingQueueTerminate();
      }

      T front = q_.front();
      q_.pop_front();
      return front;
    }
  };
}

任何人都可以发现这里出了什么问题吗?因为我一整天都在尝试弄清楚这一点,但没有成功。我想我需要一个外部的眼睛才能看到它。 查找注释“//此处抛出异常!”看看问题到底出在哪里。

编辑1:

上下文:我正在使用这个阻塞队列来创建MySQL异步包装器。

这是我的 MySQL.hh

#ifndef MYSQL_HH_
# define MYSQL_HH_
# include <boost/asio.hpp>
# include <boost/thread.hpp>
# include <boost/function.hpp>
# include <mysql++/mysql++.h>
# include <queue>
# include "async_executor.hh"
# include "BlockingQueue.hh"

class t_mysql_event {
public:
  t_mysql_event(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb) :
    m_query(query), m_store_cb(cb), m_store_bool(true) {}

  t_mysql_event(std::string query, boost::function<void()> cb) :
    m_query(query), m_exec_cb(cb),  m_store_bool(false) {}

  bool is_store_query() {
    return m_store_bool;
  }

  std::string toString() {
    return m_query;
  }

  std::string                       m_query;
  boost::function<void(mysqlpp::StoreQueryResult)>  m_store_cb;
  boost::function<void()>               m_exec_cb;

private:
  bool                          m_store_bool;
};

namespace pools {
  class MySQL {
  public:
    ~MySQL() {}

    static MySQL* create_instance(boost::asio::io_service& io);

    static MySQL* get_instance();

    void exec(std::string query, boost::function<void()> cb);
    void store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb);

  private:
    MySQL(boost::asio::io_service& io) : executor(io, 100), parent_io(io), m_strand(io)
    {
      for (int i=0; i < 100; ++i) {
    boost::thread(boost::bind(&MySQL::retreive, this));
      }
    }

    void async_exec(std::string query, boost::function<void()> cb, mysqlpp::Connection& conn);
    void async_store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb, mysqlpp::Connection& conn);

    void retreive();

  private:
    tools::async_executor           executor;
    boost::asio::io_service&        parent_io;
    boost::asio::strand         m_strand;
    tools::BlockingQueue<t_mysql_event*>    m_events;
    std::queue<mysqlpp::Connection*>    m_stack;
  };
}

#endif //MYSQL_HH_

这是 MySQL.cc :

#include "MySQL.hh"

static pools::MySQL* _instance = 0;

namespace pools {


  MySQL* MySQL::create_instance(boost::asio::io_service& io) {
    if (!_instance)
      _instance = new MySQL(io);
    return _instance;
  }

  MySQL* MySQL::get_instance() {
    if (!_instance) {
      exit(1);
    }
    return _instance;
  }

  void MySQL::exec(std::string query, boost::function<void()> cb) {
    m_events.put(new t_mysql_event(query, cb));
  }

  void MySQL::store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb) {
    m_events.put(new t_mysql_event(query, cb));
  }

  void MySQL::retreive() {
    mysqlpp::Connection conn("***", "***", "***", "***");
    for(;;) {
      t_mysql_event *event = m_events.pull();
      if (event->is_store_query())
    async_store(event->m_query, event->m_store_cb, conn);
      else
    async_exec(event->m_query, event->m_exec_cb, conn);
      delete event;
    }
  }

  void MySQL::async_exec(std::string query, boost::function<void()> cb, mysqlpp::Connection& conn) {
    mysqlpp::Query db_q = conn.query(query.c_str());
    db_q.exec();
    parent_io.post(cb);
  }

  void MySQL::async_store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb, mysqlpp::Connection& conn) {
    mysqlpp::Query db_q = conn.query(query.c_str());
    mysqlpp::StoreQueryResult res = db_q.store();
    parent_io.post(boost::bind(cb, res));
   }
}

之后:

class MyClass {
public:
   MyClass() : _mysql(pools::MySQL::get_instance()) {}

   startQueries();
private:
   void Query1() {
      std::stringstream query("");
      query << "INSERT INTO Table1 ***";
      _mysql->exec(query.str(),
                   boost::bind(&MyClass::Query2, this, _1));
   }
   void Query2() {
      std::stringstream query("");
      query << "INSERT INTO Table2 ***";
      _mysql->exec(query.str(),
                   boost::bind(&MyClass::Query3, this, _1));
   }
   void Query3() {
      std::stringstream query("");
      query << "INSERT INTO Table3 ***";
      _mysql->exec(query.str(),
                   boost::bind(&MyClass::done, this, _1));
   }
   void done() {}
   pools::MySQL *_mysql;
};

希望能够回答一些有关更多信息的请求...

有趣的事情:

如果我用 pools::MySQL::get_instance() 替换每个 _mysql,我不会似乎崩溃了。 但我怀疑下面有一个更重要的错误......

I am using a blocking queue example I got from this website, thinking it was pretty nice.
This blocking queue is using boost::mutex.
It is sometime throwing an exception :

terminate called after throwing an instance of 'boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >'

what(): Bad file descriptor

Here's the Blocking Queue code :

#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition_variable.hpp>
#include <exception>
#include <list>
#include <stdio.h>

struct BlockingQueueTerminate
  : std::exception
{};

namespace tools {
  template<class T>
  class BlockingQueue
  {
  private:
    boost::mutex mtx_;
    boost::condition_variable cnd_;
    std::list<T> q_;
    unsigned blocked_;
    bool stop_;

  public:
    BlockingQueue()
      : blocked_()
      , stop_()
    {}

    ~BlockingQueue()
    {
      this->stop(true);
    }

    void stop(bool wait)
    {
      // tell threads blocked on BlockingQueue::pull() to leave
      boost::mutex::scoped_lock lock(mtx_);
      stop_ = true;
      cnd_.notify_all();

      if(wait) // wait till all threads blocked on the queue leave BlockingQueue::pull()
    while(blocked_)
      cnd_.wait(lock);
    }

    void put(T t)
    {
      boost::mutex::scoped_lock lock(mtx_);  // The exception is thrown here !
      q_.push_back(t);
      cnd_.notify_one();
    }

  T pull()
    {
      boost::mutex::scoped_lock lock(mtx_);
      ++blocked_;
      while(!stop_ && q_.empty())
    cnd_.wait(lock);
      --blocked_;

      if(stop_) {
    cnd_.notify_all(); // tell stop() this thread has left
    throw BlockingQueueTerminate();
      }

      T front = q_.front();
      q_.pop_front();
      return front;
    }
  };
}

Anyone can spot what's going wrong here ? because I have tried the all day figuring it out in vain. I guess I need a outside eye to see it.
Look for the comment '//The exception is thrown here !' to see where exactly the problem occurs.

EDIT 1 :

The context : I'm using this blocking queue in order to create a MySQL async wrapper.

Here's my MySQL.hh

#ifndef MYSQL_HH_
# define MYSQL_HH_
# include <boost/asio.hpp>
# include <boost/thread.hpp>
# include <boost/function.hpp>
# include <mysql++/mysql++.h>
# include <queue>
# include "async_executor.hh"
# include "BlockingQueue.hh"

class t_mysql_event {
public:
  t_mysql_event(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb) :
    m_query(query), m_store_cb(cb), m_store_bool(true) {}

  t_mysql_event(std::string query, boost::function<void()> cb) :
    m_query(query), m_exec_cb(cb),  m_store_bool(false) {}

  bool is_store_query() {
    return m_store_bool;
  }

  std::string toString() {
    return m_query;
  }

  std::string                       m_query;
  boost::function<void(mysqlpp::StoreQueryResult)>  m_store_cb;
  boost::function<void()>               m_exec_cb;

private:
  bool                          m_store_bool;
};

namespace pools {
  class MySQL {
  public:
    ~MySQL() {}

    static MySQL* create_instance(boost::asio::io_service& io);

    static MySQL* get_instance();

    void exec(std::string query, boost::function<void()> cb);
    void store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb);

  private:
    MySQL(boost::asio::io_service& io) : executor(io, 100), parent_io(io), m_strand(io)
    {
      for (int i=0; i < 100; ++i) {
    boost::thread(boost::bind(&MySQL::retreive, this));
      }
    }

    void async_exec(std::string query, boost::function<void()> cb, mysqlpp::Connection& conn);
    void async_store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb, mysqlpp::Connection& conn);

    void retreive();

  private:
    tools::async_executor           executor;
    boost::asio::io_service&        parent_io;
    boost::asio::strand         m_strand;
    tools::BlockingQueue<t_mysql_event*>    m_events;
    std::queue<mysqlpp::Connection*>    m_stack;
  };
}

#endif //MYSQL_HH_

Here's the MySQL.cc :

#include "MySQL.hh"

static pools::MySQL* _instance = 0;

namespace pools {


  MySQL* MySQL::create_instance(boost::asio::io_service& io) {
    if (!_instance)
      _instance = new MySQL(io);
    return _instance;
  }

  MySQL* MySQL::get_instance() {
    if (!_instance) {
      exit(1);
    }
    return _instance;
  }

  void MySQL::exec(std::string query, boost::function<void()> cb) {
    m_events.put(new t_mysql_event(query, cb));
  }

  void MySQL::store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb) {
    m_events.put(new t_mysql_event(query, cb));
  }

  void MySQL::retreive() {
    mysqlpp::Connection conn("***", "***", "***", "***");
    for(;;) {
      t_mysql_event *event = m_events.pull();
      if (event->is_store_query())
    async_store(event->m_query, event->m_store_cb, conn);
      else
    async_exec(event->m_query, event->m_exec_cb, conn);
      delete event;
    }
  }

  void MySQL::async_exec(std::string query, boost::function<void()> cb, mysqlpp::Connection& conn) {
    mysqlpp::Query db_q = conn.query(query.c_str());
    db_q.exec();
    parent_io.post(cb);
  }

  void MySQL::async_store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb, mysqlpp::Connection& conn) {
    mysqlpp::Query db_q = conn.query(query.c_str());
    mysqlpp::StoreQueryResult res = db_q.store();
    parent_io.post(boost::bind(cb, res));
   }
}

Afterwards :

class MyClass {
public:
   MyClass() : _mysql(pools::MySQL::get_instance()) {}

   startQueries();
private:
   void Query1() {
      std::stringstream query("");
      query << "INSERT INTO Table1 ***";
      _mysql->exec(query.str(),
                   boost::bind(&MyClass::Query2, this, _1));
   }
   void Query2() {
      std::stringstream query("");
      query << "INSERT INTO Table2 ***";
      _mysql->exec(query.str(),
                   boost::bind(&MyClass::Query3, this, _1));
   }
   void Query3() {
      std::stringstream query("");
      query << "INSERT INTO Table3 ***";
      _mysql->exec(query.str(),
                   boost::bind(&MyClass::done, this, _1));
   }
   void done() {}
   pools::MySQL *_mysql;
};

Hoping that will answer to some request for more informations...

Funny thing :

If I replace every _mysql by pools::MySQL::get_instance() I does not seems to crash.
But I suspect there is an error far more important below that...

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

演出会有结束 2024-12-19 07:00:18

如果队列已被销毁但您尝试调用其 put 方法,则可能会引发此异常。通过在队列析构函数中放置断点(或打印语句)来检查这一点。

this exception can be thrown if queue is already destroyed but you try to call its put method. Check this by putting a breakpoint (or print statement) in queue destructor.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文