服务器在发送大块数据时被中断而崩溃

发布于 2025-01-18 20:23:30 字数 9305 浏览 0 评论 0 原文

当我优雅地关闭与之连接的客户端时,我的服务器会崩溃,而客户端正在接收大量数据。我想到的是生命周期的错误,就像Boost Asio中最多的错误一样,但是我无法指出自己的错误。

每个客户端都建立了与服务器的2个连接,其中一个是用于同步,另一个连接是长期使用的连接,以接收连续的更新。在“同步阶段”中,客户端接收大量数据以与服务器状态同步(“状态”基本上是DB数据的JSON格式)。同步后,同步连接关闭。客户会通过其他连接接收到DB的更新(当然,这些数据与“同步数据”相比非常小)。

这些是相关文件:

connection.h

#pragma once

#include <array>
#include <memory>
#include <string>
#include <boost/asio.hpp>

class ConnectionManager;

/// Represents a single connection from a client.
///
/// Lifetime is managed via shared_ptr (enable_shared_from_this): every
/// pending async operation captures a shared_ptr, so the object outlives
/// its handlers. The ConnectionManager holds the long-term reference.
class Connection : public std::enable_shared_from_this<Connection>
{
public:
  Connection(const Connection&) = delete;
  Connection& operator=(const Connection&) = delete;

  /// Construct a connection with the given socket.
  explicit Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager);

  /// Start the first asynchronous operation for the connection.
  void start();

  /// Stop all asynchronous operations associated with the connection.
  /// NOTE(review): closes the socket directly; only safe when invoked on
  /// the thread running io_service::run().
  void stop();

  /// Perform an asynchronous write operation.
  /// NOTE(review): there is no outgoing queue; a second call while a
  /// previous async_write is in flight overwrites outgoing_buffer_ while
  /// asio still reads from it — undefined behavior (see connection.cpp).
  void do_write(const std::string& buffer);

  // NOTE(review): exposing the native handle invites code outside asio to
  // touch the socket — confirm no caller performs raw I/O with it.
  int getNativeHandle();

  ~Connection();

private:
  /// Perform an asynchronous read operation.
  void do_read();

  /// Socket for the connection.
  boost::asio::ip::tcp::socket socket_;

  /// The manager for this connection.
  ConnectionManager& connection_manager_;

  /// Buffer for incoming data.
  std::array<char, 8192> buffer_;

  /// Holds the data of the write currently in flight; must stay alive and
  /// unmodified until the matching async_write completes.
  std::string outgoing_buffer_;
};

typedef std::shared_ptr<Connection> connection_ptr;

connection.cpp

#include "connection.h"

#include <utility>
#include <vector>
#include <iostream>
#include <thread>

#include "connection_manager.h"

/// Takes ownership of an accepted socket; the manager reference is stored
/// so completion handlers can unregister this connection on error.
Connection::Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager)
    : socket_(std::move(socket))
    , connection_manager_(manager)
{
}

/// Kick off the read loop. Called once, right after the manager registers
/// the connection (still on the io_service thread).
void Connection::start()
{
  do_read();
}

void Connection::stop()
{
  socket_.close();
}

// NOTE(review): an empty user-provided destructor is a pessimization; the
// defaulted one would suffice (socket_ closes itself via RAII).
Connection::~Connection()
{
}

/// Read loop: waits for data, answers a "sync" request with the full DB
/// snapshot, then re-arms itself. On any read error the connection is
/// unregistered and stopped.
void Connection::do_read()
{
  // Capture a shared_ptr so *this outlives the pending operation.
  auto self(shared_from_this());
  socket_.async_read_some(boost::asio::buffer(buffer_), [this, self](boost::system::error_code ec, std::size_t bytes_transferred) {
        if (!ec) {
            std::string buff_str = std::string(buffer_.data(), bytes_transferred);
            const auto& tokenized_buffer = split(buff_str, ' ');
            
            if(!tokenized_buffer.empty() && tokenized_buffer[0] == "sync") {
                /// "syncing connection" sends a specific text
                /// hence I can separate between sycing and long-lived connections here and act accordingly.
                // NOTE(review): async_read_some can deliver partial data, so
                // "sync" could arrive split across two reads — TODO confirm
                // the client protocol guarantees it fits one datagram/read.

                const auto& exec_json_strs = getExecutionJsons();
                const auto& order_json_strs = getOrdersAsJsons();
                const auto& position_json_strs = getPositionsAsJsons();
                const auto& all_json_strs = exec_json_strs + order_json_strs + position_json_strs + createSyncDoneJson();
                
                /// this is potentially a very large data.
                do_write(all_json_strs);
            }

            // Re-arm the read. A later message would start another write
            // while the sync write may still be in flight — see do_write's
            // note about the missing outgoing queue.
            do_read();
        } else {
          // Peer closed / operation canceled: unregister and stop.
          connection_manager_.stop(shared_from_this());
        }
      });
}

void Connection::do_write(const std::string& write_buffer)
{
  outgoing_buffer_ = write_buffer;

  auto self(shared_from_this());
  boost::asio::async_write(socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()), [this, self](boost::system::error_code ec, std::size_t transfer_size) {
        if (!ec) {
           /// everything is fine.
        } else {
           /// what to do here?
           /// server crashes once I get error code 32 (EPIPE) here.
        }
      });
}

connection_manager.h

#pragma once

#include <set>
#include "connection.h"

/// Manages open connections so that they may be cleanly stopped when the server
/// needs to shut down.
///
/// NOTE(review): not thread-safe — all members must be called from the
/// io_service thread. Holding owning shared_ptrs means a connection is
/// never destroyed until explicitly erased; weak_ptr would let sessions
/// die naturally when their I/O ends.
class ConnectionManager
{
public:
  ConnectionManager(const ConnectionManager&) = delete;
  ConnectionManager& operator=(const ConnectionManager&) = delete;

  /// Construct a connection manager.
  ConnectionManager();

  /// Add the specified connection to the manager and start it.
  void start(connection_ptr c);

  /// Stop the specified connection.
  void stop(connection_ptr c);

  /// Stop all connections.
  void stop_all();

  /// Write the buffer to every registered connection (update broadcast).
  void sendAllConnections(const std::string& buffer);

private:
  /// The managed connections.
  std::set<connection_ptr> connections_;
};

connection_manager.cpp

#include "connection_manager.h"

// NOTE(review): an empty user-provided constructor is a pessimization;
// `ConnectionManager() = default;` in the header would suffice.
ConnectionManager::ConnectionManager()
{
}

/// Register the connection (keeping it alive) and begin its read loop.
/// Must run on the io_service thread.
void ConnectionManager::start(connection_ptr c)
{
  connections_.insert(c);
  c->start();
}

/// Unregister then stop the connection. `c` (plus any handler captures)
/// keeps the object alive through the close; destruction follows once the
/// last shared_ptr drops.
void ConnectionManager::stop(connection_ptr c)
{
    connections_.erase(c);
    c->stop();
}

/// Close every connection and drop all owning references; used at server
/// shutdown (signal handler).
void ConnectionManager::stop_all()
{
  for (auto c: connections_)
    c->stop();

  connections_.clear();
}

/// this function is used to keep clients up to date with the changes, not used during syncing phase.
// NOTE(review): iterates connections_ without synchronization. If callers
// reach this from threads other than the io_service thread (as the edit to
// the question says Server::deliver does), this is a data race — the call
// must be posted to the io_service.
void ConnectionManager::sendAllConnections(const std::string& buffer)
{
  for (auto c: connections_)
      c->do_write(buffer);
}

server.h

#pragma once

#include <boost/asio.hpp>
#include <string>
#include "connection.h"
#include "connection_manager.h"

/// Accepts TCP connections and owns the single io_service that drives all
/// connection I/O. run() blocks the calling (main) thread.
class Server
{
public:
  Server(const Server&) = delete;
  Server& operator=(const Server&) = delete;

  /// Construct the server to listen on the specified TCP address and port, and
  /// serve up files from the given directory.
  explicit Server(const std::string& address, const std::string& port);

