Network client simulator design

Posted on 2024-10-13 15:35:34

I am trying to design software in C++ that will send request bytes over **UDP**. The requests follow a standard **application-level** protocol, and the values for the fields to be populated will be taken from a text file.

The client must be able to send these requests at a very high rate, up to **2000 transactions per second**. It should also receive the response if one arrives within a specified timeout; otherwise the response should be ignored.

I will be using the Boost library for all the socket work, but I am not sure how to design it for such a high-speed application :(

I think I have to use a highly multi-threaded application (again using Boost). Am I right? Do I have to create a separate thread for each request? But I think only one thread should be waiting to receive responses; otherwise, if many threads are waiting for a response, how can we tell which thread's request a given response belongs to?

I hope the question is clear. I just need some help with the design and with the problems I am likely to face.

Comments (2)

纸短情长 2024-10-20 15:35:34

I'm halfway through a network client of my own at the moment, so perhaps I can impart some advice and some resources to look at. There are many more experienced in this area, and hopefully they'll chime in :)

Firstly, you're right about Boost. Once you get used to how it all hangs together, boost::asio is a great toolkit for writing network code. Essentially, you create an io_service and call run to execute until all work is complete, or run_one to perform a single IO action. On their own, these aren't that helpful. The power comes when you either call run_one in your own loop:

boost::asio::io_service myIOService;
while(true)
{
    myIOService.run_one();
}

or run the run function on one (or more) threads:

boost::thread t(boost::bind(&boost::asio::io_service::run, &myIOService));

However, it is worth noting that run returns as soon as there is no work to do (so you can say goodbye to that thread). As I found out here on Stack Overflow, the trick is to ensure it always has something to do. The solution is boost::asio::io_service::work:

boost::asio::io_service::work myWork(myIOService);   // just some abstract "work"

The above line ensures your thread won't stop when nothing is going on. I view it as a means of keep-alive :)

At some point, you're going to want to create a socket and connect it somewhere. I created a generic Socket class (and derived a text-socket from that to create buffered input). I also wanted an event-based system that worked very much like C#. I've outlined this stuff for you, below:

First step, we need a generic way of passing arguments around, hence, EventArgs:

eventArgs.h

 class EventArgs : boost::noncopyable
 {
 private:

 public:
  EventArgs();
  virtual ~EventArgs() = 0;
 }; // eo class EventArgs:

Now, we need an event class which people can subscribe/unsubscribe to:

event.h

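// STL
#include <algorithm>
#include <functional>
#include <set>
#include <stack>

// Boost
#include <boost/bind.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>

#include "eventArgs.h"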

 // class Event
 class Event : boost::noncopyable
 {
 public:
  typedef std::function<void(const EventArgs&)> DelegateType;
  typedef boost::shared_ptr<DelegateType> DelegateDecl;

 private:
  boost::mutex m_Mutex;
  typedef std::set<DelegateDecl> DelegateSet;
  typedef std::stack<DelegateDecl> DelegateStack;
  typedef DelegateSet::const_iterator DelegateSet_cit;
  DelegateSet m_Delegates;
  DelegateStack m_ToRemove;

 public:
  Event()
  {
  }; // eo ctor


  Event(Event&& _rhs) : m_Delegates(std::move(_rhs.m_Delegates))
  {
  }; // eo mtor

  ~Event()
  {
  }; // eo dtor

  // static methods
  static DelegateDecl bindDelegate(DelegateType _f)
  {
   DelegateDecl ret(new DelegateType(_f));
   return ret;
  }; // eo bindDelegate

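  // methods
  void raise(const EventArgs& _args)
  {
   // Note: the lock is held while delegates run, so a delegate must not call
   // addListener/removeListener on this same Event (boost::mutex is not recursive).
   boost::mutex::scoped_lock lock(m_Mutex);

   // get rid of any we have to remove
   while(!m_ToRemove.empty())
   {
    // erase by key: harmless if the delegate was already removed
    m_Delegates.erase(m_ToRemove.top());
    m_ToRemove.pop();
   };

   if(m_Delegates.size())
    std::for_each(m_Delegates.begin(),
         m_Delegates.end(),
         [&_args](const DelegateDecl& _decl) { (*_decl)(_args); });
  }; // eo raise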

  DelegateDecl addListener(DelegateDecl _decl)
  {
   boost::mutex::scoped_lock lock(m_Mutex);
   m_Delegates.insert(_decl);
   return _decl;
  }; // eo addListener

  DelegateDecl addListener(DelegateType _f)
  {
   DelegateDecl ret(bindDelegate(_f));
   return addListener(ret);
  }; // eo addListener


  void removeListener(const DelegateDecl _decl)
  {
   boost::mutex::scoped_lock lock(m_Mutex);
   DelegateSet_cit cit(m_Delegates.find(_decl));
   if(cit != m_Delegates.end())
    m_ToRemove.push(_decl);
  }; // eo removeListener

  // operators

  // Only use operator += if you don't wish to manually detach using removeListener
  Event& operator += (DelegateType _f)
  {
   addListener(_f);
   return *this;
  }; // eo op +=

 }; // eo class Event

Then, it was time to create a socket class. Below is the header:

socket.h

(Some notes: ByteVector is typedef std::vector<unsigned char>)

#pragma once

#include "event.h"

// boost
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/buffer.hpp>
  // class Socket
  class MORSE_API Socket : boost::noncopyable
  {
  protected:
   typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;

  private:
   ByteVector      m_Buffer;   // will be used to read in

   SocketPtr        m_SocketPtr;
   boost::asio::ip::tcp::endpoint      m_RemoteEndPoint;
   bool         m_bConnected;

   // reader
   void _handleConnect(const boost::system::error_code& _errorCode, boost::asio::ip::tcp::resolver_iterator _rit);
   void _handleRead(const boost::system::error_code& _errorCode, std::size_t read);
  protected:

   SocketPtr socket() { return m_SocketPtr; };
  public:
   Socket(ByteVector_sz _bufSize = 512);
   virtual ~Socket();

   // properties
   bool isConnected() const { return m_bConnected; };
   const boost::asio::ip::tcp::endpoint& remoteEndPoint() const {return m_RemoteEndPoint; };

   // methods
   void connect(boost::asio::ip::tcp::resolver_iterator _rit);
   void connect(const String& _host, const Port _port);
   void close();

   // Events
   Event onRead;
   Event onResolve;
   Event onConnect;
   Event onClose;
  }; // eo class Socket

And now the implementation. You'll notice it calls another class to perform DNS resolution; I will show that afterwards. There are also some EventArgs derivatives that I have omitted. They are simply passed as EventArgs parameters when socket events occur.

socket.cpp

#include "socket.h"


// boost
#include <boost/asio/placeholders.hpp>

namespace morse
{
 namespace net
 {
  // ctor
  Socket::Socket(ByteVector_sz _bufSize /* = 512 */) : m_bConnected(false)
  {
   m_Buffer.resize(_bufSize);
  }; // eo ctor

  // dtor
  Socket::~Socket()
  {
  }; // eo dtor


  // _handleRead
  void Socket::_handleRead(const boost::system::error_code& _errorCode,
            std::size_t _read)
  {
   if(!_errorCode)
   {
    if(_read)
    {
     onRead.raise(SocketReadEventArgs(*this, m_Buffer, _read));
     // read again
     m_SocketPtr->async_read_some(boost::asio::buffer(m_Buffer), boost::bind(&Socket::_handleRead, this, _1, _2));
    };
   }
   else
    close();
  }; // eo _handleRead


  // _handleConnect
  void Socket::_handleConnect(const boost::system::error_code& _errorCode,
         boost::asio::ip::tcp::resolver_iterator _rit)
  {
   m_bConnected = !_errorCode;
   bool _raise(false);
   if(!_errorCode)
   {
    m_RemoteEndPoint = *_rit;
    _raise = true;
    m_SocketPtr->async_read_some(boost::asio::buffer(m_Buffer), boost::bind(&Socket::_handleRead, this, _1, _2));
   }
   else if(++_rit != boost::asio::ip::tcp::resolver::iterator())
   {
    m_SocketPtr->close();
    m_SocketPtr->async_connect(*_rit, boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
   }
   else
    _raise = true; // raise complete failure

   if(_raise)
    onConnect.raise(SocketConnectEventArgs(*this, _errorCode));

  }; // eo _handleConnect


  // connect
  void Socket::connect(boost::asio::ip::tcp::resolver_iterator _rit)
  {
   boost::asio::ip::tcp::endpoint ep(*_rit);
   m_SocketPtr.reset(new boost::asio::ip::tcp::socket(Root::instance().ioService()));
   m_SocketPtr->async_connect(ep, boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
  };


  void Socket::connect(const String& _host, Port _port)
  {
   // Anon function for resolution of the host-name and asynchronous calling of the above
   auto anonResolve = [this](const boost::system::error_code& _errorCode, 
           boost::asio::ip::tcp::resolver_iterator _epIt)
   {
    // raise event
    onResolve.raise(SocketResolveEventArgs(*this, !_errorCode ? (*_epIt).host_name() : String(""), _errorCode));

    // perform connect, calling back to anonymous function
    if(!_errorCode)
     this->connect(_epIt);
   };

   // Resolve the host calling back to anonymous function
   Root::instance().resolveHost(_host, _port, anonResolve);

  }; // eo connect


  void Socket::close()
  {
   if(m_bConnected)
   {
    onClose.raise(SocketCloseEventArgs(*this));
    m_SocketPtr->close();
    m_bConnected = false;
   };
  } // eo close

As I said about DNS resolution, the line Root::instance().resolveHost(_host, _port, anonResolve); calls this to perform asynchronous DNS:

  // resolve a host asynchronously
  template<typename ResolveHandler>
  void resolveHost(const String& _host, Port _port, ResolveHandler _handler)
  {
   boost::asio::ip::tcp::resolver::query query(_host, boost::lexical_cast<std::string>(_port));
   m_Resolver.async_resolve(query, _handler);
  }; // eo resolveHost

Finally, I needed a text-based socket that raises an event every time a line is received (which is then processed). I'll omit the header file this time and just show the implementation file. Needless to say, it declares an Event called onLine, which it fires every time a line is received in its entirety:

// boost
#include <boost/asio/buffer.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/placeholders.hpp>

namespace morse
{
 namespace net
 {
  String TextSocket::m_DefaultEOL("\r\n");

  // ctor
  TextSocket::TextSocket() : m_EOL(m_DefaultEOL)
  {
   onRead += boost::bind(&TextSocket::readHandler, this, _1);
  }; // eo ctor


  // dtor
  TextSocket::~TextSocket()
  {
  }; // eo dtor


  // readHandler
  void TextSocket::readHandler(const EventArgs& _args)
  {
   auto& args(static_cast<const SocketReadEventArgs&>(_args));
   m_LineBuffer.append(args.buffer().begin(), args.buffer().begin() + args.bytesRead());
   String::size_type pos;
   while((pos = m_LineBuffer.find(eol())) != String::npos)
   {
    onLine.raise(SocketLineEventArgs(*this, m_LineBuffer.substr(0, pos)));
    m_LineBuffer = m_LineBuffer.substr(pos + eol().length());
   };
  }; // eo readHandler


  // writeHandler
  void TextSocket::writeHandler(const boost::system::error_code& _errorCode, std::size_t _written)
  {
   if(!_errorCode)
   {
    m_Queue.pop_front();
    if(!m_Queue.empty()) // more to do?
     boost::asio::async_write(*socket().get(), boost::asio::buffer(m_Queue.front(), m_Queue.front().length()), boost::bind(&TextSocket::writeHandler, this, _1, _2));
   }
   else
    close();
  }; // eo writeHandler

  void TextSocket::sendLine(String _line)
  {
   Root::instance().ioService().post(boost::bind(&TextSocket::_sendLine, this, _line));
  }; // eo sendLine


  // _sendLine
  void TextSocket::_sendLine(String _line)
  {
   // copy'n'queue
   _line.append(m_EOL);
   m_Queue.push_back(_line);
   if(m_Queue.size() == 1) // previously an empty queue, must start write!
    boost::asio::async_write(*socket().get(), boost::asio::buffer(m_Queue.front(), m_Queue.front().length()), boost::bind(&TextSocket::writeHandler, this, _1, _2));
  }; // eo _sendLine

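Some things to note about the class above: sendLine uses io_service::post to hand each line over to _sendLine. This makes all the queue handling occur on the threads that ASIO manages, in a thread-safe way, and allows us to queue up lines to be sent as and when. (With more than one thread calling run, the handlers would also need a strand to stay serialized.) This makes it very scalable.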
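The queuing logic in _sendLine and writeHandler can be looked at in isolation. The sketch below is not the class from this answer: it assumes a hypothetical WriteQueue that replaces the socket with a startWrite callback and uses only the standard library. It is meant to show why a write is started only when the queue goes from empty to one element, and why the next write is started from the completion handler.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for the send path of TextSocket. StartWrite plays the
// role of boost::asio::async_write: it begins sending one message, and the
// owner calls onWriteComplete() once that send has finished.
class WriteQueue
{
public:
    using StartWrite = std::function<void(const std::string&)>;

    explicit WriteQueue(StartWrite startWrite, std::string eol = "\r\n")
        : m_StartWrite(std::move(startWrite)), m_EOL(std::move(eol))
    {
        assert(m_StartWrite != nullptr);
    }

    // Queue a line; start a write only if nothing is already in flight.
    void sendLine(std::string line)
    {
        line.append(m_EOL);
        m_Queue.push_back(std::move(line));
        if (m_Queue.size() == 1) // queue was empty, so no write is in flight
            m_StartWrite(m_Queue.front());
    }

    // Call when the in-flight write has completed successfully.
    void onWriteComplete()
    {
        assert(!m_Queue.empty());
        m_Queue.pop_front();
        if (!m_Queue.empty()) // more to do?
            m_StartWrite(m_Queue.front());
    }

    std::size_t pending() const { return m_Queue.size(); }

private:
    StartWrite m_StartWrite;
    std::string m_EOL;
    std::deque<std::string> m_Queue; // front() stays valid until pop_front()
};
```

Because at most one write is in flight at a time, the buffer handed to the writer (the front of the deque) is never modified or destroyed while it is being sent.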

I am sure there are many more questions, and maybe my code isn't helpful. I spent several days piecing it all together and making sense of it, and I doubt it's actually any good. Hopefully some better minds will glance over it and go "HOLY CRAP, THIS

≈。彩虹 2024-10-20 15:35:34

I'm not sure you need to go "heavy" multi-thread. Most high speed applications use the polling mechanisms of the operating system, which generally scale better than threads.

The architecture will depend a lot on how reactive your application needs to be, in terms of what components are responsible for generating inputs and outputs, and doing the actual processing.

A way to approach your problem using boost::asio would be to have a communication thread that runs the boost::asio::io_service::run method. The io_service listens on the various UDP sockets and processes messages as they arrive, possibly pushing them onto a queue so that the application can process them in the main thread. From the main thread, you can post messages to the io_service for it to send.
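The hand-off between the communication thread and the main thread could look roughly like the sketch below. MessageQueue is a hypothetical name, messages are assumed to be plain byte strings, and only the standard library is used; it is one possible shape for such a queue, not something prescribed by Asio.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical hand-off queue: the communication thread calls push() for each
// datagram it receives, and the main thread calls pop() or tryPop().
class MessageQueue
{
public:
    void push(std::string msg)
    {
        {
            std::lock_guard<std::mutex> lock(m_Mutex);
            m_Items.push_back(std::move(msg));
        }
        m_Ready.notify_one(); // wake one waiting consumer, outside the lock
    }

    // Blocks until a message is available, then returns it.
    std::string pop()
    {
        std::unique_lock<std::mutex> lock(m_Mutex);
        m_Ready.wait(lock, [this] { return !m_Items.empty(); });
        assert(!m_Items.empty());
        std::string msg = std::move(m_Items.front());
        m_Items.pop_front();
        return msg;
    }

    // Non-blocking variant: returns false if the queue is empty.
    bool tryPop(std::string& out)
    {
        std::lock_guard<std::mutex> lock(m_Mutex);
        if (m_Items.empty())
            return false;
        out = std::move(m_Items.front());
        m_Items.pop_front();
        return true;
    }

private:
    std::mutex m_Mutex;
    std::condition_variable m_Ready;
    std::deque<std::string> m_Items;
};
```

The single mutex here only guards the hand-off itself; the receive handlers on the communication thread stay lock-free apart from the brief push.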

A single communication thread like this should let you reach 2000 messages per second without much difficulty.
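To put that rate in perspective, 2000 requests per second leaves a budget of 500 microseconds per request. A sender that wants a steady rate can compute each request's send time from the start of the run rather than sleeping a fixed interval after each send. The helper below is a small sketch of that arithmetic; sendOffset is a hypothetical name, not part of Boost.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Offset from the start of the run at which request number n (0-based)
// should be sent in order to hold a steady ratePerSecond.
// Note: n * 1e9 is computed in 64 bits, so n must stay below roughly 1.8e10.
inline std::chrono::nanoseconds sendOffset(std::uint64_t n, std::uint64_t ratePerSecond)
{
    assert(ratePerSecond > 0);
    const std::uint64_t nsPerSecond = 1000000000ULL;
    // Multiply before dividing so that rounding error does not accumulate
    // from one request to the next.
    const std::uint64_t ns = n * nsPerSecond / ratePerSecond;
    return std::chrono::nanoseconds(static_cast<std::chrono::nanoseconds::rep>(ns));
}
```

With a std::chrono::steady_clock start time, the n-th request would be due at start + sendOffset(n, 2000); a timer (for example an Asio deadline timer) can then be set to that absolute time.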

An alternative would be to start several communication threads by calling the boost::asio::io_service::run method from several threads, allowing messages to be processed in parallel by the communication threads.

One word of advice with Asio, though: because of its asynchronous architecture, it works better if you go along with its logic and use it the way it is meant to be used. If you find that you are using a lot of locks and managing many threads yourself, then you are probably doing it wrong. Look closely at the thread safety guarantees of the various methods, and study the provided examples.
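On the question of telling which request a response belongs to: with a single receive path there is no need for one waiting thread per request. A common approach is to keep a table of outstanding requests. The sketch below assumes the application protocol carries a transaction ID that the server echoes back in its response; that is an assumption about the protocol, and PendingRequests is a hypothetical name. It uses only the standard library.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical bookkeeping for outstanding requests, keyed by a transaction
// ID that the protocol is assumed to echo back in each response.
class PendingRequests
{
public:
    using Clock = std::chrono::steady_clock;
    using TimePoint = Clock::time_point;

    // Record that request id was sent and must be answered by deadline.
    void add(std::uint32_t id, TimePoint deadline)
    {
        assert(m_Deadlines.count(id) == 0); // IDs must be unique while outstanding
        m_Deadlines.emplace(id, deadline);
    }

    // Call when a response carrying id arrives at time now. Returns true if
    // the request was still outstanding and the response is within its deadline.
    bool complete(std::uint32_t id, TimePoint now)
    {
        auto it = m_Deadlines.find(id);
        if (it == m_Deadlines.end())
            return false; // unknown, duplicate, or already expired
        const bool inTime = now <= it->second;
        m_Deadlines.erase(it);
        return inTime;
    }

    // Drop every request whose deadline has passed and return their IDs.
    std::vector<std::uint32_t> expire(TimePoint now)
    {
        std::vector<std::uint32_t> timedOut;
        for (auto it = m_Deadlines.begin(); it != m_Deadlines.end();)
        {
            if (it->second < now)
            {
                timedOut.push_back(it->first);
                it = m_Deadlines.erase(it);
            }
            else
                ++it;
        }
        return timedOut;
    }

    std::size_t size() const { return m_Deadlines.size(); }

private:
    std::unordered_map<std::uint32_t, TimePoint> m_Deadlines;
};
```

If the table is only touched from the communication thread (add before sending, complete in the receive handler, expire from a periodic timer), it needs no locking. Scanning the whole map in expire is simple but linear; a structure ordered by deadline would be an alternative if that ever becomes a bottleneck.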
