协程
概述
C++20
协程(Coroutines
)是一种无栈协程实现,通过挂起/恢复机制简化异步编程,将回调式代码转化为顺序式结构- 使用
C++20
协程时,必须实现promise
类型(协程的行为控制器)和可选的awaitable
对象(挂起/恢复行为控制器)
机制
- 无栈设计
- 协程状态存储在堆上(协程帧),而非调用栈,节省内存且切换开销低
- 关键字
co_await
:挂起协程,等待异步操作完成(如I/O
请求)co_yield
:生成值并挂起,用于序列生成器(如迭代器)co_return
:结束协程并返回值
- 执行流程
- 协程通过多次挂起/恢复实现协作式多任务,由用户态调度,避免内核线程切换开销
核心组件
- 协程帧
- 作用:存储协程状态,包括局部变量、参数、挂起点位置等
- 生命周期:在首次调用时创建,结束时销毁(需手动管理内存)
promise
类型(定义协程行为,需实现以下接口:)get_return_object()
:返回给调用者的对象(如任务句柄)initial_suspend()
/final_suspend()
:控制协程开始/结束时是否挂起(返回std::suspend_always
或suspend_never
)return_value(T)
/return_void()
:处理co_return
返回值yield_value(T)
:支持co_yield
的值传递unhandled_exception()
:异常处理逻辑
- 协程句柄(
std::coroutine_handle
)- 操作协程的生命周期:
resume()
:恢复挂起的协程destroy()
:销毁协程帧done()
:检查协程是否结束promise()
:获取关联的promise
对象
Awaitable
接口(co_await
的操作对象需实现:)await_ready()
:返回false
时触发挂起await_suspend(handle)
:挂起时执行(可调度其他任务)await_resume()
:恢复时返回结果- 内置类型:
std::suspend_always
:总是挂起
std::suspend_never
:不挂起
必须实现的函数及结构
Promise
类型必须实现的函数
函数签名 | 调用时机 | 必选/可选 | 典型实现 |
get_return_object() |
协程开始执行前 | 必须 | 返回给用户的协程句柄封装对象 |
initial_suspend() |
协程初始执行前 | 必须 | 返回 suspend_always (挂起) 或 suspend_never (不挂起) |
final_suspend() |
co_return 执行后 |
必须 | 通常返回 suspend_always 以便获取结果 |
unhandled_exception() |
协程内抛出未捕获异常时 | 必须 | 处理异常(如记录日志、设置异常状态) |
return_void() 或 |
co_return; 调用时 |
二者选一 | 处理无返回值返回 |
return_value(T) |
co_return value; 调用时 |
二者选一 | 处理有返回值返回 |
yield_value(T) |
co_yield value; 调用时 |
可选 | 生成器场景必须实现 |
Awaitable
对象必须实现的函数
函数签名 | 作用 |
bool await_ready() const |
是否无需挂起(true=立即执行) |
void await_suspend(coro_handle) |
挂起时执行的逻辑(通常安排恢复机制) |
T await_resume() |
恢复时返回结果(T可为void) |
Promise
与 Awaitable
的关系与区别
promise
(协程的"大脑")- 控制协程的生命周期(初始/最终挂起)
- 管理返回值/异常
- 定义协程的返回类型
Awaitable
对象(挂起点的"控制器")- 定义
co_await
的具体行为 - 管理挂起期间的异步操作
- 决定何时/如何恢复协程
- 定义
- 场景对比
特征 | Promise 类型 |
Awaitable 对象 |
实现位置 | 与协程返回类型关联 | 独立类型 |
必要接口 | 5个必须实现的函数 | 3个必须实现的函数 |
调用方 | 编译器自动调用 | 通过 co_await 显式调用 |
典型应用 | 定义协程基本行为 | 实现特定挂起/恢复逻辑 |
示例 | 控制协程初始挂起、异常处理 | 网络I/O、定时器、锁等待 |
- 流程与交互
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
+-------------------+ +-----------------+ | 创建协程对象 | | co_await 表达式 | | (调用get_return_object)| | (需要awaitable) | +-------------------+ +-----------------+ | | v v +-------------------+ +-----------------+ | Promise 初始设置 | | 挂起当前协程 | | (initial_suspend) | | (await_suspend) | +-------------------+ +-----------------+ | | v v +-------------------+ +-----------------+ | 执行协程体 |---->| 外部恢复 | | (可能多次co_await)| | (handle.resume())| +-------------------+ +-----------------+ | ^ v | +-------------------+ +-----------------+ | Promise 清理 | | 恢复后处理 | | (final_suspend) | | (await_resume) | +-------------------+ +-----------------+ |
promise
类型实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
struct GeneratorPromise { int current_value; // 存储yield的值 // 必须实现的接口 Generator get_return_object() { return Generator(std::coroutine_handle<GeneratorPromise>::from_promise(*this)); } std::suspend_always initial_suspend() noexcept { return {}; } std::suspend_always final_suspend() noexcept { return {}; } void unhandled_exception() { std::terminate(); } // 支持co_yield std::suspend_always yield_value(int value) { current_value = value; return {}; } // 支持co_return void return_void() {} }; |
co_await
- 挂起当前协程,等待异步任务完成后恢复协程
co_yield
- 概述
- 是协程中实现"生成器模式"的关键机制
- 允许协程在返回一个值后暂停执行,后续需要时可以恢复执行并产生下一个值
- 关键行为
- 当协程执行到
co_yield
时: - 暂停协程(当前执行位置被保存)
- 返回指定值 给调用者
- 等待调用者的"获取下一个值"请求
- 再次被调用时从上次暂停的位置恢复执行
- 当协程执行到
- 示例文件处理
- 可以理解为:协程函数
read_large_file
返回了一个生成器,每次调用next
的时候该生成器会返回数据,然后暂停协程,直到下一次被请求(也就是下一次next
被调用)
- 可以理解为:协程函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
Generator<Chunk> read_large_file(const std::string& path) { std::ifstream file(path, std::ios::binary); if (!file) co_return; char buffer[4096]; while (file.read(buffer, sizeof(buffer))) { co_yield Chunk(buffer, file.gcount()); // 返回一个数据块 } } // 使用 auto file_reader = read_large_file("huge.data"); while (auto chunk = file_reader.next()) { process_chunk(*chunk); } |
- 上述文件处理方式和常规的读取文件的方式(也是在一个循环中,读取完一块再读取下一块数据)有什么不一样的?
- 控制流模式不一样
传统方式的循环是:读取器主动推送数据给处理器
协程生成器的方式是:处理器按需从读取器拉取数据 - 资源管理差异
传统方式的循环是:文件在读取期间全程保持打开
传统方式的循环是:整个文件处理过程是原子操作
传统方式的循环是:无法中途暂停释放资源
协程生成器方式是:协程暂停时不会自动关闭文件,需要显式管理 - 并发能力
传统方式的循环是:传统循环无法实现这种单读取器多处理器的并行架构
协程生成器方式是:协程生成器的方式可以,见下面示例代码 - 处理流程灵活性
传统方式的循环是:传统循环无法实现这种分段处理
协程生成器方式是:协程生成器的方式可以做到,见下面示例代码
- 控制流模式不一样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
// 创建多个并行处理器 auto reader = read_large_file("big.data"); auto processor1 = [](auto& reader) { while (auto chunk = reader.next()) { if (chunk->id % 2 == 0) process_even(*chunk); } }; auto processor2 = [](auto& reader) { while (auto chunk = reader.next()) { if (chunk->id % 2 != 0) process_odd(*chunk); } }; // 并行处理(需要线程安全实现) std::thread t1(processor1, std::ref(reader)); std::thread t2(processor2, std::ref(reader)); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
// 分段处理 auto reader = read_large_file("big.data"); // 只处理前100个块 for (int i = 0; i < 100; ++i) { if (auto chunk = reader.next()) { process_chunk(*chunk); } } // 暂停一段时间 std::this_thread::sleep_for(10s); // 继续处理剩余部分 while (auto chunk = reader.next()) { process_chunk(*chunk); } |
问题
- 挂起协程
- 协程挂起就是暂停执行的意思
- 协程在特定点停止执行,保存当前完整状态,并在稍后从完全相同的状态恢复执行
- 为什么要挂起协程
- 等待异步操作
- 资源暂时不可用
- 主动让出控制权
- 构建生成器
- 调用协程函数就是启动了
C++20
协程吗- 调用协程函数就是启动了
C++20
协程的生命周期,但具体是否会立即执行函数体代码,取决于在协程设计中是否在启动点设置了挂起
- 调用协程函数就是启动了
- 对协程的类比理解:
- 相当于启动了一个线程执行异步任务,并且无回调,然后对该线程调用了
detatch
- 不会阻塞主线程(协程也不阻塞主线程)
- 无回调,该线程自动结束(协程也自动结束)
- 如果要等待异步任务结束,要在主程序结束之前对线程进行等待(协程也需要等待)
- 相当于启动了一个线程执行异步任务,并且无回调,然后对该线程调用了
co_await
和co_yield
- 见上文相关小节
- 多个协程函数在一个线程上怎么并行
- 在单线程上运行多个协程时,它们并非真正的并行(
parallel
),而是通过协作式多任务(cooperative multitasking
)实现的并发(concurrency
)
- 在单线程上运行多个协程时,它们并非真正的并行(
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
class Scheduler { queue<coroutine_handle<>> ready_queue; public: // 添加协程到调度队列 void schedule(coroutine_handle<> h) { ready_queue.push(h); } // 单线程调度循环 void run() { while (!ready_queue.empty()) { auto h = ready_queue.front(); ready_queue.pop(); if (!h.done()) { h.resume(); // 恢复执行协程 // 如果协程未完成,重新加入队列 if (!h.done()) { ready_queue.push(h); } } } } }; struct Task { struct promise_type { Scheduler* scheduler = nullptr; auto get_return_object() { return Task{coroutine_handle<promise_type>::from_promise(*this)}; } auto initial_suspend() { return suspend_always{}; } auto final_suspend() noexcept { return suspend_always{}; } void unhandled_exception() { terminate(); } void return_void() {} }; coroutine_handle<promise_type> handle; // 恢复协程执行 void resume() { if (!handle.done()) handle.resume(); } }; Task coroutine_work(Scheduler& sch, int id, int steps) { // 关联调度器 handle.promise().scheduler = &sch; for (int i = 0; i < steps; ++i) { cout << "协程" << id << " 步骤" << i << endl; co_await suspend_always{}; // 主动让出执行权 } } int main() { Scheduler sch; // 创建多个协程任务 auto task1 = coroutine_work(sch, 1, 3); auto task2 = coroutine_work(sch, 2, 5); auto task3 = coroutine_work(sch, 3, 2); // 将协程加入调度器 sch.schedule(task1.handle); sch.schedule(task2.handle); sch.schedule(task3.handle); // 启动调度循环 sch.run(); } |
1 2 3 4 5 6 7 8 9 10 |
协程1 步骤0 协程2 步骤0 协程3 步骤0 协程1 步骤1 协程2 步骤1 协程3 步骤1 协程1 步骤2 协程2 步骤2 协程2 步骤3 协程2 步骤4 |
总结
- 要使用
C++20
的协程,就得实现一个自定义的promise
类型(该类型有5个必要的成员函数必须实现)作为协程函数的返回类型 - 要在这个协程函数中对挂起点逻辑进行控制,就得实现一个自定义的
awaitable
类型(该类型有3个必要的成员函数必须实现),用于在协程函数中使用 - 另外,要用一个包装类包含一个名为
promise_type
的嵌套类型,用promise_type
将这个包装类和自定义promise
类型关联起来
结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
// 1. 自定义Awaitable类型 class TimerAwaitable { public: // 构造器:接收等待时间 explicit TimerAwaitable(std::chrono::milliseconds duration) : duration_(duration) {} // 1. 是否准备好(无需等待) bool await_ready() const noexcept { return false; // 总是需要挂起 } // 2. 挂起协程时执行的操作 void await_suspend(std::coroutine_handle<> h) noexcept { // 在新线程中执行定时器 timer_thread_ = std::thread([h, this] { // 模拟IO等待 std::this_thread::sleep_for(duration_); // 定时器结束后恢复协程 if (!h.done()) h.resume(); }); } // 3. 协程恢复时执行的操作 void await_resume() noexcept { if (timer_thread_.joinable()) { timer_thread_.join(); // 等待定时器线程结束 } std::cout << "恢复后处理: " << duration_.count() << "ms 已过\n"; } private: std::chrono::milliseconds duration_; std::thread timer_thread_; }; // 2. 定义自定义promise类型 struct MyPromise { // 必须的5个函数 auto get_return_object() { return std::coroutine_handle<MyPromise>::from_promise(*this); } auto initial_suspend() { return std::suspend_always{}; } // 初始挂起 auto final_suspend() noexcept { return std::suspend_always{}; } void unhandled_exception() { std::terminate(); } void return_void() {} // 可选:支持yield操作(不是必须的,但这里展示扩展) auto yield_value(const char* msg) { std::cout << "Yield处理: " << msg << "\n"; return std::suspend_always{}; } }; // 3. 定义协程包装类 class CoroutineTask { public: // 关键:定义promise_type using promise_type = MyPromise; // 从promise创建协程句柄 CoroutineTask(std::coroutine_handle<promise_type> h) : handle_(h) {} ~CoroutineTask() { if (handle_) handle_.destroy(); } // 恢复协程执行 void resume() { if (handle_ && !handle_.done()) handle_.resume(); } // 检查协程状态 bool is_done() const { return handle_.done(); } private: std::coroutine_handle<promise_type> handle_; }; // 4. 定义协程函数(使用自定义Awaitable) CoroutineTask example_coroutine() { std::cout << "协程开始执行 (阶段1)\n"; // 使用自定义Awaitable - 等待500ms co_await TimerAwaitable{500ms}; std::cout << "协程继续执行 (阶段2)\n"; // 使用内置Awaitable co_await std::suspend_always{}; std::cout << "协程继续执行 (阶段3)\n"; // 使用yield_value(需要promise中定义) co_yield "准备下一阶段"; std::cout << "协程继续执行 (阶段4)\n"; // 使用自定义Awaitable - 等待200ms co_await TimerAwaitable{200ms}; std::cout << "协程即将结束\n"; } // 5. 使用包装类操作协程 int main() { // 创建协程(初始挂起) CoroutineTask task = example_coroutine(); std::cout << "协程已创建,但尚未执行 (初始挂起)\n"; // 第一次恢复 - 会触发TimerAwaitable std::cout << "\n第一次恢复操作:\n"; task.resume(); // 协程现在在第一个TimerAwaitable处挂起(在另一个线程中计时) std::cout << "主线程继续执行 - 等待500ms...\n"; std::this_thread::sleep_for(700ms); // 等待超时结束 // 第二次恢复 std::cout << "\n第二次恢复操作:\n"; task.resume(); // 继续执行到 suspend_always 后的挂起点 // 第三次恢复(恢复yield后的执行) std::cout << "\n第三次恢复操作:\n"; task.resume(); // 激活yield后的代码 // 协程现在在第二个TimerAwaitable处挂起 std::cout << "主线程继续执行 - 等待200ms...\n"; std::this_thread::sleep_for(300ms); // 等待超时结束 // 第四次恢复(完成协程) std::cout << "\n第四次恢复操作:\n"; task.resume(); std::cout << "\n协程完成状态: " << task.is_done() << "\n"; } |
协程:应用
协程句柄操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// 自定义协程销毁器 struct CoroutineDeleter { void operator()(std::coroutine_handle<> h) { if (h && !h.done()) h.destroy(); } }; // 安全句柄包装 using SafeHandle = std::unique_ptr< std::coroutine_handle<>, CoroutineDeleter >; // 跨协程恢复 void resume_cross_context(std::coroutine_handle<> h) { if (!h.done()) { // 将恢复操作提交到其他线程的执行器 thread_pool::submit([h] { h.resume(); }); } } |
awaitable
高级实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
// 异步文件读取Awaitable class AsyncFileRead { int fd; char* buffer; size_t size; ssize_t result; bool ready = false; public: AsyncFileRead(int fd, char* buf, size_t len) : fd(fd), buffer(buf), size(len) {} bool await_ready() noexcept { return false; // 总是挂起 } void await_suspend(std::coroutine_handle<> h) { // 提交异步IO请求 io_uring_prep_read(&sqe, fd, buffer, size, 0); io_uring_set_data(&sqe, h.address()); // 设置完成回调 auto callback = [](void* data) { auto handle = std::coroutine_handle<>::from_address(data); if (handle) handle.resume(); }; io_uring_submit_and_wait(&ring, 1, callback); } ssize_t await_resume() noexcept { return result; } }; |
任务调度系统实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
class Scheduler { std::queue<std::coroutine_handle<>> ready_queue; std::mutex queue_mutex; std::condition_variable cv; bool stopped = false; public: // 提交协程任务 void schedule(std::coroutine_handle<> task) { std::lock_guard lock(queue_mutex); ready_queue.push(task); cv.notify_one(); } // 运行调度循环 void run() { while (true) { std::coroutine_handle<> task; { std::unique_lock lock(queue_mutex); cv.wait(lock, [&]{ return stopped || !ready_queue.empty(); }); if (stopped && ready_queue.empty()) return; task = ready_queue.front(); ready_queue.pop(); } if (!task.done()) task.resume(); } } // 停止调度器 void stop() { std::lock_guard lock(queue_mutex); stopped = true; cv.notify_all(); } }; |
带超时的awaitable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
template <typename Awaitable> class TimeoutAwaitable { Awaitable awaitable; std::chrono::milliseconds timeout; public: TimeoutAwaitable(Awaitable a, std::chrono::milliseconds t) : awaitable(std::move(a)), timeout(t) {} bool await_ready() { return awaitable.await_ready(); } void await_suspend(std::coroutine_handle<> h) { // 启动原始awaitable awaitable.await_suspend(h); // 设置超时定时器 timer_thread = std::thread([h, this] { std::this_thread::sleep_for(timeout); if (!h.done()) { // 超时后恢复协程并抛出异常 h.resume(); } }); } auto await_resume() { timer_thread.join(); if (timed_out) throw std::runtime_error("Operation timed out"); return awaitable.await_resume(); } private: std::thread timer_thread; bool timed_out = false; }; |
协程同步原语:协程锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
class CoroutineMutex { bool locked = false; std::queue<std::coroutine_handle<>> waiters; public: struct LockAwaiter { CoroutineMutex& mutex; bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle<> h) noexcept { if (!mutex.try_lock()) { mutex.waiters.push(h); } } void await_resume() noexcept {} }; LockAwaiter lock() noexcept { return LockAwaiter{*this}; } void unlock() { if (!waiters.empty()) { auto next = waiters.front(); waiters.pop(); next.resume(); } else { locked = false; } } private: bool try_lock() { if (!locked) { locked = true; return true; } return false; } }; |
HTTP
服务器协程化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
Task<void> handle_connection(Socket socket) { try { while (true) { // 异步读取请求 auto request = co_await socket.async_read(); // 处理请求 auto response = process_request(request); // 异步写入响应 co_await socket.async_write(response); } } catch (const std::exception& e) { log_error(e.what()); } } void run_server() { Scheduler scheduler; Server server(8080); while (true) { // 异步接受连接 auto socket = co_await server.async_accept(); // 为每个连接创建协程任务 scheduler.schedule(handle_connection(std::move(socket))); } } |
协程管道模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
template <typename T> class Pipeline { std::vector<std::function<T(T)>> stages; public: Pipeline& then(std::function<T(T)> stage) { stages.push_back(std::move(stage)); return *this; } Task<T> execute(T input) { T result = input; for (auto& stage : stages) { // 每个阶段都在协程中异步执行 result = co_await std::async(std::launch::async, stage, result); } co_return result; } }; // 使用示例 auto pipeline = Pipeline<int>{} .then([](int x) { return x * 2; }) .then([](int x) { return x + 10; }) .then([](int x) { return x / 2; }); auto result = co_await pipeline.execute(5); // 结果: (5 * 2+10)/2 = 10 |
协程帧内存池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
class CoroutinePool { static constexpr size_t POOL_SIZE = 1024; std::array<std::byte, POOL_SIZE> memory_pool; std::stack<void*> free_list; public: void* allocate(size_t size) { if (size > POOL_SIZE) return ::operator new(size); if (free_list.empty()) return &memory_pool; auto ptr = free_list.top(); free_list.pop(); return ptr; } void deallocate(void* ptr) { if (ptr >= memory_pool.data() && ptr < memory_pool.data() + POOL_SIZE) { free_list.push(ptr); } else { ::operator delete(ptr); } } }; |
声明:本文为原创文章,版权归Aet所有,欢迎分享本文,转载请保留出处!
你可能也喜欢
- ♥ STL_deque05/18
- ♥ breakpad记述:Windows下动态库的编译使用03/15
- ♥ Spdlog记述:三07/23
- ♥ C++_关于Shared_ptr管理内存05/30
- ♥ Effective C++_第四篇07/02
- ♥ breakpad记述:Windows07/27