boost asio异步等待条件变量

发布于 2024-11-25 17:06:25 字数 137 浏览 4 评论 0原文

是否可以对 boost::asio 中的条件变量执行异步等待(读取:非阻塞)?如果不直接支持任何有关实现的提示,我们将不胜感激。

我可以实现一个计时器,甚至每隔几毫秒就触发一次唤醒,但这种方法要差得多,我发现很难相信条件变量同步没有实现/记录。

Is it possible to perform an asynchronous wait (read : non-blocking) on a conditional variable in boost::asio ? if it isn't directly supported any hints on implementing it would be appreciated.

I could implement a timer and fire a wakeup even every few ms, but this is approach is vastly inferior, I find it hard to believe that condition variable synchronization is not implemented / documented.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

嗳卜坏 2024-12-02 17:06:25

如果我正确理解了意图,那么当某个条件变量发出信号时,您想在 asio 线程池的上下文中启动事件处理程序吗?我认为在处理程序开头等待条件变量就足够了,并且 io_service::post() 本身最终回到池中,类似这样:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
boost::asio::io_service io;
boost::mutex mx;
boost::condition_variable cv;
void handler()
{
    boost::unique_lock<boost::mutex> lk(mx);
         cv.wait(lk);
    std::cout << "handler awakened\n";
    io.post(handler);
}
void buzzer()
{
    for(;;)
    {
        boost::this_thread::sleep(boost::posix_time::seconds(1));
        boost::lock_guard<boost::mutex> lk(mx);
            cv.notify_all();
    }
}
int main()
{
    io.post(handler);
    boost::thread bt(buzzer);
    io.run();
}

If I understand the intent correctly, you want to launch an event handler, when some condition variable is signaled, in context of asio thread pool? I think it would be sufficient to wait on the condition variable in the beginning of the handler, and io_service::post() itself back in the pool in the end, something of this sort:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
boost::asio::io_service io;
boost::mutex mx;
boost::condition_variable cv;
void handler()
{
    boost::unique_lock<boost::mutex> lk(mx);
         cv.wait(lk);
    std::cout << "handler awakened\n";
    io.post(handler);
}
void buzzer()
{
    for(;;)
    {
        boost::this_thread::sleep(boost::posix_time::seconds(1));
        boost::lock_guard<boost::mutex> lk(mx);
            cv.notify_all();
    }
}
int main()
{
    io.post(handler);
    boost::thread bt(buzzer);
    io.run();
}
陌伤ぢ 2024-12-02 17:06:25

我可以建议基于 boost::asio::deadline_timer 的解决方案,它对我来说效果很好。这是 boost::asio 环境中的一种异步事件。
一件非常重要的事情是“handler”必须通过与“cancel”相同的“strand_”进行序列化,因为从多个线程使用“boost::asio::deadline_timer”不是线程安全的。

class async_event
{
public:
    async_event(
        boost::asio::io_service& io_service,
        boost::asio::strand<boost::asio::io_context::executor_type>& strand)
            : strand_(strand)
            , deadline_timer_(io_service, boost::posix_time::ptime(boost::posix_time::pos_infin))
    {}

    // 'handler' must be serialised through the same 'strand_' as 'cancel' or 'cancel_one'
    //  because using 'boost::asio::deadline_timer' from multiple threads is not thread safe
    template<class WaitHandler>
    void async_wait(WaitHandler&& handler) {
        deadline_timer_.async_wait(handler);
    }
    void async_notify_one() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_one_serialized, this));
    }
    void async_notify_all() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_all_serialized, this));
    }
private:
    void async_notify_one_serialized() {
        deadline_timer_.cancel_one();
    }
    void async_notify_all_serialized() {
        deadline_timer_.cancel();
    }
    boost::asio::strand<boost::asio::io_context::executor_type>& strand_;
    boost::asio::deadline_timer deadline_timer_;
};

I can suggest solution based on boost::asio::deadline_timer which works fine for me. This is kind of async event in boost::asio environment.
One very important thing is that the 'handler' must be serialised through the same 'strand_' as 'cancel', because using 'boost::asio::deadline_timer' from multiple threads is not thread safe.

class async_event
{
public:
    async_event(
        boost::asio::io_service& io_service,
        boost::asio::strand<boost::asio::io_context::executor_type>& strand)
            : strand_(strand)
            , deadline_timer_(io_service, boost::posix_time::ptime(boost::posix_time::pos_infin))
    {}

    // 'handler' must be serialised through the same 'strand_' as 'cancel' or 'cancel_one'
    //  because using 'boost::asio::deadline_timer' from multiple threads is not thread safe
    template<class WaitHandler>
    void async_wait(WaitHandler&& handler) {
        deadline_timer_.async_wait(handler);
    }
    void async_notify_one() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_one_serialized, this));
    }
    void async_notify_all() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_all_serialized, this));
    }
