c++ 中的异步处理

发布于 2024-12-21 17:20:43 字数 186 浏览 0 评论 0原文

一个将永远运行并处理请求的服务器需要其中的异步代码部分,该部分将执行一些数据库查询并仅在有任何新更改时进行更新。服务器必须永远运行,并且这个用于一次又一次执行数据库函数的函数必须异步运行,这样就不会因为每“x”分钟更新一次而阻碍服务器。

在 C++ 中如何最好地异步处理这个问题?如何单独设置该函数在守护进程上运行,以便它根本不会阻塞服务器?

A Server which will run forever and processes requests needs an asynchronous part of code in it which will execute some database queries and update only when there are any new changes to it. The server has to run forever and this function for executing a db function again and again has to run asynchronously, so that there is not hindrance the server because of update once in every 'x' minutes.

How best this can be processed asynchronously in c++ ? How can I set that function alone to run on a daemon so that it doesn't block the server at all ?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

离去的眼神 2024-12-28 17:20:43

我强烈建议使用 Boost 的 ASIO 库

你需要一个类来接受新请求,另一个类来定期检查用于更新。两者都可以异步完成工作,并使用相同的 boost::asio::io_service 来安排工作。

设置将是

  • 一个网络异步 boost::asio::ip::tcp::acceptor 监听新请求。
  • boost::asio::deadline_time 执行异步等待并检查数据库更新。

据我所知,您所描述的伪代码如下:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <string>

class DatabaseUpdateChecker{
    public:
    DatabaseUpdateChecker(boost::asio::io_service& io, const int& sleepTimeSeconds)
    :timer_(io,boost::posix_time::seconds(sleepTimeSeconds)),sleepSeconds_(sleepTimeSeconds){
        this->timer_.async_wait(boost::bind(&DatabaseUpdateChecker::doDBUpdateCheck,this,boost::asio::placeholders::error));
    };

    protected:
    void doDBUpdateCheck(const boost::system::error_code& error){
        if(!error){
            std::cout << " Checking Database for updates" << std::endl;
            //Reschdule ourself
            this->timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(this->sleepSeconds_));
            this->timer_.async_wait(boost::bind(&DatabaseUpdateChecker::doDBUpdateCheck,this,boost::asio::placeholders::error));
        }
    };
    private:
    boost::asio::deadline_timer timer_;
    int sleepSeconds_;  
};

typedef boost::shared_ptr<boost::asio::ip::tcp::socket> TcpSocketPtr;

class NetworkRequest{
    public: 
    NetworkRequest(boost::asio::io_service& io, const int& port)
    :acceptor_(io,boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),port)){
        this->start_accept();   
    };  
    protected:
    void start_accept(){
        TcpSocketPtr socketPtr(new boost::asio::ip::tcp::socket(acceptor_.get_io_service()));
        std::cout << "About to accept new connection" << std::endl;
        acceptor_.async_accept(*socketPtr,boost::bind(&NetworkRequest::handle_accept,this,socketPtr,boost::asio::placeholders::error));
    };  
    void handle_accept(TcpSocketPtr socketPtr,const boost::system::error_code& error){
        std::cout << "Accepted new network connection" << std::endl;
        if(!error){
            std::string response("This is a response\n");
            boost::asio::async_write(*socketPtr,boost::asio::buffer(response),
                boost::bind(&NetworkRequest::handle_write,this,boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));
        }
        //Start listeing for a new connection
        this->start_accept();
    }   
    void handle_write(const boost::system::error_code& error,size_t size){
        if(!error){
            std::cout << "Wrote out " << size << " bytes to the network connection" << std::endl;
        }

    }   
    private:
    boost::asio::ip::tcp::acceptor acceptor_;
};

int main(int argc, char *argv[]) {
    static const int DB_TIMER_SECONDS=5;
    static const int LISTENING_TCP_PORT=4444;

    std::cout << "About to start" << std::endl;
    boost::asio::io_service io;

    DatabaseUpdateChecker dbChecker(io,DB_TIMER_SECONDS);
    NetworkRequest networkRequestAcceptor(io,LISTENING_TCP_PORT);

    io.run();

    std::cout << "This won't be printed" << std::endl;  
    return 0;
}

编译上述内容并运行它将显示数据库更新检查器将在侦听 TCP 端口 4444 上的连接时每 5 秒检查一次更新。要查看代码接受新连接,您可以使用 telnet/netcat/您最喜欢的网络客户端工具...

telnet 127.0.0.1 4444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
This is a response
Connection closed by foreign host.

如果您发现更新和/或请求的处理需要大量时间,那么我会考虑对您的应用程序进行线程化并在其自己的线程中运行每个任务。 io_service 将安排它必须完成的工作,直到没有更多工作时才完成。诀窍是让正在做作业的班级在完成后自行重新安排时间。

当然,您必须考虑其他人对您的问题的评论。我不知道 CORBA 接口如何使这个复杂化,但我认为 boost::asio 作为异步 C++ 库将是一个很好的决定,并且对于您所描述的内容来说足够灵活。

I'd strongly recommend using Boost's ASIO library

You'd need a class to accept new requests and another to periodically check for updates. Both could do their work asynchronously and use the same boost::asio::io_service to schedule work.

The setup would be

  • A network asynchronous boost::asio::ip::tcp::acceptor listening for new requests.
  • A boost::asio::deadline_time do an asynchronous wait do check for updates to the database.

Pseudo code for what I understand you are describing is below:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <string>