  /// Run the server's io_service loop.
  void run();

  /// Broadcast an update to all clients.
  /// NOTE(review): called from producer threads while run() executes on the
  /// main thread — must marshal onto the io_service to be safe.
  void deliver(const std::string& buffer);

private:
  /// Perform an asynchronous accept operation.
  void do_accept();

  /// Wait for a request to stop the server.
  void do_await_stop();

  /// The io_service used to perform asynchronous operations.
  boost::asio::io_service io_service_;

  /// The signal_set is used to register for process termination notifications.
  boost::asio::signal_set signals_;

  /// Acceptor used to listen for incoming connections.
  boost::asio::ip::tcp::acceptor acceptor_;

  /// The connection manager which owns all live connections.
  ConnectionManager connection_manager_;

  /// The *NEXT* socket to be accepted.
  boost::asio::ip::tcp::socket socket_;
};

server.cpp,

#include "server.h"
#include <signal.h>
#include <utility>

/// Resolve the endpoint, set up signal handling, open/bind/listen the
/// acceptor, and queue the first accept. Nothing runs until run() is called.
Server::Server(const std::string& address, const std::string& port)
    : io_service_()
    , signals_(io_service_)
    , acceptor_(io_service_)
    , connection_manager_()
    , socket_(io_service_)
{
  // Register to handle the signals that indicate when the server should exit.
  // It is safe to register for the same signal multiple times in a program,
  // provided all registration for the specified signal is made through Asio.
  signals_.add(SIGINT);
  signals_.add(SIGTERM);
#if defined(SIGQUIT)
  signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)

  do_await_stop();

  // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
  // NOTE(review): resolve() returns a range; *begin is taken — assumes the
  // first resolved entry is the desired protocol/family.
  boost::asio::ip::tcp::resolver resolver(io_service_);
  boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve({address, port});
  acceptor_.open(endpoint.protocol());
  acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
  acceptor_.bind(endpoint);
  acceptor_.listen();

  do_accept();
}

void Server::run()
{
  // The io_service::run() call will block until all asynchronous operations
  // have finished. While the server is running, there is always at least one
  // asynchronous operation outstanding: the asynchronous accept call waiting
  // for new incoming connections.
  io_service_.run();
}

/// Accept loop: on success, hand the (moved-from, then re-usable) socket_
/// to a new Connection and re-arm the accept.
void Server::do_accept()
{
  acceptor_.async_accept(socket_,
      [this](boost::system::error_code ec)
      {
        // Check whether the server was stopped by a signal before this
        // completion handler had a chance to run.
        // NOTE(review): closing the acceptor also completes this handler
        // with error::operation_aborted; checking ec would be the more
        // idiomatic guard.
        if (!acceptor_.is_open())
        {
          return;
        }

        if (!ec)
        {
          connection_manager_.start(std::make_shared<Connection>(
              std::move(socket_), connection_manager_));
        }

        do_accept();
      });
}

/// On SIGINT/SIGTERM/SIGQUIT: close the acceptor and stop every connection;
/// once no async work remains, io_service::run() returns.
void Server::do_await_stop()
{
  signals_.async_wait(
      [this](boost::system::error_code /*ec*/, int /*signo*/)
      {
        // The server is stopped by cancelling all outstanding asynchronous
        // operations. Once all operations have finished the io_service::run()
        // call will exit.
        // NOTE(review): nothing here actually calls cancel(); the close()
        // calls inside stop_all()/acceptor_.close() do the cancelling.
        acceptor_.close();
        connection_manager_.stop_all();
      });
}

/// this function is used to keep clients up to date with the changes, not used during syncing phase.
void Server::deliver(const std::string& buffer)
{
    connection_manager_.sendAllConnections(buffer);
}   

所以我正在重复我的问题:当我优雅地关闭已连接到它的客户端时,我的服务器会崩溃正在收到大量数据,我不知道为什么。

编辑:一旦收到 EPIPE 错误,崩溃就发生在 async_write 函数中。该应用程序是多线程的:有 4 个线程会在数据产生时调用 Server::deliver。deliver() 只用于让客户端保持最新状态,与初始同步无关:同步使用的是从数据库取得的持久化数据。

我有一个io_service,所以我认为我不需要链。 io_service :: Run在主线程上调用,因此主线程正在阻止。

My server crashes when I gracefully close a client that is connected to it, while the client is receiving a large chunk of data. I am thinking of a possible lifetime bug as with the most bugs in boost ASIO, however I was not able to point out my mistake myself.

Each client establishes 2 connection with the server, one of them is for syncing, the other connection is long-lived one to receive continuous updates. In the "syncing phase" client receives large data to sync with the server state ("state" is basically DB data in JSON format). After syncing, sync connection is closed. Client receives updates to the DB as it happens (these are of course very small data compared to "syncing data") via the other connection.

These are the relevant files:

connection.h

#pragma once

#include <array>
#include <memory>
#include <string>
#include <boost/asio.hpp>

class ConnectionManager;

/// Represents a single connection from a client.
///
/// Lifetime is managed via shared_ptr (enable_shared_from_this): every
/// pending async operation captures a shared_ptr, so the object outlives
/// its handlers. The ConnectionManager holds the long-term reference.
class Connection : public std::enable_shared_from_this<Connection>
{
public:
  Connection(const Connection&) = delete;
  Connection& operator=(const Connection&) = delete;

  /// Construct a connection with the given socket.
  explicit Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager);

  /// Start the first asynchronous operation for the connection.
  void start();

  /// Stop all asynchronous operations associated with the connection.
  /// NOTE(review): closes the socket directly; only safe when invoked on
  /// the thread running io_service::run().
  void stop();

  /// Perform an asynchronous write operation.
  /// NOTE(review): there is no outgoing queue; a second call while a
  /// previous async_write is in flight overwrites outgoing_buffer_ while
  /// asio still reads from it — undefined behavior (see connection.cpp).
  void do_write(const std::string& buffer);

  // NOTE(review): exposing the native handle invites code outside asio to
  // touch the socket — confirm no caller performs raw I/O with it.
  int getNativeHandle();

  ~Connection();

private:
  /// Perform an asynchronous read operation.
  void do_read();

  /// Socket for the connection.
  boost::asio::ip::tcp::socket socket_;

  /// The manager for this connection.
  ConnectionManager& connection_manager_;

  /// Buffer for incoming data.
  std::array<char, 8192> buffer_;

  /// Holds the data of the write currently in flight; must stay alive and
  /// unmodified until the matching async_write completes.
  std::string outgoing_buffer_;
};

typedef std::shared_ptr<Connection> connection_ptr;

connection.cpp

#include "connection.h"

#include <utility>
#include <vector>
#include <iostream>
#include <thread>

#include "connection_manager.h"

/// Takes ownership of an accepted socket; the manager reference is stored
/// so completion handlers can unregister this connection on error.
Connection::Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager)
    : socket_(std::move(socket))
    , connection_manager_(manager)
{
}

/// Kick off the read loop. Called once, right after the manager registers
/// the connection (still on the io_service thread).
void Connection::start()
{
  do_read();
}

void Connection::stop()
{
  socket_.close();
}

// NOTE(review): an empty user-provided destructor is a pessimization; the
// defaulted one would suffice (socket_ closes itself via RAII).
Connection::~Connection()
{
}

