Boost ASIO timeout approach


I have a situation where I need to collect data from more than 100 clients within 100 ms. After that window I need to process the collected data. When processing is done, I need to restart the collection step, and so on in a loop.

To collect the data I am using the following implementation:

#include <boost/asio.hpp>
#include <array>
#include <chrono>
#include <functional>

#include <iostream>
#include <list>
#include <set>

namespace net = boost::asio;
using net::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::placeholders;

struct listener {

    using Buffer = std::array<char, 100>; // receiver buffer
    udp::socket s;

    listener(net::any_io_executor ex, uint16_t port) : s{ex, {{}, port}} {}

    void start() {
        read_loop(error_code{}, -1); // prime the async pump
    }

    void stop() {
        post(s.get_executor(), [this] { s.cancel(); });
    }

    void report() const {
        std::cout << s.local_endpoint() << ": A total of " << received_packets
                  << " were received from " << unique_senders.size()
                  << " unique senders\n";
    }

private:
    Buffer receive_buffer;
    udp::endpoint         sender;

    std::set<udp::endpoint> unique_senders;
    size_t                  received_packets = 0;


    void read_loop(error_code ec, size_t bytes) {
        if (bytes != size_t(-1)) {
            // std::cout << "read_loop (" << ec.message() << ")\n";
            if (ec)
                return;

            received_packets += 1;
            unique_senders.insert(sender);
            std::cout << "Received:" << bytes << " sender:" << sender
                      << " recorded:" << received_packets << "\n";
            // std::cout << std::string_view(receive_buffer.data(), bytes) << "\n";
        }
        s.async_receive_from(net::buffer(receive_buffer), sender,
                             std::bind_front(&listener::read_loop, this));
    };
};

int main() {
    net::thread_pool io(1); // single threaded

    using Timer = net::steady_timer;
    using TimePoint = std::chrono::steady_clock::time_point;
    using Clock = std::chrono::steady_clock;

    Timer timer_(io);
    std::list<listener> listeners;

    auto each = [&](auto mf) { for (auto& l : listeners) (l.*mf)(); };

    for (uint16_t port : {1234, 1235, 1236})
        listeners.emplace_back(io.get_executor(), port);

    each(&listener::start);

    TimePoint startTP = Clock::now();
    timer_.expires_at(startTP + 100ms); // collect data for 100 ms
    timer_.async_wait([&](auto &&){each(&listener::stop);});

    std::cout << "Done ! \n";
    each(&listener::report);
    

    io.join();
}

Is this an okay approach to stopping the collection process?

TimePoint startTP = Clock::now();
timer_.expires_at(startTP + 100ms); // collect data for 100 ms
timer_.async_wait([&](auto &&){each(&listener::stop);});


Comments (1)

皇甫轩 2025-02-07 18:28:36


I'm interpreting this as basically asking how to combine this approach with my earlier answer (…-no-incoming-data/72015194?noredirect=1#comment127835948_72015194).

This is also reflected in your comment there:

I have one bottleneck. According to the last code you shared with me: [...] I tried to add the condition: record data for 100 ms, then pause the sockets and go process the collected data. When that is done, collect data from the sockets for another 100 ms, then process again for 900 ms, etc... The problem is that each listener now has its own current time. I am thinking how to have everything in one place, so that when 100 ms has elapsed, all 'listeners' are notified, using the stop() function provided by you.

It would seem much easier to use the same time-slice calculation I used in the first (single-listener) example.

The whole point of the way I calculated the time slices was to allow synchronization to a single clock, without time drift. The beauty of it is that it translates 1:1 to multiple listeners.
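
To make that concrete, here is a minimal, self-contained sketch of the slice arithmetic on its own (my own illustration, not part of the original answer; it reuses the period/armed_slice constants from the code below and substitutes plain sleeps for the timer):

#include <chrono>
#include <iostream>
#include <thread>

using namespace std::chrono_literals;
using Clock = std::chrono::steady_clock;

constexpr auto period      = 1s;    // one full collect+process cycle
constexpr auto armed_slice = 100ms; // collecting allowed in [0, 100ms) of each cycle

int main() {
    auto const start = Clock::now();

    for (int i = 0; i < 20; ++i) {
        auto const now    = Clock::now();
        auto const relnow = now - start;

        if ((relnow % period) > armed_slice) {
            // Outside the armed slice. The next slice starts at a whole
            // multiple of `period`, computed from `start` rather than from
            // `now`, so rounding errors never accumulate across cycles.
            auto const next_slice = start + period * (relnow / period + 1);
            std::this_thread::sleep_until(next_slice);
        } else {
            std::cout << "armed at " << relnow / 1ms << "ms since start\n";
            std::this_thread::sleep_for(30ms);
        }
    }
}

Because every deadline is derived from the one fixed start point, all listeners sharing that start point wake up on the same slice boundaries.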

Here's that combination, with one timer per listener but synchronized time slices, created in exactly the same way I turned the original answer's code into the multi-listener sample:

(I just moved everything related to read_loop into a class, and that was it.)

Live On Coliru

#include <boost/asio.hpp>
#include <array>
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <list>
#include <set>
#include <thread>

namespace net = boost::asio;
using net::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::placeholders;

using Clock = std::chrono::steady_clock;
using Timer = net::steady_timer;
constexpr auto period      = 1s;
constexpr auto armed_slice = 100ms;

struct listener {
    udp::socket s;

    listener(Clock::time_point start, net::any_io_executor ex, uint16_t port)
        : s{ex, {{}, port}}
        , start_{start} {}

    void start() {
        read_loop(event::init, error_code{}, -1); // prime the async pump
    }

    void stop() {
        post(s.get_executor(), [this] {
            stopped_ = true;
            s.cancel();
            timer.cancel();
        });
    }

    void report() const {
        std::cout << s.local_endpoint() << ": A total of " << received_packets
                  << " were received from " << unique_senders.size()
                  << " unique senders\n";
    }

  private:
    std::atomic_bool stopped_{false};
    enum class event { init, resume, receive };

    Clock::time_point const start_;
    Timer                   timer{s.get_executor()};
    std::array<char, 100>   receive_buffer;
    udp::endpoint           sender;

    std::set<udp::endpoint> unique_senders;
    size_t                  received_packets = 0;

    void read_loop(event ev, error_code ec, [[maybe_unused]] size_t bytes) {
        if (stopped_)
            return;
        auto const now    = Clock::now();
        auto const relnow = now - start_;
        switch (ev) {
        case event::receive:
            // std::cout << s.local_endpoint() << "receive (" << ec.message()
            //<< ")\n";
            if (ec)
                return;

            if ((relnow % period) > armed_slice) {
                // ignore this receive

                // wait for next slice
                auto next_slice = start_ + period * (relnow / period + 1);
                std::cout << s.local_endpoint() << " Waiting "
                          << (next_slice - now) / 1ms << "ms ("
                          << received_packets << " received)\n";
                timer.expires_at(next_slice);
                return timer.async_wait(std::bind(&listener::read_loop, this,
                                                  event::resume, _1, 0));
            } else {
                received_packets += 1;
                unique_senders.insert(sender);
                /*
                 *std::cout << s.local_endpoint() << " Received:" << bytes
                 *          << " sender:" << sender
                 *          << " recorded:" << received_packets << "\n";
                 *std::cout << std::string_view(receive_buffer.data(), bytes)
                 *          << "\n";
                 */
            }
            break;
        case event::resume:
            //std::cout << "resume (" << ec.message() << ")\n";
            if (ec)
                return;
            break;
        case event::init:
            //std::cout << s.local_endpoint() << " init " << (now - start_) / 1ms << "ms\n";
            break;
        };
        s.async_receive_from(
            net::buffer(receive_buffer), sender,
            std::bind_front(&listener::read_loop, this, event::receive));
    }
};

int main() {
    net::thread_pool io(1); // single threaded

    std::list<listener> listeners;

    auto each = [&](auto mf) { for (auto& l : listeners) (l.*mf)(); };

    auto const start = Clock::now();

    for (uint16_t port : {1234, 1235, 1236})
        listeners.emplace_back(start, io.get_executor(), port);

    each(&listener::start);

    // after 5s stop
    std::this_thread::sleep_for(5s);

    each(&listener::stop);

    io.join();
    
    each(&listener::report);
}

Live Demo:

(animated GIF: https://i.sstatic.net/g3ucu.gif)

EDIT In case the output goes too fast to interpret:

0.0.0.0:1234 Waiting 899ms (1587 received)
0.0.0.0:1236 Waiting 899ms (1966 received)
0.0.0.0:1235 Waiting 899ms (1933 received)
0.0.0.0:1235 Waiting 899ms (4054 received)
0.0.0.0:1234 Waiting 899ms (3454 received)
0.0.0.0:1236 Waiting 899ms (4245 received)
0.0.0.0:1236 Waiting 899ms (6581 received)
0.0.0.0:1235 Waiting 899ms (6257 received)
0.0.0.0:1234 Waiting 899ms (5499 received)
0.0.0.0:1235 Waiting 899ms (8535 received)
0.0.0.0:1234 Waiting 899ms (7494 received)
0.0.0.0:1236 Waiting 899ms (8811 received)
0.0.0.0:1236 Waiting 899ms (11048 received)
0.0.0.0:1234 Waiting 899ms (9397 received)
0.0.0.0:1235 Waiting 899ms (10626 received)
0.0.0.0:1234: A total of 9402 were received from 7932 unique senders
0.0.0.0:1235: A total of 10630 were received from 8877 unique senders
0.0.0.0:1236: A total of 11053 were received from 9133 unique senders
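
For reference, output like the above can be reproduced with a quick-and-dirty load generator along these lines (my own sketch, not part of the original answer; it targets the same ports on localhost and binds a fresh ephemeral source port per burst so that unique_senders keeps growing):

#include <boost/asio.hpp>
#include <chrono>
#include <cstdint>

namespace net = boost::asio;
using net::ip::udp;
using namespace std::chrono_literals;

int main() {
    net::io_context io;
    udp::endpoint dest{net::ip::make_address("127.0.0.1"), 1234};

    auto const deadline = std::chrono::steady_clock::now() + 5s;
    while (std::chrono::steady_clock::now() < deadline) {
        // A fresh socket per iteration binds a new ephemeral source port,
        // so every burst counts as a distinct sender on the listener side.
        udp::socket s(io, udp::endpoint{udp::v4(), 0});
        for (uint16_t port : {1234, 1235, 1236}) {
            dest.port(port);
            s.send_to(net::buffer("hello", 5), dest);
        }
    }
}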

If you are sure you will remain single-threaded, you might consider sharing the same actual timer across all listeners, at the cost of significantly increased complexity.
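
A rough sketch of what that shared-timer variant could look like (again my own illustration, not from the answer; pause_all/resume_all are hypothetical stand-ins for iterating the listeners, and the program runs until interrupted):

#include <boost/asio.hpp>
#include <chrono>
#include <functional>
#include <iostream>

namespace net = boost::asio;
using namespace std::chrono_literals;
using Clock = std::chrono::steady_clock;

constexpr auto period      = 1s;
constexpr auto armed_slice = 100ms;

int main() {
    net::thread_pool io(1); // must stay single-threaded for this to be safe
    net::steady_timer timer(io);

    auto const start = Clock::now();

    // Hypothetical stand-ins; in real code these would call stop()/start()
    // (or similar) on every listener.
    auto pause_all  = [] { std::cout << "pause\n"; };
    auto resume_all = [] { std::cout << "resume\n"; };

    // One timer alternates between the armed slice and the processing
    // window; every deadline derives from `start`, so there is no drift.
    std::function<void(int)> arm = [&](int cycle) {
        timer.expires_at(start + cycle * period + armed_slice);
        timer.async_wait([&, cycle](boost::system::error_code ec) {
            if (ec) return;
            pause_all(); // end of the collection slice
            timer.expires_at(start + (cycle + 1) * period);
            timer.async_wait([&, cycle](boost::system::error_code ec2) {
                if (ec2) return;
                resume_all(); // start of the next collection slice
                arm(cycle + 1);
            });
        });
    };
    arm(0);

    io.join(); // never returns in this sketch
}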
