上一章节介绍了协程的现状,并以libco为例介绍了主流有栈协程的实现原理。这一篇,我们开始进入C++20原生协程的研究。
上篇文章的地址:
这一章节我们会给出,C++20协程的定义,并列举协程需要的所有接口。这一章节会一下涌现很多术语和概念,可能你会感到有些困扰,但不用担心,后续章节会逐一解释各个接口的具体使用。
我们先看下C++20协程的定义。C++20协程标准引入了3个新的关键字, co_await, co_yield, co_return。如果一个函数包含了如上3个关键字之一,则该函数就是一个协程。
除了这3个关键字,实现一个C++20协程还需要实现两个鸭子类型,分别是promise type和awaiter type。
举个例子:对于如下函数some_coroutine,由于在函数体内使用了co_await, 所以在C++20标准下,它就成为一个协程。
T some_coroutine(P param)
{
<declare x>
co_await x;
}
按照编译器的约定,该函数的返回值类型T,必须包含名为promise_type的子类型,且该子类型必须拥有约定的接口。
class T
{
public:
class promise_type
{
public:
<method of convention>
};
};
对于co_await操作数x,可能是如下类型:
1. 鸭子类型awaiter type。
2. 可以通过 T::promise_type::await_transform 接口转换为awaiter type的类型。
3. 第三种鸭子类型,awaitable type(不是awaiter)。
关于上文中提到的三种鸭子类型,我们将相关接口约定列举如下,后续章节会介绍基础接口的使用。
awaiter type需要实现如下名字的函数:
awaitable type需要实现如下的操作符重载:
promise type需要实现如下名字的函数:
promise type可选实现如下名字的函数:
我们先从co_await的语义实现说起。
co_await x;
假设x是我们之前说的awaiter type的变量。我们知道awaiter type有三个必须实现的接口,await_ready, await_suspend, await_resume。
那么co_await的执行过程相当于如下伪代码:(引自参考文献1)
{
if (!awaiter.await_ready())
{
using handle_t = std::experimental::coroutine_handle<P>;
using await_suspend_result_t =
decltype(awaiter.await_suspend(handle_t::from_promise(p)));
<suspend-coroutine>
if constexpr (std::is_void_v<await_suspend_result_t>)
{
awaiter.await_suspend(handle_t::from_promise(p));
<return-to-caller-or-resumer>
}
else
{
static_assert(
std::is_same_v<await_suspend_result_t, bool>,
"await_suspend() must return 'void' or 'bool'.");
if (awaiter.await_suspend(handle_t::from_promise(p)))
{
<return-to-caller-or-resumer>
}
}
<resume-point>
}
return awaiter.await_resume();
}
简单的讲,就是如下步骤
1. 首先调用await_ready判断是否需要执行挂起(异步操作是否已经完成)
2. 然后调用await_suspend
- 如果返回值是void版本的实现,则直接挂起。
- 如果返回值是bool版本的实现,则根据返回值决定是否挂起。
- 如果返回值是coroutine_handle<>版本的实现,挂起并返回到该返回值对应的协程。
3. 当协程唤醒后,会执行await_resume()。其返回值作为(co_await x)表达式的值。
coroutine_handle<>是新出现的一个类型。从名字我们就可以知道,它是协程的句柄。后续在介绍promise type类型时会继续介绍它。
总览部分也提到了co_await操作数x,除了awaiter type,还可能是如下其他类型:
所以对于非awaiter type的x变量,可能经历如下转换步骤(引自参考文献1)。
template<typename P, typename T>
decltype(auto) get_awaitable(P& promise, T&& expr)
{
if constexpr (has_any_await_transform_member_v<P>)
return promise.await_transform(static_cast<T&&>(expr));
else
return static_cast<T&&>(expr);
}
template<typename Awaitable>
decltype(auto) get_awaiter(Awaitable&& awaitable)
{
if constexpr (has_member_operator_co_await_v<Awaitable>)
return static_cast<Awaitable&&>(awaitable).operator co_await();
else if constexpr (has_non_member_operator_co_await_v<Awaitable&&>)
return operator co_await(static_cast<Awaitable&&>(awaitable));
else
return static_cast<Awaitable&&>(awaitable);
}
简单的讲就是:
1. 调用T::promise_type::await_transform,将x作为参数传入,返回新的对象y(如果没有定义该函数,y就是x本身)。
2. 如果上一步得到的y对象的类重载了co_await()运算符,或者有全局的co_await()运算符,则调用该运算符,返回一个awaiter。
上一小节,我们已经介绍了promise type的其中一个接口await_transform。下面我们继续看下promise type其他接口,借此了解协程函数本身的实现细节。
针对上面的协程some_coroutine,以及它的返回值类型T,调用协程的语句可以理解为如下过程 (引自参考文献1)
// Pretend there's a compiler-generated structure called 'coroutine_frame'
// that holds all of the state needed for the coroutine. It's constructor
// takes a copy of parameters and default-constructs a promise object.
struct coroutine_frame { ... };
T some_coroutine(P param)
{
auto* f = new coroutine_frame(std::forward<P>(param));
auto returnObject = f->promise.get_return_object();
// Start execution of the coroutine body by resuming it.
// This call will return when the coroutine gets to the first
// suspend-point or when the coroutine runs to completion.
coroutine_handle<decltype(f->promise)>::from_promise(f->promise).resume();
// Then the return object is returned to the caller.
return returnObject;
}
如果你对之前文章中提到的函数切换,协程切换还有印象的话,作为一个被调用的函数,他需要保存其局部变量的栈帧空间。 对于C++20的原生协程,可以看到,编译器首先会为协程在堆上分配这块空间,我称之为堆帧。堆栈的大小可以认为是,T::promise_type的大小,协程局部变量以及参数的大小累计相加得到的。
另外参数传递部分,通过std::forward的使用,由此我们可以知道对于值传递的变量,会使用了他们的move-constructor。
在协程的堆帧上,会同时创建协程对应的T::promise_type的变量。 然后调用其get_return_object()接口。这个接口负责返回一个T类型的变量。 这里有一点我个人的理解:这里的伪代码只是演示方便,执行过程并不会封装为一个函数,并不会启动新的栈帧,而是在原有栈帧上执行此逻辑。所以协程函数返回的T类型的变量,只是一个临时变量。
这里我们再次看到coroutine_handle。在介绍了堆帧后,我们现在可以说,这个句柄维持了指向协程堆帧的指针。我们可以调用该句柄的resume函数恢复挂起状态协程的执行。
协程本身的执行遵循如下伪代码的流程(引自参考文献1)
{
co_await promise.initial_suspend();
try
{
<body-statements>
}
catch (...)
{
promise.unhandled_exception();
}
FinalSuspend:
co_await promise.final_suspend();
}
2. 然后开始执行我们编写的协程代码。 执行代码过程中,如果遇到了挂起,则会返回到调用者。
3. 最后,无论是否中间经历了挂起,在协程完全结束后,还会调用协程对应的promise变量的final_suspend函数,该函数返回值应可以作为co_await的操作数。这里主要是允许C++20协程的使用者,可以在退出前做适当的处理。
4. 这里还需要实现unhandled_exception(),用于处理协程本身未处理的异常。
除此外,promise type还有一个必须实现的接口,return_void() 或者 return value() 二选一。 在使用co_return时, 会调用你实现的函数,并跳转到FinalSuspend。
至此,我们还剩一个关键字没有解释。在协程内调用co_yield
co_yield <expr>
相当于调用
co_await promise.yield_value(<expr>).
也就是说,对于要支持co_yield的协程,promise_type需要实现yield_value函数,同样的,该函数返回值应可以作为co_await的操作数。
有了以上的理解,那么我们就可以实现一个简单的demo了。
std::coroutine_handle<> g_handle;
struct BaseSwapTestCoro
{
struct awaiter
{
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> h) { g_handle = h; }
void await_resume() {}
};
struct promise_type
{
BaseSwapTestCoro get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void unhandled_exception() {}
void return_void() {}
};
};
需要说明的是std::suspend_never是预定义的变量,表明是nerver suspend的awaiter。
测试代码如下
BaseSwapTestCoro SomeFunc()
{
LOG(0, "in coroutine: before await");
co_await BaseSwapTestCoro::awaiter();
LOG(0, "in coroutine: after await");
}
TEST(base, swap)
{
SomeFunc();
LOG(0, "in main: before resume");
g_handle.resume();
LOG(0, "in main: after resume");
}
测试输出如下
[ RUN ] base.swap
base_test.cpp:29|in coroutine: before await
base_test.cpp:37|in main: before resume
base_test.cpp:31|in coroutine: after await
base_test.cpp:39|in main: after resume
[ OK ] base.swap (0 ms)
关于C++20协程实现的基本原理,先介绍到这么多。如果想进一步了解其他可选接口的使用,可以阅读参考资料1。这里需要说明一点,协程的语义并没有改变C++的基本语法规则,比如:
在了解了C++20的实现原理后,我做了协程的基础创建和切换的试验
std::coroutine_handle<> g_handle;
struct BaseSwapTestCoro
{
struct awaiter
{
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> h) { g_handle = h; }
int await_resume() { return 1; }
};
struct promise_type
{
BaseSwapTestCoro get_return_object() { return {}; }
awaiter initial_suspend() { return awaiter{}; }
std::suspend_never final_suspend() noexcept { return {}; }
void unhandled_exception() {}
void return_void() {}
auto await_transform() = delete; // no use co_wait
auto yield_value(int) { return awaiter{}; } // how to use void
};
};
TEST(base, swap)
{
base_swap::SomeFunc();
// test
int MAX_LOOP_COUNT = 1000000;
auto begin = CALC_CLOCK_NOW();
for (int i = 0; i < MAX_LOOP_COUNT; i++)
{
base_swap::g_handle.resume();
}
auto end = CALC_CLOCK_NOW(); // 340ns
LOG(0, "cost %lld ps", CALC_PS_AVG_CLOCK(end - begin, MAX_LOOP_COUNT) / 2);
// EXPECT_EQ(g_counter, MAX_LOOP_COUNT);
}
对比libco的方案,有如下数据
方案 | 耗时(单位:皮秒=0.001纳秒) |
---|---|
libco原生实现 | 17,000 ps |
libco opt(参考资料2) | 4,243 ps |
c20上下文切换 | 1,660 ps |
此外, 还得到了c20协程创建开销 1,400 ps。都是2纳秒以内。
看到这个数据还是很令人振奋的。但真正的工作也刚刚开始。
考虑项目内的使用情况,我们往往会将某些协程函数进行封装,这样就会出现某个协程函数等待另一个协程函数的返回。
举个例子,某个RPC请求的响应函数,由于需要请求其他的服务,所以被实现为一个协程A。某些常用的其他服务请求被封装为协程B。A使用B完成部分功能。
假设协程调用过程如下
T B()
{
<co_await service b>
}
T A()
{
B();
<其他同步操作>
<co_await service c>
};
之前有提过,C++20协程是非对称的。如果这样实现的话, 在B函数挂起时, 会返回到A协程的下一条语句继续执行。 且B协程后续唤醒后,执行完成相关逻辑,并不会回到A。而是回到他的唤醒者。如下图所示
而我们想得到的效果是某种对称转移的语义(如果对协程的对称性不了解,可以参见前面的文章)。
上面对称转移到语义就要求我们在协程A中可以 co_await B协程, 等待其执行完成。
T A()
{
co_await B();
<其他同步操作>
<co_await service c>
};
参考Lewiss Baker的第四篇文章(参考资料3),我试着实现了这种对称转移的语义。思路如下 ,针对 co_await B(); 这个语句执行如下步骤:
1. B协程启动后通过initial_suspend立即挂起,并返回对应的T类型对象,此T类型对象保存了B协程句柄。
2. 通过await_transform将T类型对象转换为一个awaiter type,并在其await_suspend函数,通过保存的B协程句柄,在其对应的promise对象中记录他的调用者A。
3. 在B协程被唤醒,执行完后,利用final_suspend,恢复A的执行。
代码较长,放在后续的附件章节。先上测试结果。
如上代码实现,在未引入协程管理的情况下创建效率200ns以上。相比我们之前的协程框架是降低很多的。这里当然可能有实现的原因,但项目内落地的初步结论,不如之前的基础测试理想。
虽然如此,个人认为相较于基于libco的有栈协程,C++20的协程还是有他的优势。
1. 基础性能确实优越。
2. 语言原生支持, 后续可能有高效的对称转移语义的标准库。
3. 堆帧空间可认为不受限制,不用担心爆栈。
作为初步的预研,C++20协程可以总结为,在语言层面实现了一种非对称的无栈协程。作为语言原生支持的协程,基础的效率表现很亮眼。在项目中实际落地,还需要进一步的探索。后续有空闲时间,会继续关注如下三点
1. 如何提高协程的对称转移的效率。
2. 如何提高协程管理的效率。
3. 针对特定框架定制更高效的协程封装。
最后也欢迎各位大牛不吝赐教,各位在C++20协程实际落地过程中的最佳实践。
class Coro
{
public:
Coro() = default; // 同步函数使用
Coro(Coro &&t) noexcept : coro_(std::exchange(t.coro_, {})) {}
Coro(const Coro &t) = delete;
Coro &operator=(Coro &&t) = delete;
Coro &operator=(const Coro &t) = delete;
~Coro();
void Start();
public:
class promise_type;
private:
bool started_ = false; // 已经启动过
explicit Coro(std::coroutine_handle<promise_type> h) noexcept : coro_(h) {}
public:
std::coroutine_handle<promise_type> coro_; // 自身的协程句柄 // TODO: make private
public:
class promise_type
{
public:
promise_type() { id_ = s_id++; }
~promise_type()
{ // LOG(0, "***promise destroy %lld", id_);
}
bool Done() { return done_; }
private:
friend class Coro;
int64_t id_ = 0;
bool done_ = false;
public:
static int64_t s_id; // TODO: make private
public:
Coro get_return_object() noexcept { return Coro(std::coroutine_handle<promise_type>::from_promise(*this)); }
// NOTICE: 所有的协程启动挂起,等待被调用Start,或者被co_await时,恢复执行。
std::suspend_always initial_suspend() noexcept { return {}; }
void return_void() noexcept {}
void unhandled_exception() noexcept { std::terminate(); }
std::suspend_always await_transform(std::suspend_always &&input) { return input; } // 调试用,测基础性能
//-------------------------------------------------------
// call_awaiter 调用方等待对象, 配合后续的final_awaiter用于实现对称转移语义
// A协程等待B协程, co_await B(), B协程返回的Coro对象会被转换为call_awaiter,给A协程等待。
// 虽然call_awaiter对象本身保存协程句柄为B,但call_awaiter::await_suspend的参数为A协程。
//
public:
class call_awaiter
{
public:
friend class Coro;
bool await_ready() noexcept;
std::coroutine_handle<> await_suspend(
std::coroutine_handle<promise_type> continuation) noexcept; // 参数是调用者协程
void await_resume() noexcept {}
private:
explicit call_awaiter(std::coroutine_handle<promise_type> h) noexcept : coro_(h) {}
std::coroutine_handle<promise_type> coro_;
};
call_awaiter await_transform(Coro &&input); // 执行等待子协程操作
//-------------------------------------------------------
// final_awaiter 协程退出是执行的awaiter 配合前面的call_awaiter用于实现对称转移语义
// A协程等待B协程,B协程挂起。在B协程被恢复,并执行完成后,如果没有此处的实现,将返回到恢复者继续执行,而非A协程的挂起点。
// 此处实现,在final_suspend将执行权,恢复回A协程。
struct final_awaiter
{
bool await_ready() noexcept { return false; }
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> h) noexcept; // 参数时当前协程
void await_resume() noexcept {}
};
final_awaiter final_suspend() noexcept { return {}; }
std::coroutine_handle<promise_type> continuation_; // 调用方的协程句柄
//-------------------------------------------------------------
// callback_awaiter的作用实现 是为了实现挂起后,被唤醒的逻辑
// 在之前的实现中,作为调用层级中的叶子协程,需要调用co_await将自己挂起, 并交由某个管理类管理
// 后续在合适的实际, 由管理类执行唤醒操作
public:
struct callback_awaiter
{
bool await_ready() noexcept { return false; }
int await_resume() noexcept { return 0; }
void await_suspend(std::coroutine_handle<promise_type> h) noexcept; // h 当前协程
};
// NOTICE: 后续可以将i 作为等待时间
callback_awaiter await_transform(int i) { return {}; } // 执行叶子Coro挂起操作
private:
bool need_callback_ = false; // 控制权由Coro转到CoroMgr
bool self_destroy_ = false;
};
};
//---------------------------------------------------
//
Coro::~Coro()
{
// if (coro_ && coro_.promise().Done() && !coro_.promise().need_callback_) // need callback释放控制权
// if (coro_ && coro_.promise().Done()) // need callback释放控制权
if (coro_)
{
if (!coro_.promise().need_callback_) // 未让渡控制权给CoroMgr
{
if (coro_.promise().Done())
{
coro_.destroy();
// LOG(0, "!!destroy %lld", coro_.promise().id_);
}
else
{
coro_.promise().self_destroy_ = true;
}
}
}
}
void Coro::Start()
{
assert(!started_);
if (coro_)
{
coro_.resume();
started_ = true;
}
}
//---------------------------------------------------
//
bool Coro::promise_type::call_awaiter::await_ready() noexcept
{
if (coro_)
return false;
else
return true;
}
std::coroutine_handle<> Coro::promise_type::call_awaiter::await_suspend(
std::coroutine_handle<promise_type> continuation) noexcept // 参数是调用者协程
{
coro_.promise().continuation_ = continuation;
return coro_;
}
//---------------------------------------------------
//
int64_t Coro::promise_type::s_id = 1000;
Coro::promise_type::call_awaiter Coro::promise_type::await_transform(Coro&& input)
{
return call_awaiter(input.coro_);
}
//---------------------------------------------------
//
std::coroutine_handle<> Coro::promise_type::final_awaiter::await_suspend(
std::coroutine_handle<promise_type> h) noexcept // 参数时当前协程
{
// 返回noop_coroutine 表示当前协程被挂起
h.promise().done_ = true;
if (h.promise().self_destroy_)
{
assert(!h.promise().continuation_);
h.destroy();
return std::noop_coroutine();
}
return h.promise().continuation_ ? static_cast<std::coroutine_handle<> >(h.promise().continuation_)
: std::noop_coroutine();
}
//---------------------------------------------------
//
void Coro::promise_type::callback_awaiter::await_suspend(
std::coroutine_handle<promise_type> h) noexcept // 参数时当前协程
{
h.promise().need_callback_ = true;
h.promise().self_destroy_ = false;
CoroMgr::GetInst().Append(h.promise().id_, h.promise().continuation_ ? h.promise().continuation_.promise().id_ : 0, h);
}