/// Read loop: waits for data, answers a "sync" request with the full DB
/// snapshot, then re-arms itself. On any read error the connection is
/// unregistered and stopped.
void Connection::do_read()
{
  // Capture a shared_ptr so *this outlives the pending operation.
  auto self(shared_from_this());
  socket_.async_read_some(boost::asio::buffer(buffer_), [this, self](boost::system::error_code ec, std::size_t bytes_transferred) {
        if (!ec) {
            std::string buff_str = std::string(buffer_.data(), bytes_transferred);
            const auto& tokenized_buffer = split(buff_str, ' ');
            
            if(!tokenized_buffer.empty() && tokenized_buffer[0] == "sync") {
                /// "syncing connection" sends a specific text
                /// hence I can separate between sycing and long-lived connections here and act accordingly.
                // NOTE(review): async_read_some can deliver partial data, so
                // "sync" could arrive split across two reads — TODO confirm
                // the client protocol guarantees it fits one read.

                const auto& exec_json_strs = getExecutionJsons();
                const auto& order_json_strs = getOrdersAsJsons();
                const auto& position_json_strs = getPositionsAsJsons();
                const auto& all_json_strs = exec_json_strs + order_json_strs + position_json_strs + createSyncDoneJson();
                
                /// this is potentially a very large data.
                do_write(all_json_strs);
            }

            // Re-arm the read. A later message would start another write
            // while the sync write may still be in flight — see do_write's
            // note about the missing outgoing queue.
            do_read();
        } else {
          // Peer closed / operation canceled: unregister and stop.
          connection_manager_.stop(shared_from_this());
        }
      });
}

void Connection::do_write(const std::string& write_buffer)
{
  outgoing_buffer_ = write_buffer;

  auto self(shared_from_this());
  boost::asio::async_write(socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()), [this, self](boost::system::error_code ec, std::size_t transfer_size) {
        if (!ec) {
           /// everything is fine.
        } else {
           /// what to do here?
           /// server crashes once I get error code 32 (EPIPE) here.
        }
      });
}

connection_manager.h

#pragma once

#include <set>
#include "connection.h"

/// Manages open connections so that they may be cleanly stopped when the server
/// needs to shut down.
///
/// NOTE(review): not thread-safe — all members must be called from the
/// io_service thread. Holding owning shared_ptrs means a connection is
/// never destroyed until explicitly erased; weak_ptr would let sessions
/// die naturally when their I/O ends.
class ConnectionManager
{
public:
  ConnectionManager(const ConnectionManager&) = delete;
  ConnectionManager& operator=(const ConnectionManager&) = delete;

  /// Construct a connection manager.
  ConnectionManager();

  /// Add the specified connection to the manager and start it.
  void start(connection_ptr c);

  /// Stop the specified connection.
  void stop(connection_ptr c);

  /// Stop all connections.
  void stop_all();

  /// Write the buffer to every registered connection (update broadcast).
  void sendAllConnections(const std::string& buffer);

private:
  /// The managed connections.
  std::set<connection_ptr> connections_;
};

connection_manager.cpp

#include "connection_manager.h"

// NOTE(review): an empty user-provided constructor is a pessimization;
// `ConnectionManager() = default;` in the header would suffice.
ConnectionManager::ConnectionManager()
{
}

/// Register the connection (keeping it alive) and begin its read loop.
/// Must run on the io_service thread.
void ConnectionManager::start(connection_ptr c)
{
  connections_.insert(c);
  c->start();
}

/// Unregister then stop the connection. `c` (plus any handler captures)
/// keeps the object alive through the close; destruction follows once the
/// last shared_ptr drops.
void ConnectionManager::stop(connection_ptr c)
{
    connections_.erase(c);
    c->stop();
}

/// Close every connection and drop all owning references; used at server
/// shutdown (signal handler).
void ConnectionManager::stop_all()
{
  for (auto c: connections_)
    c->stop();

  connections_.clear();
}

/// this function is used to keep clients up to date with the changes, not used during syncing phase.
// NOTE(review): iterates connections_ without synchronization. If callers
// reach this from threads other than the io_service thread (as the edit to
// the question says Server::deliver does), this is a data race — the call
// must be posted to the io_service.
void ConnectionManager::sendAllConnections(const std::string& buffer)
{
  for (auto c: connections_)
      c->do_write(buffer);
}

server.h

#pragma once

#include <boost/asio.hpp>
#include <string>
#include "connection.h"
#include "connection_manager.h"

/// Accepts TCP connections and owns the single io_service that drives all
/// connection I/O. run() blocks the calling (main) thread.
class Server
{
public:
  Server(const Server&) = delete;
  Server& operator=(const Server&) = delete;

  /// Construct the server to listen on the specified TCP address and port, and
  /// serve up files from the given directory.
  explicit Server(const std::string& address, const std::string& port);

  /// Run the server's io_service loop.
  void run();

  /// Broadcast an update to all clients.
  /// NOTE(review): called from producer threads while run() executes on the
  /// main thread — must marshal onto the io_service to be safe.
  void deliver(const std::string& buffer);

private:
  /// Perform an asynchronous accept operation.
  void do_accept();

  /// Wait for a request to stop the server.
  void do_await_stop();

  /// The io_service used to perform asynchronous operations.
  boost::asio::io_service io_service_;

  /// The signal_set is used to register for process termination notifications.
  boost::asio::signal_set signals_;

  /// Acceptor used to listen for incoming connections.
  boost::asio::ip::tcp::acceptor acceptor_;

  /// The connection manager which owns all live connections.
  ConnectionManager connection_manager_;

  /// The *NEXT* socket to be accepted.
  boost::asio::ip::tcp::socket socket_;
};

server.cpp

#include "server.h"
#include <signal.h>
#include <utility>

/// Resolve the endpoint, set up signal handling, open/bind/listen the
/// acceptor, and queue the first accept. Nothing runs until run() is called.
Server::Server(const std::string& address, const std::string& port)
    : io_service_()
    , signals_(io_service_)
    , acceptor_(io_service_)
    , connection_manager_()
    , socket_(io_service_)
{
  // Register to handle the signals that indicate when the server should exit.
  // It is safe to register for the same signal multiple times in a program,
  // provided all registration for the specified signal is made through Asio.
  signals_.add(SIGINT);
  signals_.add(SIGTERM);
#if defined(SIGQUIT)
  signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)

  do_await_stop();

  // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
  // NOTE(review): resolve() returns a range; *begin is taken — assumes the
  // first resolved entry is the desired protocol/family.
  boost::asio::ip::tcp::resolver resolver(io_service_);
  boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve({address, port});
  acceptor_.open(endpoint.protocol());
  acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
  acceptor_.bind(endpoint);
  acceptor_.listen();

  do_accept();
}

void Server::run()
{
  // The io_service::run() call will block until all asynchronous operations
  // have finished. While the server is running, there is always at least one
  // asynchronous operation outstanding: the asynchronous accept call waiting
  // for new incoming connections.
  io_service_.run();
}

/// Accept loop: on success, hand the (moved-from, then re-usable) socket_
/// to a new Connection and re-arm the accept.
void Server::do_accept()
{
  acceptor_.async_accept(socket_,
      [this](boost::system::error_code ec)
      {
        // Check whether the server was stopped by a signal before this
        // completion handler had a chance to run.
        // NOTE(review): closing the acceptor also completes this handler
        // with error::operation_aborted; checking ec would be the more
        // idiomatic guard.
        if (!acceptor_.is_open())
        {
          return;
        }

        if (!ec)
        {
          connection_manager_.start(std::make_shared<Connection>(
              std::move(socket_), connection_manager_));
        }

        do_accept();
      });
}

/// On SIGINT/SIGTERM/SIGQUIT: close the acceptor and stop every connection;
/// once no async work remains, io_service::run() returns.
void Server::do_await_stop()
{
  signals_.async_wait(
      [this](boost::system::error_code /*ec*/, int /*signo*/)
      {
        // The server is stopped by cancelling all outstanding asynchronous
        // operations. Once all operations have finished the io_service::run()
        // call will exit.
        // NOTE(review): nothing here actually calls cancel(); the close()
        // calls inside stop_all()/acceptor_.close() do the cancelling.
        acceptor_.close();
        connection_manager_.stop_all();
      });
}
}

/// this function is used to keep clients up to date with the changes, not used during syncing phase.
void Server::deliver(const std::string& buffer)
{
    connection_manager_.sendAllConnections(buffer);
}   

So, I am repeating my question: My server crashes when I gracefully close a client that is connected to it, while the client is receiving a large chunk of data and I do not know why.