class DatabaseUpdateChecker{
    public:
    DatabaseUpdateChecker(boost::asio::io_service& io, const int& sleepTimeSeconds)
    :timer_(io,boost::posix_time::seconds(sleepTimeSeconds)),sleepSeconds_(sleepTimeSeconds){
        this->timer_.async_wait(boost::bind(&DatabaseUpdateChecker::doDBUpdateCheck,this,boost::asio::placeholders::error));
    };

    protected:
    void doDBUpdateCheck(const boost::system::error_code& error){
        if(!error){
            std::cout << " Checking Database for updates" << std::endl;
            //Reschdule ourself
            this->timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(this->sleepSeconds_));
            this->timer_.async_wait(boost::bind(&DatabaseUpdateChecker::doDBUpdateCheck,this,boost::asio::placeholders::error));
        }
    };
    private:
    boost::asio::deadline_timer timer_;
    int sleepSeconds_;  
};

typedef boost::shared_ptr<boost::asio::ip::tcp::socket> TcpSocketPtr;

class NetworkRequest{
    public: 
    NetworkRequest(boost::asio::io_service& io, const int& port)
    :acceptor_(io,boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),port)){
        this->start_accept();   
    };  
    protected:
    void start_accept(){
        TcpSocketPtr socketPtr(new boost::asio::ip::tcp::socket(acceptor_.get_io_service()));
        std::cout << "About to accept new connection" << std::endl;
        acceptor_.async_accept(*socketPtr,boost::bind(&NetworkRequest::handle_accept,this,socketPtr,boost::asio::placeholders::error));
    };  
    void handle_accept(TcpSocketPtr socketPtr,const boost::system::error_code& error){
        std::cout << "Accepted new network connection" << std::endl;
        if(!error){
            std::string response("This is a response\n");
            boost::asio::async_write(*socketPtr,boost::asio::buffer(response),
                boost::bind(&NetworkRequest::handle_write,this,boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));
        }
        //Start listeing for a new connection
        this->start_accept();
    }   
    void handle_write(const boost::system::error_code& error,size_t size){
        if(!error){
            std::cout << "Wrote out " << size << " bytes to the network connection" << std::endl;
        }

    }   
    private:
    boost::asio::ip::tcp::acceptor acceptor_;
};

int main(int argc, char *argv[]) {
    static const int DB_TIMER_SECONDS=5;
    static const int LISTENING_TCP_PORT=4444;

    std::cout << "About to start" << std::endl;
    boost::asio::io_service io;

    DatabaseUpdateChecker dbChecker(io,DB_TIMER_SECONDS);
    NetworkRequest networkRequestAcceptor(io,LISTENING_TCP_PORT);

    io.run();

    std::cout << "This won't be printed" << std::endl;  
    return 0;
}

Compiling the above and running it will show that the Database Update Checker will check for updates every 5 seconds while listening for connections on TCP port 4444. To see the code accept a new connection you can use telnet/netcat/your favorite network client tool....

telnet 127.0.0.1 4444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
This is a response
Connection closed by foreign host.

If you find that the processing of updates and/or requests takes a significant amount of time then I'd look into threading your application and running each task in it's own thread. io_service will schedule what work it has to do and not complete until there is no more work. The trick is to have the classes doing work reschedule themselves when they are done.

Of course you have to take into account the comments of others on your question. I dont' know how a CORBA interface might complicate this but I would think boost::asio as an asynchronous C++ library would be a good decision and flexible enough for what you describe.

回忆躺在深渊里 2024-12-28 17:20:43

听起来的意思是,当系统不断处理网络请求时,它会与数据库异步通信。

这意味着当它需要与数据库通信时,它会发送查询但不等待响应。

当它从数据库收到响应时,它会对其进行处理。

异步部分可以通过一个单独的线程与数据库对话来实现,当它得到响应时,它会在服务器队列中发布一个事件以进行处理。

或者服务器可能在许多套接字上侦听数据,其中之一可能是数据库连接,从数据库获取响应。

It sounds like what it means is that while the system processes network requests continuously it asynchronously communicates with the DB.

What it means is that when it needs to talk to DB it sends a query but does not wait for response.

When it gets response from DB it processes it.

The asynchronous part could be implemented by having a separate thread that talks to DB and when it gets response it posts an even in the server queue for processing.

Or the server could be listening on many sockets for data and one of them could be a database connection where it gets responses from DB.

独自唱情﹋歌 2024-12-28 17:20:43

所以基本上(如果我理解正确的话)你需要定期轮询数据库以查看它是否已更改。当它发生变化时,您需要通知基于 CORBA 的请求处理器发生了变化。

我要做的就是向 CORBA 服务器添加一个新的请求类型。该请求将是“数据库已更新。”。然后,您可以编写一个完全不同的小程序,其唯一的工作是轮询数据库并在数据库更新时发送 CORBA 请求。

这样,您可以将数据库更新消息折叠到 CORBA 服务器的主请求流中。

没有线程,没有异步任何东西。只有两个进程,每个进程都做自己的事情。

So basically (if I understand you correctly) you need to poll a database periodically to see if it's changed. And when it has changed, you need to notify a CORBA-based request processor that there has been a change.

What I would do is add a new request type to your CORBA server. The request would be "The database has updated.". Then you can write a small, completely different program who's only job is to poll the database and send the CORBA request when the database has been updated.

That way, you can fold the database update message into the main request stream for the CORBA server.

No threads, no asynchronous anything. Just two processes each doing their own thing.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文