boost::asio multi-threaded server with asynchronous accept and blocking read/write

Posted on 2024-11-09 20:38:31

My idea was to create X threads, each running a KeepRunning method that loops forever calling _io_service.run(), and to hand tasks to _io_service with _io_service.post() from the async_accept handler whenever a new connection arrives.

I run the server with code like this:

    oh::msg::OHServer s("0.0.0.0", "9999", 200);
    ConsoleStopServer = boost::bind(&oh::msg::OHServer::Stop, &s);
    SetConsoleCtrlHandler(bConsoleHandler, TRUE);
    s.Run();

But when I receive a connection and serve it in the Post() method, using blocking reads/writes in the MsgWorker class, all the threads shut down.

My code is below (a mix of the Asio HTTP server3 example and my own):

OHServer::OHServer(const std::string& sAddress, const std::string& sPort, std::size_t tps)
: _nThreadPoolSize(tps), _acceptor(_io_service), _sockClient(new boost::asio::ip::tcp::socket(_io_service))
{
    // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
    boost::asio::ip::tcp::resolver resolver(_io_service);
    boost::asio::ip::tcp::resolver::query query(sAddress, sPort);
    boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
    _acceptor.open(endpoint.protocol());
    _acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
    _acceptor.bind(endpoint);
    _acceptor.listen();

    _acceptor.async_accept(
            *_sockClient,
            boost::bind(
                    &OHServer::AcceptConnection,
                    this,
                    boost::asio::placeholders::error
            )
    );
}


void OHServer::KeepRunning()
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
            << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    while( true )
    {
            try
            {
                    boost::system::error_code ec;
                    _io_service.run( ec );
                    if( ec )
                    {
                            global_stream_lock.lock();
                            std::cout << "[" << boost::this_thread::get_id()
                                    << "] Error: " << ec << std::endl;
                            global_stream_lock.unlock();
                    }
                    break;
            }
            catch( std::exception & ex )
            {
                    global_stream_lock.lock();
                    std::cout << "[" << boost::this_thread::get_id()
                            << "] Exception: " << ex.what() << std::endl;
                    global_stream_lock.unlock();
            }
    }

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
            << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

void OHServer::Run()
{
    // Create a pool of threads to run all of the io_services.

    for (std::size_t i = 0; i < _nThreadPoolSize; ++i)
    {
        boost::shared_ptr<boost::thread> thread(new boost::thread(
                boost::bind(&OHServer::KeepRunning, this)));
        threads.push_back(thread);
    }

    cout << "Hit enter to close server" << endl;
    cin.get();


}

void OHServer::Stop()
{
    boost::system::error_code ec;
    _acceptor.close(ec);

    _sockClient->shutdown( boost::asio::ip::tcp::socket::shutdown_both, ec );
    _sockClient->close( ec );

    _io_service.stop();

    // Wait for all threads in the pool to exit.
    for (std::size_t i = 0; i < threads.size(); ++i)
    {
        threads[i]->join();
        cout << "threads[ "<< i << "]->join();" << endl;
    }
}

void OHServer::Post()
{
    std::cout << "Accepted new connection." << std::endl;
    CMsgWorker *msgWorker = new CMsgWorker(_sockClient);
    msgWorker->Start();
    delete msgWorker;
}

void OHServer::AcceptConnection(const boost::system::error_code& e)
{
    if (!e)
    {

        _io_service.post(boost::bind(&OHServer::Post, this));

        _acceptor.async_accept(
                *_sockClient,
                boost::bind(
                        &OHServer::AcceptConnection,
                        this,
                        boost::asio::placeholders::error
                )
        );

    }
}

What should I do so that the threads keep waiting for work from _io_service?

Thanks for any help!

Comments (2)

百变从容 2024-11-16 20:38:31

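Try starting the threads like this, binding io_service::run directly (threads is presumably a boost::thread_group and io the io_service):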

  // Kick off 5 threads
  for (size_t i = 0; i < 5; ++i) {
    boost::thread* t = threads.create_thread(boost::bind(&boost::asio::io_service::run, &io));
    std::cout << "Creating thread " << i << " with id " << t->get_id() << std::endl;
  }

See the timer.cc example here for an idea on how to do this: https://github.com/sean-/Boost.Examples/tree/master/asio/timer

甚是思念 2024-11-16 20:38:31

In the end I came up with an easy-to-use version of the server:

Usage:

boost::shared_ptr<CTCPServer> _serverPtr;

void CMyServer::Start()
{
    //First we create a server object
    _serverPtr.reset( new CTCPServer(&_io_service, PORT_NUMBER) );

    //Then we queue the server start-up, so that io_service::run() has work to do
    //(RunServer takes a raw pointer, hence .get())
    _io_service.post(boost::bind(&CMyServer::RunServer, _serverPtr.get(), &CMyServer::HandleMessage));

    //And finally we create a few threads; if they started on an empty queue,
    //io_service::run() could return immediately
    for (int i = 0; i < COHConfig::_iThreads; ++i)
    {
        _threads.create_thread(bind(&io_service::run, &_io_service));
    }
}

//This is the function which is called by io_service to start our server
void CMyServer::RunServer(CTCPServer* s, void (*HandleFunction)(shared_ptr<ip::tcp::socket>, deadline_timer*))
{
    s->Run(HandleFunction);
}

//And this is our connection handler
void CMyServer::HandleMessage(shared_ptr< ip::tcp::socket > sockClient, deadline_timer* timer)
{
    cout << "Handling connection from: " << sockClient->remote_endpoint().address().to_string() << ":" << sockClient->remote_endpoint().port() << endl;


    //This is some class which gets socket in its constructor and handles the connection

    scoped_ptr<CMyWorker> myWorker( new CMyWorker(sockClient) );

    myWorker->Start();

}

//Thanks to this function we can stop our server
void CMyServer::Stop()
{
    _serverPtr->Stop();        
}

The TCPServer.hpp file:

#ifndef TCPSERVER_HPP
#define TCPSERVER_HPP

#if defined(_WIN32)
    #define BOOST_THREAD_USE_LIB
#endif

#include <boost/asio.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <string>
#include <vector>

class CTCPServer: private boost::noncopyable
{
private:
    bool bKeepRunning;

    boost::asio::io_service* _io_service;
    std::string _sPort;
    boost::asio::ip::tcp::acceptor _acceptor;
    boost::shared_ptr< boost::asio::ip::tcp::socket > _sockClient;
    boost::asio::deadline_timer _timer;
    bool _bIPv6;

    std::string SessionID();
public:

    CTCPServer(boost::asio::io_service* ios, const std::string& sPort, bool bIPv6=false):
        _sPort(sPort),
        _acceptor(*ios),
        _timer(*ios),
        _bIPv6(bIPv6)
    {
        _io_service = ios;
        bKeepRunning = false;
    };
    void Run(void (*HandleFunction)(boost::shared_ptr< boost::asio::ip::tcp::socket > sock, boost::asio::deadline_timer* timer));
    void AsyncAccept(void (*HandleFunction)(boost::shared_ptr< boost::asio::ip::tcp::socket > , boost::asio::deadline_timer* ));
    void AcceptHandler(const boost::system::error_code& e, void (*HandleFunction)(boost::shared_ptr< boost::asio::ip::tcp::socket >, boost::asio::deadline_timer* ));
    void Stop();
    void Stop(void (*StopFunction)());

};

#endif

The TCPServer.cpp file:

#include "TCPServer.hpp"
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <iostream>
#include <sstream>

using namespace std;

string CTCPServer::SessionID()
{
    ostringstream outs;
    outs << "[" << boost::this_thread::get_id() << "] ";
    return outs.str();
}

void CTCPServer::Run(void (*HandleFunction)(boost::shared_ptr< boost::asio::ip::tcp::socket > , boost::asio::deadline_timer* ))
{
    try
    {
        boost::asio::ip::tcp::resolver resolver(*_io_service);
        boost::asio::ip::tcp::endpoint endpoint;
        if(_bIPv6)
        {
            boost::asio::ip::tcp::resolver::query queryv6(boost::asio::ip::tcp::v6(), _sPort);
            endpoint = *resolver.resolve(queryv6);
        }
        else
        {
            boost::asio::ip::tcp::resolver::query queryv4(boost::asio::ip::tcp::v4(), _sPort);
            endpoint = *resolver.resolve(queryv4);
        }
        _acceptor.open(endpoint.protocol());
        _acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
        _acceptor.set_option(boost::asio::socket_base::enable_connection_aborted(true));
        _acceptor.bind(endpoint);
        _acceptor.listen();
        bKeepRunning = true;
        AsyncAccept(HandleFunction);
    }
    catch(std::exception& e)
    {
        if(!_bIPv6)
            std::cerr << "Exception while creating IPv4 TCP socket on port "<< _sPort<< ": " << e.what() << std::endl;
        else
            std::cerr << "Exception while creating IPv6 TCP socket on port "<< _sPort<< ": " << e.what() << std::endl;
    }
}

void CTCPServer::AsyncAccept(void (*HandleFunction)(boost::shared_ptr< boost::asio::ip::tcp::socket > , boost::asio::deadline_timer* ))
{
    if(bKeepRunning)
    {
        try
        {
            _sockClient.reset(new boost::asio::ip::tcp::socket(*_io_service));
            cout << SessionID() << "Waiting for connection on port: " << _sPort << endl;
            _acceptor.async_accept(*_sockClient, boost::bind(&CTCPServer::AcceptHandler, this, boost::asio::placeholders::error, HandleFunction));
        }
        catch(exception& e)
        {
            cout << SessionID() << "Error while accepting connection: " << e.what() << endl;
        }
    }
}

void CTCPServer::AcceptHandler(const boost::system::error_code& e,
                            void (*HandleFunction)(boost::shared_ptr< boost::asio::ip::tcp::socket >,
                                                   boost::asio::deadline_timer* ))
{
    if(!e)
    {
        try
        {
            (*_io_service).post(boost::bind(HandleFunction, _sockClient, &_timer));
            AsyncAccept(HandleFunction);
        }
        catch(exception& e)
        {
            cout << SessionID() << "Exception: " << e.what() << endl;
        }
    }
}

void CTCPServer::Stop()
{
    cout << SessionID() << "STOP port " << _sPort << endl;

    if(!bKeepRunning)
        return;

    bKeepRunning = false;

    try
    {
        _sockClient->close();
    }
    catch(exception& e)
    {
        cout << SessionID() << "Exception: " << e.what() << endl;
    }

    try
    {
        _acceptor.cancel();
    }
    catch(exception& e)
    {
        cout << SessionID() << "Exception: " << e.what() << endl;
    }

    try
    {
        _acceptor.close();
    }
    catch(exception& e)
    {
        cout << SessionID() << "Exception: " << e.what() << endl;
    }
}

void CTCPServer::Stop(void (*StopFunction)())
{
    Stop();
    StopFunction();
}

It is also easy to make IPv6 compatible (see the bIPv6 constructor flag).
It has been tested and works well for me, so feel free to copy and use it.