Edit: Crash happens in async_write function, as soon as I receive EPIPE error. The application is multithreaded. There are 4 threads that call Server::deliver with their own data as it is produced. deliver() is used for keeping clients up to date, it has nothing to do with the initial syncing: syncing is done with persistent data fetched from db.

I had a single io_service, so I thought that I would not need strands. io_service::run is called on main thread, so the main thread is blocking.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

汹涌人海 2025-01-25 20:23:30

审查,添加一些丢失的代码位:

namespace /*missing code stubs*/ {
    // File-local helpers the question referenced but did not include.
    // split(): tokenize `input` on `delim` using Boost.StringAlgo; the
    // returned string_views alias `input`, so the caller's string must
    // outlive the result.
    auto split(std::string_view input, char delim) {
        std::vector<std::string_view> result;
        boost::algorithm::split(result, input,
                                boost::algorithm::is_from_range(delim, delim));
        return result;
    }

    // Placeholder data sources; the real server builds these from the DB.
    std::string getExecutionJsons()   { return ""; }
    std::string getOrdersAsJsons()    { return ""; }
    std::string getPositionsAsJsons() { return ""; }
    std::string createSyncDoneJson()  { return ""; }
}

现在我注意到的内容是:

  1. 您有一个 io_service ,所以一个线程。好的,除非您在其他代码中有线程( main ,例如?)。

  2. 怀疑线程在起作用的一个特殊原因是,没有人可以调用 server :: velrive ,因为 run()正在阻止。这意味着每当您致电 deliver()现在会导致数据竞赛,这会导致未定义的行为

    休闲评论

      ///此功能用于使客户端保持最新的更改,
     ///在同步阶段未使用。
     

    对消除这种担忧并没有太大的作用。该法规需要防御滥用。评论不会被执行。使它变得更好:

  void Server::deliver(const std::string& buffer) {
      post(io_context_,
           [this, buffer] { connection_manager_.broadcast(std::move(buffer)); });
  }
     
  3. 您在接受“新”元素之前就没有检查以前的写作已完成。这意味着调用连接:: do_write

    修复的典型方法是将传出消息队列的队列。

  4. ​全部或错误。

    而不是动态缓冲区(例如

    • 直接阅读 std :: String ,因此您不必将缓冲区复制到字符串
    • 阅读到 streambuf ,以便您可以使用 std :: istream(&amp; sbuf _)将其解析而不是tokenizing
  5. concateNating all_json_strs 显然 /em>拥有文本容器是浪费的。

    更好的是,请考虑一种流媒体方法来进行JSON序列化,因此并非所有JSON都需要在任何给定时间内序列化。


  6. 不要声明空驱动器(〜连接)。他们是pessimization

  7. 同样是Pessimization

    ConnectionManager )。如果必须,请考虑

      ConnectionManager :: ConnectionManager()=默认值;
     
  8. getNativeHandle 给我更多有关其他代码的问题,可能会干扰。例如,它可能指示其他执行操作的库,这再次可能导致重叠的读取/写入,或者可能是更多居住在线程上的代码的标志(如 server :: run() )


连接管理器可能应该保存 weak_ptr,这样连接才有机会最终被销毁。现在,最后一个引用按定义保存在连接管理器中,这意味着当对端断开连接、或会话因其他任何原因失败时,连接对象永远不会被销毁。

  10. 这不是惯用的:

      //在此之前检查服务器是否被信号停止
    //完成处理程序有机会跑步。
    if(!cactoror_.is_open()){
        返回;
    }
     

    如果您关闭了受体,则使用 error :: Operation_aborted 无论如何都会调用完成处理程序。只需处理这一点,例如,在最终版本中,我将稍后发布:

      //每个连接的单独链 - 以防万一您添加线程
    accceptor_.async_accept(
        make_strand(io_context_),[this](error_code ec,tcp :: socket Sock Sock){
            如果(!ec){
                connection_manager_.register_and_start(
                    std :: make_shared&lt;连接&gt;(std :: move(sock),
                                                 Connection_Manager_));
                do_accept();
            }
        });
     
  11. 我注意到此评论:

      //通过取消所有未出色的异步来停止服务器
    // 运营。一旦所有操作都完成了io_service :: run()
    //通话将退出。
     

    实际上,您从不 cancel()您代码中任何IO对象上的任何操作。同样,评论未执行。最好像您所说的那样做,并让毁灭者关闭资源。当使用对象时,这会防止伪造错误,并且还可以防止非常烦人的比赛条件,例如您关闭手柄时,其他一些线程在同一档案中重新打开了一个新的流,并且您将手柄放到了第三个方面party(使用 getNativeHandle )...您看到这是何处?

再现问题?

在以这种方式进行了审查之后,我试图重复此问题,因此我创建了假数据:

    // Deterministic filler payloads used to reproduce the report: combined,
    // the four parts form the 47104-byte "sync" reply.
    std::string getExecutionJsons()
    {
        return std::string(1024, 'E'); // executions: 1 KiB of 'E'
    }
    std::string getOrdersAsJsons()
    {
        return std::string(13312, 'O'); // orders: 13 KiB of 'O'
    }
    std::string getPositionsAsJsons()
    {
        return std::string(8192, 'P'); // positions: 8 KiB of 'P'
    }
    std::string createSyncDoneJson()
    {
        return std::string(24576, 'C'); // sync-done marker: 24 KiB of 'C'
    }

对连接类进行了一些小调整:

    std::string buff_str =
        std::string(buffer_.data(), bytes_transferred);
    const auto& tokenized_buffer = split(buff_str, ' ');

    if (!tokenized_buffer.empty() &&
        tokenized_buffer[0] == "sync") {
        std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
        /// "syncing connection" sends a specific text
        /// hence I can separate between sycing and long-lived
        /// connections here and act accordingly.

        const auto& exec_json_strs     = getExecutionJsons();
        const auto& order_json_strs    = getOrdersAsJsons();
        const auto& position_json_strs = getPositionsAsJsons();
        const auto& all_json_strs      = exec_json_strs +
            order_json_strs + position_json_strs +
            createSyncDoneJson();

        std::cerr << "All json length: " << all_json_strs.length() << std::endl;
        /// this is potentially a very large data.
        do_write(all_json_strs); // already on strand!
    }

我们可以通过NetCat:Good来获得服务器输出

sync detected on 127.0.0.1:43012
All json length: 47104
sync detected on 127.0.0.1:43044
All json length: 47104

和客户端

$ netcat localhost 8989 <<< 'sync me' > expected
^C
$ wc -c expected 
47104 expected

。现在,让我们过早断开连接:

netcat localhost 8989 -w0 <<< 'sync me' > truncated
$ wc -c truncated 
0 truncated

因此,它确实会导致早期关闭,但是服务器仍然说

sync detected on 127.0.0.1:44176
All json length: 47104

Let's Instrument do_write

    async_write( //
        socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()),
        [/*this,*/ self](error_code ec, size_t transfer_size) {
            std::cerr << "do_write completion: " << transfer_size << " bytes ("
                      << ec.message() << ")" << std::endl;

            if (!ec) {
                /// everything is fine.
            } else {
                /// what to do here?
                // FIXME: probably cancel the read loop so the connection
                // closes?
            }
        });

现在我们看到:

sync detected on 127.0.0.1:44494
All json length: 47104
do_write completion: 47104 bytes (Success)
sync detected on 127.0.0.1:44512
All json length: 47104
do_write completion: 32768 bytes (Operation canceled)

一个断开连接和一个“好”连接。

没有崩溃/未定义行为的迹象。让我们检查一下 -fsanitize =地址,未定义的:清洁记录,甚至添加心跳:

// Demo driver: runs the server on the main thread while a second thread
// exercises Server::deliver every 5 s from outside the io loop — the exact
// cross-thread pattern the question describes. NOTE(review): relies on the
// answer's modified API where deliver() returns bool (false once stopped).
int main() {
    Server s("127.0.0.1", "8989");

    std::thread yolo([&s] {
        using namespace std::literals;
        int i = 1;

        do {
            std::this_thread::sleep_for(5s);
        } while (s.deliver("HEARTBEAT DEMO " + std::to_string(i++)));
    });

    s.run();

    yolo.join();
}