private:
    void async_notify_one_serialized() {
        deadline_timer_.cancel_one();
    }
    void async_notify_all_serialized() {
        deadline_timer_.cancel();
    }
    boost::asio::strand<boost::asio::io_context::executor_type>& strand_;
    boost::asio::deadline_timer deadline_timer_;
};
眼前雾蒙蒙 2024-12-02 17:06:25

不幸的是,Boost ASIO 没有 async_wait_for_condvar() 方法。

在大多数情况下,您也不需要它。以 ASIO 方式编程通常意味着您使用链,而不是互斥体或条件变量来保护共享资源。除了极少数情况(通常关注启动和退出时的正确构造或销毁顺序)之外,您根本不需要互斥体或条件变量。

修改共享资源时,经典的部分同步线程方式如下:

  • 锁定保护资源的互斥体
  • 更新需要更新的内容
  • 如果需要等待线程进一步处理,则向条件变量发出信号
  • 解锁互斥体

完全异步 ASIO方式是:

  • 生成一条消息,其中包含更新资源所需的所有内容
  • 将该消息发布到更新处理程序的调用到资源的链
  • 如果需要进一步处理,则让该更新处理程序创建更多消息并将它们发布到适当的资源链。
  • 如果作业可以在完全私有的数据上执行,则将它们直接发布到 io-context。

下面是一个类 some_shared_resource 的示例,它接收字符串 state 并根据收到的状态触发一些进一步的处理。请注意,私有方法 some_shared_resource::receive_state() 中的所有处理都是完全线程安全的,因为链会序列化所有调用。

当然,这个例子并不完整; some_other_resource 需要一个与 some_shared_ressource::send_state() 类似的 send_code_red() 方法。

#include <boost/asio>
#include <memory>

using asio_context = boost::asio::io_context;
using asio_executor_type = asio_context::executor_type;
using asio_strand = boost::asio::strand<asio_executor_type>;

class some_other_resource;
class some_shared_resource : public std::enable_shared_from_this<some_shared_resource> {
    asio_strand strand;
    std::shared_ptr<some_other_resource> other;
    std::string state;

    void receive_state(std::string&& new_state) {
        std::string oldstate = std::exchange(state, new_state);
        if(state == "red" && oldstate != "red") {
            // state transition to "red":
            other.send_code_red(true);
        } else if(state != "red" && oldstate == "red") {
            // state transition from "red":
            other.send_code_red(false);
        }
    }

public:
    some_shared_resource(asio_context& ctx, const std::shared_ptr<some_other_resource>& other)
      : strand(ctx.get_executor()), other(other) {}

    void send_state(std::string&& new_state) {
        boost::asio::post(strand, [me = weak_from_this(), new_state = std::move(new_state)]() mutable {
            if(auto self = me.lock(); self) {
                self->receive_state(std::move(new_state));
            }
        });
    }
};

正如您所看到的,一开始总是在 ASIO 的链中发帖可能会有点乏味。但是您可以将大部分“为类配备链”代码移至模板中。

消息传递的好处是:由于您不使用互斥体,因此即使在极端情况下,您也不会再陷入僵局。此外,使用消息传递通常比传统的多线程更容易创建高级别的并行性。不利的一面是,移动和复制所有这些消息对象非常耗时,这可能会减慢应用程序的速度。

最后一点:在 send_state() 形成的消息中使用弱指针有助于可靠地销毁 some_shared_resource 对象:否则,如果 A 调用 B 并且 B 调用 C 和 C调用 A (可能仅在超时或类似之后),在消息中使用共享指针而不是弱指针将创建循环引用,从而防止对象销毁。如果您确定永远不会有循环,并且处理来自要删除对象的消息不会造成问题,则可以使用 shared_from_this() 而不是 weak_from_this( ),当然。如果您确定在 ASIO 停止之前对象不会被删除(并且所有工作线程都连接回主线程),那么您也可以直接捕获 this 指针。

Unfortunately, Boost ASIO doesn't have an async_wait_for_condvar() method.

In most cases, you also won't need it. Programming the ASIO way usually means, that you use strands, not mutexes or condition variables, to protect shared resources. Except for rare cases, which usually focus around correct construction or destruction order at startup and exit, you won't need mutexes or condition variables at all.

When modifying a shared resource, the classic, partially synchronous threaded way is as follows:

  • Lock the mutex protecting the resource
  • Update whatever needs to be updated
  • Signal a condition variable, if further processing by a waiting thread is required
  • Unlock the mutex

