C++ Coroutine 协程 async scope
之前的章节只是讲解了协程中的各种概念,本章我们来实现一个 async scope,即一个可以在其中使用 co_await 和 co_return 的函数,最后我们要在这个基础上实现一个单元素的回调转协程。
Promise 实现
这里我们直接把整个 Scope 建模为一个 Task,先看看整体的样式
Task<int> simple_task2() {
std::cout << __func__ << "\n";
co_return 2;
}
Task<int> simple_task1() {
std::cout << __func__ << "\n";
co_return 1;
}
Task<int> simple_task() {
// result2 == 2
auto result2 = co_await simple_task1();
std::cout << __func__ << " co_await task1 \n";
// result3 == 3
auto result3 = co_await simple_task2();
std::cout << __func__ << " co_await task2 \n";
co_return 1 + result2 + result3;
}
这里可以推断出两个要做的事情
- co_await 对应的 await_transform 允许接受 Task 作为参数并返回一个 Awaiter
- co_return 对应的 return_value 允许接受 T 来作为整个任务的结果值
先考虑一个单独异步任务,即我们最常见的那种 Promise/Future 模型,promise 负责支持 complete 触发回调,future 负责挂载回调,那么我们就可以这样考虑:
promise::complete 的实现就是 return_value 的实现
void return_value(ResultType value) {
done = true;
res = Result<ResultType>(std::move(value));
for (auto &callback : completion_callbacks) {
callback(res);
}
}
那么下一步的设计就该考虑 completion_callbacks 放在哪里了
首先按照习惯性设计外部的 Future(这里是 C++的协程概念中的 future,就是 promise 外面那个)是会持有当前的 coroutine_handle 的通过这个很容易拿到 promise,但是 promise 是不知道 future 的,所以我们直接把 completion_callbacks 放在 promise 内部即可。
这里跟 co_await 实现倒是没什么关系。。。只是给一个外部可以给 task 挂载回调的机制罢了
Task &finally(std::function<void()> &&func) {
handle.promise().on_completed([func](auto result) { func(); });
return *this;
}
接下来就是暂存结果值和实现三大件了
都很简单
struct promise_type {
auto initial_suspend() { return std::suspend_never{}; }
auto final_suspend() noexcept { return std::suspend_always{}; }
auto unhandled_exception() {
done = true;
res = Result<ResultType>(std::current_exception());
for (auto &callback : completion_callbacks) {
callback(res);
}
}
// 定制 co_return 行为 当调用这个方法时意味着此时
void return_value(ResultType value) {
done = true;
res = Result<ResultType>(std::move(value));
for (auto &callback : completion_callbacks) {
callback(res);
}
}
Task<ResultType> get_return_object() {
return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
}
template <typename _ResultType>
TaskAwaiter<_ResultType> await_transform(Task<_ResultType> &&task) {
return TaskAwaiter<_ResultType>(std::move(task));
}
void on_completed(std::function<void(Result<ResultType>)> &&func) {
if (done) {
// result 已经有值
auto value = res;
// 解锁之后再调用 func
func(value);
} else {
// 否则添加回调函数,等待调用
completion_callbacks.push_back(func);
}
}
auto get_result() { return res.get_or_throw(); }
public:
Result<ResultType> res;
bool done = false;
std::list<std::function<void(Result<ResultType>)>> completion_callbacks;
};
Awaitable
剩下的就是 awaitable 实现,这个主要用于 co_await 这个操作符的返回值,此时我们需要根据其右侧的状态来决定是否挂起以及返回值了,就是等待子任务·
我们再看一眼这个函数的签名,注意 Task 是个 C++的 Future 哦
template <typename _ResultType>
TaskAwaiter<_ResultType> await_transform(Task<_ResultType> &&task) {
return TaskAwaiter<_ResultType>(std::move(task));
}
直接看代码吧 很简单
//因为 task 是 future 所以我们有个 handle 指针再获取到 promise 再获取到 done 变量状态
bool await_ready() const noexcept {
return task.handle.promise().done;
}
//注意这个 handle 代指的是当前 continuation,是调用方的句柄,当我 resume 时,等价于从 co_await 处向下执行 而 task 则是代指的子任务
void await_suspend(std::coroutine_handle<> handle) noexcept {
// 当 task 执行完之后调用 resume
task.finally([handle]() { handle.resume(); });
}
// 协程恢复执行时,被等待的子 Task 已经执行完,调用 get_result 来获取结果
R await_resume() noexcept { return task.get_result(); }
简单的一个 async scope 就完成了
回调转协程
这个就更简单啦
我们要实现的结构类似于 js 中的
fn(args,(i) => {fun1(i)});
fun1(await fn(args))
状态储存
这个就有点像我们 Vert.x 的 Promise/Future 实现了
template <typename T> struct AsyncResult {
explicit AsyncResult() = default;
// 当 Task 正常返回时用结果初始化 Result
explicit AsyncResult(T &&value)
: res(value),
completion_callbacks(std::move(value.completion_callbacks)) {}
// 这里先暂时一把大锁控制一下 好理解
void complete(Result<T> &&async_value) {
std::cout << __func__ << "AsyncResult 准备触发回调 \n";
auto scope = std::lock_guard(queue_lock);
done = true;
res = async_value;
for (auto fn : completion_callbacks) {
fn(res);
}
completion_callbacks.clear();
}
void on_completed(std::function<void(Result<T>)> &&func) {
auto scope = std::lock_guard(queue_lock);
if (done) {
// result 已经有值
auto value = res;
// 解锁之后再调用 func
func(value);
} else {
// 否则添加回调函数,等待调用
completion_callbacks.push_back(func);
}
}
bool is_done() { return done; }
Result<T> get_result() { return res; }
private:
std::mutex queue_lock;
std::list<std::function<void(Result<T>)>> completion_callbacks;
Result<T> res;
bool done = false;
};
awaitable
既然要允许 await 那就得把之前实现的 AsyncResult 添加点 awaitable 实现
这里我们采用组合的方式来做
这里是一个多所有权的场景 直接用 share_ptr 把这个玩意让协程和异步任务共同持有
我思考了一下,这里因为存在一定的顺序性 raw ptr 也是可行的
异步任务持有指针不释放
等待 awaitable 析构时自然释放即可
template <typename T> struct AsyncResultAwaiter {
//省略构造参数
bool await_ready() const noexcept { return res->is_done(); }
void await_suspend(std::coroutine_handle<> handle) noexcept {
// 当 task 执行完之后调用 resume
res->on_completed([handle](auto a) {
std::cout << __func__ << "AsyncResultAwaiter 回调 \n";
std::cout << handle.address();
handle.resume();
});
}
Result<T> await_resume() noexcept {
return res.get()->get_result();
}
std::shared_ptr<AsyncResult<T>> res;
std::coroutine_handle<> handle;
};
这里就相当于 kt 的那种做法,直接拿到 coroutine_handle 来用
最后我们在之前实现的 task 的 promise 中添加对它的支持就行了
这里的参数名为 future 的参数就是与异步任务共享的那个回调,这里只是为了能拿到协程句柄丢到对应的异步任务回调中罢了
template <typename _ResultType>
AsyncResultAwaiter<_ResultType>
await_transform(std::shared_ptr<AsyncResult<_ResultType>> future) {
return AsyncResultAwaiter(
future, std::coroutine_handle<promise_type>::from_promise(*this));
}
用法
Task<int> async_task() {
// result2 == 2
auto shared_ptr = std::make_shared<AsyncResult<int>>();
auto thread = std::thread([shared_ptr]() {
std::cout << "thread :in lambda \n";
std::this_thread::sleep_for(std::chrono::seconds(1));
shared_ptr->complete(Result(12));
});
thread.detach();
std::cout << "before await count:" << shared_ptr.use_count() << "\n";
auto res = co_await shared_ptr;
std::cout << "res:" << res.get_or_throw() << "\n";
co_return res.get_or_throw() + 3;
}
总结
这里面我只是实现了 而非打磨好了 其中还有一些内存所有权的问题还没有解决甚至存在内存泄漏,但是先学会再打磨,完整代码参考 codepieces/async_scope_and_future.cpp at main · dreamlike-ocean/codepieces · GitHub
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

上一篇: Java 响应式的锁
下一篇: Jvm 常量池
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论