结论

上面突出显示的唯一问题是:

  • 其他线程问题未显示(也许可以通过<<代码> getNativeHandle )

  • 您可以在连接 do_write 中重叠的事实。修复:

      void Connection::write(std::string msg) { // public, might not be on the strand
          post(socket_.get_executor(),
               [self = shared_from_this(), msg = std::move(msg)]() mutable {
                   self->do_write(std::move(msg));
               });
      }
     
      void Connection::do_write(std::string msg) { // assumed on the strand
          outgoing_.push_back(std::move(msg));
     
          if (outgoing_.size() == 1)
              do_write_loop();
      }
     
      void Connection::do_write_loop() {
          if (outgoing_.size() == 0)
              return;
     
          auto self(shared_from_this());
          async_write( //
              socket_, boost::asio::buffer(outgoing_.front()),
              [this, self](error_code ec, size_t transfer_size) {
                  std::cerr << "Write completion: " << transfer_size << " bytes ("
                            << ec.message() << ")" << std::endl;
     
                  if (!ec) {
                      outgoing_.pop_front();
                      do_write_loop();
                  } else {
                      socket_.cancel();
     
                      // 理想情况下这就足以释放连接,但由于
                      // `ConnectionManager` 没有使用 `weak_ptr`,需要
                      // 显式调用来强制释放:
                      connection_manager_.stop(self);
                  }
              });
      }
     
     

如您所见,我还拆分/ do_write 以防止链外调用。与 stop 相同。

完整列表

完整列表,其中包括上面的所有备注/修复:

  • file connection.h

      #pragma一次
    
     #include&lt; boost/asio.hpp&gt;
    
     #include&lt; array&gt;
     #include&lt; deque&gt;
     #include&lt;内存&gt;
     #include&lt; string&gt;
     使用boost :: asio :: ip :: tcp;
    
     类ConnectionManager;
    
     ///表示客户端的单个连接。
     类连接:public std :: enable_shared_from_this&lt;连接&gt; {
       民众:
         连接(const connection&amp;)= delete;
         连接&amp;运算符=(const connection&amp;)= delete;
    
         ///与给定套接字构建连接。
         显式连接(TCP ::套接字插座,ConnectionManager&amp; Manager);
    
         void start();
         void stop();
         void write(std :: string msg);
    
       私人的:
         void do_stop();
         void do_write(std :: string msg);
         void do_write_loop();
    
         ///执行异步读取操作。
         void do_read();
    
         ///连接的套接字。
         tcp ::插座套件_;
    
         ///此连接的经理。
         ConnectionManager&amp; Connection_Manager_;
    
         ///用于传入数据的缓冲区。
         std :: array&lt; char,8192&gt;缓冲_;
    
         std :: Deque&lt; std :: string&gt;传出_;
     };
    
     使用Connection_ptr = std :: shared_ptr&lt; connection&gt;;
     
  • 文件 Connection_Manager.h

      #pragma一次
    
     #include&lt; list&gt;
     #include“ Connection.h”
    
     ///管理打开的连接,以便在服务器时可以干净地停止
     ///需要关闭。
     class ConnectionManager {
       民众:
         ConnectionManager(Const ConnectManager&amp;)= delete;
         ConnectionManager&amp; operator =(const ConnectManager&amp;)= delete;
         ConnectionManager()=默认值; //如果您愿意,可以在H/CPP上分开
    
         void register_and_start(connection_ptr c);
         void stop(connection_ptr c);
         void stop_all();
    
         void广播(const std :: string&amp; buffer);
    
         //清除连接,返回剩余的活动连接
         size_t garbage_collect();
    
       私人的:
         使用hander = std :: feek_ptr&lt; connection_ptr :: element_type&gt ;;
         std :: list&lt; handle&gt;连接_;
     };
     
  • 文件 server.h

      #pragma一次
    
     #include&lt; boost/asio.hpp&gt;
     #include&lt; string&gt;
     #include“ Connection.h”
     #include“ connection_manager.h”
    
     类服务器{
       民众:
         服务器(const服务器&amp;)= delete;
         服务器&amp;操作员=(const server&amp;)= delete;
    
         ///构造服务器以在指定的TCP地址和端口上侦听
         ///并从给定目录中提供文件。
         显式服务器(const std :: string&amp;地址,const std :: string&amp; port);
    
         ///运行服务器的io_service循环。
         void run();
    
         Bool deliver(const std :: string&amp; buffer);
    
       私人的:
         void do_accept();
         void do_await_signal();
    
         boost :: asio :: io_context io_context_;
         boost :: asio :: any_io_executor strand_ {io_context_.get_executor()}};
         boost :: asio :: signal_set信号_ {strand_};
         tcp :: acceptor confactor_ {strand_};
         ConnectionManager Connection_Manager_;
     };
     
  • 文件 connection.cpp

      #include“ connection.h”
    
     #include&lt; boost/algorithm/string.hpp&gt;
     #include&lt; iostream&gt;
     #include&lt; thread&gt;
     #include&lt;实用程序&gt;
     #include&lt; vector&gt;
    
     #include“ connection_manager.h”
     使用boost :: system :: error_code;
    
     连接::连接(TCP ::套接字插座,ConnectionManager&amp; Manager)
         :socket_(std ::移动(套接字))
         ,Connection_Manager_(Manager){}
    
     void connection :: start(){//始终在链上假设(因为连接
                                //刚刚构造)
         do_read();
     }
    
     void connection :: stop(){//公共,可能不在链上
         post(socket_.get_executor(),
              [self = shared_from_this()]()utable {
                  self&gt; do_stop();
              });
     }
    
     void connection :: do_stop(){//在链上假定
         socket_.cancel(); //信任共享的指针到破坏
     }
    
     命名空间 /*缺少代码存根* / {
         自动拆分(std :: string_view Input,char delim){
             std :: vector&lt; std :: string_view&gt;结果;
             boost ::算法:: split(结果,输入,
                                     boost :: algorithm :: is_from_range(Delim,delim));
             返回结果;
         }
    
         std :: String getExecutionjsons(){返回std :: string(1024,'e'); }
         std :: string getordersasjsons(){return std :: string(13312,'o'); }
         std :: string getPositionSasjSons(){return std :: string(8192,'p'); }
         std :: string createSyncdonejson(){return std :: string(24576,'c'); }
     } //名称空间
    
     void Connection :: do_read(){
         auto self(shared_from_this());
         socket_.async_read_some(
             boost :: asio :: buffer(buffer_),
             [this,self](error_code ec,size_t bytes_transferred){
                 如果(!ec){
                     STD :: String Buff_str =
                         std :: string(buffer_.data(),bytes_transferred);
                     const auto&amp; tokenized_buffer = split(buff_str,'');
    
                     if(!tokenized_buffer.empty()&amp;&amp;
                         tokenized_buffer [0] ==“同步”){
                         std :: cerr&lt;&lt; “同步检测到”&lt;&lt; socket_.remote_endpoint()&lt;&lt; std :: endl;
                         ///“同步连接”发送特定文本
                         ///因此,我可以在Memcing和长寿之间分开
                         ///在这里连接并采取相应的行动。
    
                         const auto&amp; exec_json_strs = getExecutionjsons();
                         const auto&amp; order_json_strs = getordersasjsons();
                         const auto&amp; position_json_strs = getPositionSasjsons();
                         const auto&amp; all_json_strs = exec_json_strs +
                             order_json_strs + position_json_strs +
                             createsyncdonejson();
    
                         std :: cerr&lt;&lt; “所有json长度:”&lt;&lt; all_json_str.length()&lt;&lt; std :: endl;
                         ///这可能是一个非常大的数据。
                         do_write(all_json_strs); //已经在链上了!
                     }
    
                     do_read();
                 } 别的 {
                     std :: cerr&lt;&lt; “ do_read终止:”&lt;&lt; ec.message()&lt;&lt; std :: endl;
                     Connection_Manager_.stop(shared_from_this());
                 }
             });
     }
    
     void connection :: write(std :: string msg){//公共,可能不在链上
         post(socket_.get_executor(),
              [self = shored_from_this(),msg = std :: move(msg)]()utable {
                  self&gt; do_write(std :: move(msg));
              });
     }
    
     void connection :: do_write(std :: string msg){//在链上假设
         out out out__push_back(std :: move(msg));
    
         if(out out out_.size()== 1)
             do_write_loop();
     }
    
     void Connection :: do_write_loop(){
         if(out out out_.size()== 0)
             返回;
    
         auto self(shared_from_this());
         async_write(//
             socket_,boost :: asio :: buffer(out out__.front()),
             [this,self](error_code ec,size_t trasse_size){
                 std :: cerr&lt;&lt; “写完成:”&lt;&lt; Transfer_size&lt;&lt; “字节(”
                           &lt;&lt; ec.message()&lt;&lt; ”)&lt;&lt; std :: endl;
    
                 如果(!ec){
                     out out out_.pop_front();
                     do_write_loop();
                 } 别的 {
                     socket_.cancel();
    
                     //理想情况下,这足以释放连接,但是
                     //由于`connectionManager`不使用feek_ptr`需要
                     //使用一种“脐带回流”强迫问题:
                     Connection_Manager_.stop(self);
                 }
             });
     }
     
  • File connection_manager.cpp

      #include "connection_manager.h"
    
     // Track the connection (as a weak handle) and begin its read loop.
     void ConnectionManager::register_and_start(connection_ptr c) {
         connections_.emplace_back(c);
         c->start();
     }
    
     // Stop a single connection; removal from the list happens lazily via
     // the expired weak_ptr (see garbage_collect).
     void ConnectionManager::stop(connection_ptr c) {
         c->stop();
     }
    
     void ConnectionManager :: stop_all(){
         for(auto H:Connections _)
             if(auto c = h.lock())
                 c-&gt; stop();
     }
    
     ///此功能用于使客户保持最新的更改,而不是使用
     ///在同步阶段。
     void ConnectionManager ::广播(const std :: string&amp; buffer){
         for(auto H:Connections _)
             if(auto c = h.lock())
                 c-&gt; write(buffer);
     }
    
     size_t connection manager :: garbage_collect(){
         connections_.remove_if(std :: mem_fn(&amp; handle :: expeired));
         返回Connections_.size();
     }
     
  • File server.cpp

      #include "server.h"
     #include <signal.h>
     #include <utility>

     using boost::system::error_code;
    
     服务器:: server(const std :: String&amp;地址,const std :: string&amp; port)
         :io_context_(1)//线程提示:单线螺纹
         ,connection_manager_()
     {
         //注册以处理指示服务器何时应退出的信号。
         //可以安全地在程序中多次注册同一信号,
         //提供了指定信号的所有注册,是通过ASIO进行的。
         SIGNALS_.ADD(SIGINT);
         SIGNALS_.ADD(sigterm);
     #如果定义(sigquit)
         signals_.add(sigquit);
     #endif //定义(sigquit)
    
         do_await_signal();
    
         //打开接受者以重复使用地址的选项(即so_reuseaddr)。
         TCP :: Resolver Resolver(io_context_);
         tcp :: endpoint endpoint = *resolver.resolve({address,port});
         concector_.open(endpoint.protocol());
         concector_.set_option(tcp :: confactor :: reuse_address(true));
         concector_.bind(端点);
         acccipor_.listen();
    
         do_accept();
     }
    
     void Server::run() {
         // The io_service::run() call will block until all asynchronous operations
         // have finished. While the server is running, there is always at least one
         // asynchronous operation outstanding: the asynchronous accept call waiting
         // for new incoming connections.
         io_context_.run();
     }
    
     void Server :: do_accept(){
         //每个连接的单独链 - 以防万一您添加线程
         accceptor_.async_accept(
             make_strand(io_context_),[this](error_code ec,tcp :: socket Sock Sock){
                 如果(!ec){
                     connection_manager_.register_and_start(
                         std :: make_shared&lt;连接&gt;(std :: move(sock),
                                                      Connection_Manager_));
                     do_accept();
                 }
             });
     }
    
     void Server :: do_await_signal(){
         SIGNALS_.ASYNC_WAIT([THIS](error_code /*ec* /,int /*signo* /){
             // strand _上的处理程序,因为executor在SIGNALS上
    
             //通过取消所有未出色的异步来停止服务器
             // 运营。一旦所有操作都完成了io_service :: run()
             //通话将退出。
             concector_.cancel();
             Connection_Manager_.stop_all();
         });
     }
    
     Bool Server :: velrive(const std :: string&amp; buffer){
         如果(io_context_.stopped()){
             返回false;
         }
         帖子(io_context_,
              [this,buffer] {connection_manager_.broadcast(std :: move(buffer)); });
         返回true;
     }
     
  • File test.cpp

      #include "server.h"
    
     int main(){
         服务器S(“ 127.0.0.1”,“ 8989”);
    
         std ::线程yolo([&amp; s] {
             使用名称空间std ::文字;
             int i = 1;
    
             做 {
                 std :: this_thread :: sleep_for(5s);
             } while(s.deliver(“ heartbeat demo”+std :: to_string(i ++)));
         });
    
         s.run();
    
         yolo.join();
     }
     

Reviewing, adding some missing code bits:

namespace /*missing code stubs*/ {
    auto split(std::string_view input, char delim) {
        std::vector<std::string_view> result;
        boost::algorithm::split(result, input,
                                boost::algorithm::is_from_range(delim, delim));
        return result;
    }