The fully asynchronous ASIO way is though:

  • Generate a message, that contains everything, that is needed to update the resource
  • Post a call to an update handler with that message to the resource's strand
  • If further processing is needed, let that update handler create further message(s) and post them to the apropriate resources' strands.
  • If jobs can be executed on fully private data, then post them directly to the io-context instead.

Here is an example of a class some_shared_resource, that receives a string state and triggers some further processing depending on the state received. Please note, that all processing in the private method some_shared_resource::receive_state() is fully thread-safe, as the strand serializes all calls.

Of course, the example is not complete; some_other_resource needs a similiar send_code_red() method as some_shared_ressource::send_state().

#include <boost/asio>
#include <memory>

using asio_context = boost::asio::io_context;
using asio_executor_type = asio_context::executor_type;
using asio_strand = boost::asio::strand<asio_executor_type>;

class some_other_resource;
class some_shared_resource : public std::enable_shared_from_this<some_shared_resource> {
    asio_strand strand;
    std::shared_ptr<some_other_resource> other;
    std::string state;

    void receive_state(std::string&& new_state) {
        std::string oldstate = std::exchange(state, new_state);
        if(state == "red" && oldstate != "red") {
            // state transition to "red":
            other.send_code_red(true);
        } else if(state != "red" && oldstate == "red") {
            // state transition from "red":
            other.send_code_red(false);
        }
    }

public:
    some_shared_resource(asio_context& ctx, const std::shared_ptr<some_other_resource>& other)
      : strand(ctx.get_executor()), other(other) {}

    void send_state(std::string&& new_state) {
        boost::asio::post(strand, [me = weak_from_this(), new_state = std::move(new_state)]() mutable {
            if(auto self = me.lock(); self) {
                self->receive_state(std::move(new_state));
            }
        });
    }
};

As you see, posting always into ASIO's strands can be a bit tedious at first. But you can move most of that "equip a class with a strand" code into a template.

The good thing about message passing: As you are not using mutexes, you cannot deadlock yourself anymore, even in extreme situations. Also, using message passing, it is often easier to create a high level of parallelity than with classical multithreading. On the downside, moving and copying around all these message objects is time consuming, which can slow down your application.

A last note: Using the weak pointer in the message formed by send_state() facilitates the reliable destruction of some_shared_resource objects: Otherwise, if A calls B and B calls C and C calls A (possibly only after a timeout or similiar), using shared pointers instead of weak pointers in the messages would create cyclic references, which then prevents object destruction. If you are sure, that you never will have cycles, and that processing messages from to-be-deleted objects doesn't pose a problem, you can use shared_from_this() instead of weak_from_this(), of course. If you are sure, that objects won't get deleted before ASIO has been stopped (and all working threads been joined back to the main thread), then you can also directly capture the this pointer instead.

桃酥萝莉 2024-12-02 17:06:25

FWIW,我使用相当好的 continuable 库实现了异步互斥体:

class async_mutex
{
    cti::continuable<> tail_{cti::make_ready_continuable()};
    std::mutex mutex_;

public:
    async_mutex() = default;
    async_mutex(const async_mutex&) = delete;
    const async_mutex& operator=(const async_mutex&) = delete;

    [[nodiscard]] cti::continuable<std::shared_ptr<int>> lock()
    {
        std::shared_ptr<int> result;
        cti::continuable<> tail = cti::make_continuable<void>(
            [&result](auto&& promise) {
                result = std::shared_ptr<int>((int*)1,
                    [promise = std::move(promise)](auto) mutable {
                        promise.set_value();
                    }
                );
            }
        );

        {
            std::lock_guard _{mutex_};
            std::swap(tail, tail_);
        }
        co_await std::move(tail);
        co_return result;
    }
};

用法例如:

async_mutex mutex;

...

{
    const auto _ = co_await mutex.lock();
    // only one lock per mutex-instance
}

FWIW, I implemented an asynchronous mutex using the rather good continuable library:

class async_mutex
{
    cti::continuable<> tail_{cti::make_ready_continuable()};
    std::mutex mutex_;

public:
    async_mutex() = default;
    async_mutex(const async_mutex&) = delete;
    const async_mutex& operator=(const async_mutex&) = delete;

    [[nodiscard]] cti::continuable<std::shared_ptr<int>> lock()
    {
        std::shared_ptr<int> result;
        cti::continuable<> tail = cti::make_continuable<void>(
            [&result](auto&& promise) {
                result = std::shared_ptr<int>((int*)1,
                    [promise = std::move(promise)](auto) mutable {
                        promise.set_value();
                    }
                );
            }
        );

        {
            std::lock_guard _{mutex_};
            std::swap(tail, tail_);
        }
        co_await std::move(tail);
        co_return result;
    }
};

usage eg:

async_mutex mutex;

...

{
    const auto _ = co_await mutex.lock();
    // only one lock per mutex-instance
}

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文