    std::string getExecutionJsons()   { return ""; }
    std::string getOrdersAsJsons()    { return ""; }
    std::string getPositionsAsJsons() { return ""; }
    std::string createSyncDoneJson()  { return ""; }
}

Now the things I notice are:

  1. you have a single io_service, so a single thread. Okay, so no strands should be required unless you have threads in your other code (main, e.g.?).

  2. A particular reason to suspect that threads are at play is that nobody could possibly call Server::deliver because run() is blocking. This means that whenever you call deliver() now it causes a data race, which leads to Undefined Behaviour

    The casual comment

     /// this function is used to keep clients up to date with the changes,
     /// not used during syncing phase.
    

    does not do much to remove this concern. The code needs to defend against misuse. Comments do not get executed. Make it better:

     void Server::deliver(const std::string& buffer) {
         post(io_context_,
              [this, buffer] { connection_manager_.broadcast(std::move(buffer)); });
     }
    
  3. you do not check that previous writes are completed before accepting a "new" one. This means that calling Connection::do_write results in Undefined Behaviour for two reasons:

    • modifying outgoing_buffer_ during an ongoing async operation that uses that buffer is UB

    • having two overlapped async_write on the same IO object is UB (see docs

    The typical way to fix that is to have a queue of outgoing messages instead.

  4. using async_read_some is rarely what you want, especially since the reads don't accumulate into a dynamic buffer. This means that if your packets get separated at unexpected boundaries, you may not detect commands at all, or incorrectly.

    Instead consider asio::async_read_until with a dynamic buffer (e.g.

    • read directly into std::string so you don't have to copy the buffer into a string
    • read into streambuf so you can use std::istream(&sbuf_) to parse instead of tokenizing
  5. Concatenating all_json_strs which clearly have to be owning text containers is wasteful. Instead, use a const-buffer-sequence to combine them all without copying.

    Better yet, consider a streaming approach to JSON serialization so not all the JSON needs to be serialized in memory at any given time.

  6. Don't declare empty destructors (~Connection). They're pessimizations

  7. Likewise for empty constructors (ConnectionManager). If you must, consider

    ConnectionManager::ConnectionManager() = default;
    
  8. The getNativeHandle gives me more questions about other code that may interfere. E.g. it may indicate other libraries doing operations, which again can lead to overlapped reads/writes, or it could be a sign of more code living on threads (as Server::run() is by definition blocking)

  9. Connection manager should probably hold weak_ptr, so Connections could eventually terminate. Now, the last reference is by defintion held in the connection manager, meaning nothing ever gets destructed when the peer disconnects or the session fails for some other reason.

  10. This is not idiomatic:

    // Check whether the server was stopped by a signal before this
    // completion handler had a chance to run.
    if (!acceptor_.is_open()) {
        return;
    }
    

    If you closed the acceptor, the completion handler is called with error::operation_aborted anyways. Simply handle that, e.g. in the final version I'll post later:

    // separate strand for each connection - just in case you ever add threads
    acceptor_.async_accept(
        make_strand(io_context_), [this](error_code ec, tcp::socket sock) {
            if (!ec) {
                connection_manager_.register_and_start(
                    std::make_shared<Connection>(std::move(sock),
                                                 connection_manager_));
                do_accept();
            }
        });
    
  11. I notice this comment:

    // The server is stopped by cancelling all outstanding asynchronous
    // operations. Once all operations have finished the io_service::run()
    // call will exit.
    

    In fact you never cancel() any operation on any IO object in your code. Again, comments aren't executed. It's better to indeed do as you say, and let the destructors close the resources. This prevents spurious errors when objects are used-after-close, and also prevents very annoying race conditions when e.g. you closed the handle, some other thread re-opened a new stream on the same filedescriptor and you had given out the handle to a third party (using getNativeHandle)... you see where this leads?

Reproducing The Problem?

Having reviewed this way, I tried to repro the issue, so I created fake data:

    std::string getExecutionJsons()   { return std::string(1024,  'E'); }
    std::string getOrdersAsJsons()    { return std::string(13312, 'O'); }
    std::string getPositionsAsJsons() { return std::string(8192,  'P'); }
    std::string createSyncDoneJson()  { return std::string(24576, 'C'); }

With some minor tweaks to the Connection class:

    std::string buff_str =
        std::string(buffer_.data(), bytes_transferred);
    const auto& tokenized_buffer = split(buff_str, ' ');

    if (!tokenized_buffer.empty() &&
        tokenized_buffer[0] == "sync") {
        std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
        /// "syncing connection" sends a specific text
        /// hence I can separate between sycing and long-lived
        /// connections here and act accordingly.

        const auto& exec_json_strs     = getExecutionJsons();
        const auto& order_json_strs    = getOrdersAsJsons();
        const auto& position_json_strs = getPositionsAsJsons();
        const auto& all_json_strs      = exec_json_strs +
            order_json_strs + position_json_strs +
            createSyncDoneJson();

        std::cerr << "All json length: " << all_json_strs.length() << std::endl;
        /// this is potentially a very large data.
        do_write(all_json_strs); // already on strand!
    }

We get the server outputting

sync detected on 127.0.0.1:43012
All json length: 47104
sync detected on 127.0.0.1:43044
All json length: 47104

And clients faked with netcat:

$ netcat localhost 8989 <<< 'sync me' > expected
^C
$ wc -c expected 
47104 expected

Good. Now let's cause premature disconnect:

netcat localhost 8989 -w0 <<< 'sync me' > truncated
$ wc -c truncated 
0 truncated

So, it does lead to early close, but server still says

sync detected on 127.0.0.1:44176
All json length: 47104

Let's instrument do_write as well:

    async_write( //
        socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()),
        [/*this,*/ self](error_code ec, size_t transfer_size) {
            std::cerr << "do_write completion: " << transfer_size << " bytes ("
                      << ec.message() << ")" << std::endl;

            if (!ec) {
                /// everything is fine.
            } else {
                /// what to do here?
                // FIXME: probably cancel the read loop so the connection
                // closes?
            }
        });

Now we see:

sync detected on 127.0.0.1:44494
All json length: 47104
do_write completion: 47104 bytes (Success)
sync detected on 127.0.0.1:44512
All json length: 47104
do_write completion: 32768 bytes (Operation canceled)

For one disconnected and one "okay" connection.

No sign of crashes/undefined behaviour. Let's check with -fsanitize=address,undefined: clean record, even adding a heartbeat:

int main() {
    Server s("127.0.0.1", "8989");

    std::thread yolo([&s] {
        using namespace std::literals;
        int i = 1;

        do {
            std::this_thread::sleep_for(5s);
        } while (s.deliver("HEARTBEAT DEMO " + std::to_string(i++)));
    });

    s.run();

    yolo.join();
}

Conclusion

The only problem highlighted above that weren't addressed were:

  • additional threading issues not shown (perhaps via getNativeHandle)

  • the fact that you can have overlapping writes in the Connection do_write. Fixing that:

     void Connection::write(std::string msg) { // public, might not be on the strand
         post(socket_.get_executor(),
              [self = shared_from_this(), msg = std::move(msg)]() mutable {
                  self->do_write(std::move(msg));
              });
     }
    
     void Connection::do_write(std::string msg) { // assumed on the strand
         outgoing_.push_back(std::move(msg));
    
         if (outgoing_.size() == 1)
             do_write_loop();
     }
    
     void Connection::do_write_loop() {
         if (outgoing_.size() == 0)
             return;
    
         auto self(shared_from_this());
         async_write( //
             socket_, boost::asio::buffer(outgoing_.front()),
             [this, self](error_code ec, size_t transfer_size) {
                 std::cerr << "write completion: " << transfer_size << " bytes ("
                           << ec.message() << ")" << std::endl;
    
                 if (!ec) {
                     outgoing_.pop_front();
                     do_write_loop();
                 } else {
                     socket_.cancel();
    
                     // This would ideally be enough to free the connection, but
                     // since `ConnectionManager` doesn't use `weak_ptr` you need to
                     // force the issue using kind of an "umbillical cord reflux":
                     connection_manager_.stop(self);
                 }
             });
     }
    

As you can see I also split write/do_write to prevent off-strand invocation. Same with stop.

Full Listing

A full listing with all the remarks/fixes from above:

  • File connection.h

     #pragma once
    
     #include <boost/asio.hpp>
    
     #include <array>
     #include <deque>
     #include <memory>
     #include <string>
     using boost::asio::ip::tcp;
    
     class ConnectionManager;
    
     /// Represents a single connection from a client.
     class Connection : public std::enable_shared_from_this<Connection> {
       public:
         Connection(const Connection&) = delete;
         Connection& operator=(const Connection&) = delete;
    
         /// Construct a connection with the given socket.
         explicit Connection(tcp::socket socket, ConnectionManager& manager);
    
         void start();
         void stop();
         void write(std::string msg);
    
       private:
         void do_stop();
         void do_write(std::string msg);
         void do_write_loop();
    
         /// Perform an asynchronous read operation.
         void do_read();
    
         /// Socket for the connection.
         tcp::socket socket_;
    
         /// The manager for this connection.
         ConnectionManager& connection_manager_;
    
         /// Buffer for incoming data.
         std::array<char, 8192> buffer_;
    
         std::deque<std::string> outgoing_;
     };
    
     using connection_ptr = std::shared_ptr<Connection>;
    
  • File connection_manager.h

     #pragma once
    
     #include <list>
     #include "connection.h"
    
     /// Manages open connections so that they may be cleanly stopped when the server
     /// needs to shut down.
     class ConnectionManager {
       public:
         ConnectionManager(const ConnectionManager&) = delete;
         ConnectionManager& operator=(const ConnectionManager&) = delete;
         ConnectionManager() = default; // could be split across h/cpp if you wanted
    
         void register_and_start(connection_ptr c);
         void stop(connection_ptr c);
         void stop_all();
    
         void broadcast(const std::string& buffer);
    
         // purge defunct connections, returns remaining active connections
         size_t garbage_collect();
    
       private:
         using handle = std::weak_ptr<connection_ptr::element_type>;
         std::list<handle> connections_;
     };
    
  • File server.h

     #pragma once
    
     #include <boost/asio.hpp>
     #include <string>
     #include "connection.h"
     #include "connection_manager.h"
    
     class Server {
       public:
         Server(const Server&) = delete;
         Server& operator=(const Server&) = delete;
    
         /// Construct the server to listen on the specified TCP address and port,
         /// and serve up files from the given directory.
         explicit Server(const std::string& address, const std::string& port);
    
         /// Run the server's io_service loop.
         void run();
    
         bool deliver(const std::string& buffer);
    
       private:
         void do_accept();
         void do_await_signal();
    
         boost::asio::io_context      io_context_;
         boost::asio::any_io_executor strand_{io_context_.get_executor()};
         boost::asio::signal_set      signals_{strand_};
         tcp::acceptor                acceptor_{strand_};
         ConnectionManager            connection_manager_;
     };
    
  • File connection.cpp

     #include "connection.h"
    
     #include <boost/algorithm/string.hpp>
     #include <iostream>
     #include <thread>
     #include <utility>
     #include <vector>
    
     #include "connection_manager.h"
     using boost::system::error_code;
    
     Connection::Connection(tcp::socket socket, ConnectionManager& manager)
         : socket_(std::move(socket))
         , connection_manager_(manager) {}
    
     void Connection::start() { // always assumed on the strand (since connection
                                // just constructed)
         do_read();
     }
    
     void Connection::stop() { // public, might not be on the strand
         post(socket_.get_executor(),
              [self = shared_from_this()]() mutable {
                  self->do_stop();
              });
     }
    
     void Connection::do_stop() { // assumed on the strand
         socket_.cancel(); // trust shared pointer to destruct
     }
    
     namespace /*missing code stubs*/ {
         auto split(std::string_view input, char delim) {
             std::vector<std::string_view> result;
             boost::algorithm::split(result, input,
                                     boost::algorithm::is_from_range(delim, delim));
             return result;
         }
    
         std::string getExecutionJsons()   { return std::string(1024,  'E'); }
         std::string getOrdersAsJsons()    { return std::string(13312, 'O'); }
         std::string getPositionsAsJsons() { return std::string(8192,  'P'); }
         std::string createSyncDoneJson()  { return std::string(24576, 'C'); }
     } // namespace
    
     void Connection::do_read() {
         auto self(shared_from_this());
         socket_.async_read_some(
             boost::asio::buffer(buffer_),
             [this, self](error_code ec, size_t bytes_transferred) {
                 if (!ec) {
                     std::string buff_str =
                         std::string(buffer_.data(), bytes_transferred);
                     const auto& tokenized_buffer = split(buff_str, ' ');
    
                     if (!tokenized_buffer.empty() &&
                         tokenized_buffer[0] == "sync") {
                         std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
                         /// "syncing connection" sends a specific text
                         /// hence I can separate between sycing and long-lived
                         /// connections here and act accordingly.
    
                         const auto& exec_json_strs     = getExecutionJsons();
                         const auto& order_json_strs    = getOrdersAsJsons();
                         const auto& position_json_strs = getPositionsAsJsons();
                         const auto& all_json_strs      = exec_json_strs +
                             order_json_strs + position_json_strs +
                             createSyncDoneJson();
    
                         std::cerr << "All json length: " << all_json_strs.length() << std::endl;
                         /// this is potentially a very large data.
                         do_write(all_json_strs); // already on strand!
                     }
    
                     do_read();
                 } else {
                     std::cerr << "do_read terminating: " << ec.message() << std::endl;
                     connection_manager_.stop(shared_from_this());
                 }
             });
     }
    
     void Connection::write(std::string msg) { // public, might not be on the strand
         post(socket_.get_executor(),
              [self = shared_from_this(), msg = std::move(msg)]() mutable {
                  self->do_write(std::move(msg));
              });
     }
    
     void Connection::do_write(std::string msg) { // assumed on the strand
         outgoing_.push_back(std::move(msg));
    
         if (outgoing_.size() == 1)
             do_write_loop();
     }
    
     void Connection::do_write_loop() {
         if (outgoing_.size() == 0)
             return;
    
         auto self(shared_from_this());
         async_write( //
             socket_, boost::asio::buffer(outgoing_.front()),
             [this, self](error_code ec, size_t transfer_size) {
                 std::cerr << "write completion: " << transfer_size << " bytes ("
                           << ec.message() << ")" << std::endl;
    
                 if (!ec) {
                     outgoing_.pop_front();
                     do_write_loop();
                 } else {
                     socket_.cancel();
    
                     // This would ideally be enough to free the connection, but
                     // since `ConnectionManager` doesn't use `weak_ptr` you need to
                     // force the issue using kind of an "umbellical cord reflux":
                     connection_manager_.stop(self);
                 }
             });
     }
    
  • File connection_manager.cpp

     #include "connection_manager.h"
    
     void ConnectionManager::register_and_start(connection_ptr c) {
         connections_.emplace_back(c);
         c->start();
     }
    
     void ConnectionManager::stop(connection_ptr c) {
         c->stop();
     }
    
     void ConnectionManager::stop_all() {
         for (auto h : connections_)
             if (auto c = h.lock())
                 c->stop();
     }
    
     /// this function is used to keep clients up to date with the changes, not used
     /// during syncing phase.
     void ConnectionManager::broadcast(const std::string& buffer) {
         for (auto h : connections_)
             if (auto c = h.lock())
                 c->write(buffer);
     }
    
     size_t ConnectionManager::garbage_collect() {
         connections_.remove_if(std::mem_fn(&handle::expired));
         return connections_.size();
     }
    
  • File server.cpp

     #include "server.h"
     #include <signal.h>
     #include <utility>
    
     using boost::system::error_code;
    
     Server::Server(const std::string& address, const std::string& port)
         : io_context_(1) // THREAD HINT: single threaded
         , connection_manager_()
     {
         // Register to handle the signals that indicate when the server should exit.
         // It is safe to register for the same signal multiple times in a program,
         // provided all registration for the specified signal is made through Asio.
         signals_.add(SIGINT);
         signals_.add(SIGTERM);
     #if defined(SIGQUIT)
         signals_.add(SIGQUIT);
     #endif // defined(SIGQUIT)
    
         do_await_signal();
    
         // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
         tcp::resolver resolver(io_context_);
         tcp::endpoint endpoint = *resolver.resolve({address, port});
         acceptor_.open(endpoint.protocol());
         acceptor_.set_option(tcp::acceptor::reuse_address(true));
         acceptor_.bind(endpoint);
         acceptor_.listen();
    
         do_accept();
     }
    
     void Server::run() {
         // The io_service::run() call will block until all asynchronous operations
         // have finished. While the server is running, there is always at least one
         // asynchronous operation outstanding: the asynchronous accept call waiting
         // for new incoming connections.
         io_context_.run();
     }
    
     void Server::do_accept() {
         // separate strand for each connection - just in case you ever add threads
         acceptor_.async_accept(
             make_strand(io_context_), [this](error_code ec, tcp::socket sock) {
                 if (!ec) {
                     connection_manager_.register_and_start(
                         std::make_shared<Connection>(std::move(sock),
                                                      connection_manager_));
                     do_accept();
                 }
             });
     }
    
     void Server::do_await_signal() {
         signals_.async_wait([this](error_code /*ec*/, int /*signo*/) {
             // handler on the strand_ because of the executor on signals_
    
             // The server is stopped by cancelling all outstanding asynchronous
             // operations. Once all operations have finished the io_service::run()
             // call will exit.
             acceptor_.cancel();
             connection_manager_.stop_all();
         });
     }
    
     bool Server::deliver(const std::string& buffer) {
         if (io_context_.stopped()) {
             return false;
         }
         post(io_context_,
              [this, buffer] { connection_manager_.broadcast(std::move(buffer)); });
         return true;
     }
    
  • File test.cpp

     #include "server.h"
    
     int main() {
         Server s("127.0.0.1", "8989");
    
         std::thread yolo([&s] {
             using namespace std::literals;
             int i = 1;
    
             do {
                 std::this_thread::sleep_for(5s);
             } while (s.deliver("HEARTBEAT DEMO " + std::to_string(i++)));
         });
    
         s.run();
    
         yolo.join();
     }
    
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文