async
概述
- 用于简化异步任务执行的高级抽象
- 它的核心目标是让你能方便地启动一个任务并在未来获取其结果,而无需直接管理线程的生命周期、同步等底层细节
std::async可以看作是std::promise,std::packaged_task和std::thread的高级封装
策略
std::launch::async- 异步执行:立即(或尽快)在新线程中运行
- 创建新线程或利用线程池
- 需要真正并发、计算密集型或独立的耗时任务
- 行为明确,确定为异步
std::launch::deferred- 同步延迟执行:仅在调用
get()或wait()时,在调用线程中运行 - 不使用新线程,任务在请求结果的线程上运行
- 不要求并行,可能永不执行或希望惰性求值的任务
- 行为明确,确定为延迟同步
- 同步延迟执行:仅在调用
核心机制:共享状态
- 概述
std::async的强大功能依赖于一个关键的内部数据结构:共享状态(Shared State)
- 这个共享状态是一个线程安全的上下文,主要用于存储:
- 任务的执行状态:标识任务是否已完成、正在进行中或被延迟
- 任务的返回值:任务执行完毕后的结果
- 任务抛出的异常:如果任务中抛出了异常,异常对象会被捕获并存储于此
原理
- 调用
std::async时,它会进行一系列幕后操作:- 创建一个共享状态
- 将你提交的任务(函数、函数对象等)及其参数打包
通常内部会使用std::packaged_task进行封装,后者能自动将结果或异常存入共享状态 - 根据指定的启动策略,决定如何执行这个打包好的任务
- 返回一个
std::future对象给你
std::future对象- 这个
std::future对象就像是通往共享状态的唯一句柄或钥匙 - 当你调用
future.get()时,当前线程会阻塞,直到异步任务执行完毕 - 然后,
get()方法会从共享状态中取出结果(或重新抛出存储的异常)
- 这个
- 需要注意的是,
get()方法只能调用一次,因为它会转移或消耗掉共享状态中的值
future
概述
核心机制:共享状态
std::future机制的核心- 它通常包含:
- 存储的值或异常:任务的计算结果或执行过程中抛出的异常
- 就绪状态标志:指示结果/异常是否已经准备好
- 同步原语:如互斥锁(
mutex)和条件变量(condition variable),用于管理线程间的等待和通知
线程同步与阻塞
- 调用
future.get()时,会发生以下情况- 检查共享状态是否已经就绪
- 如果已经就绪,立即返回存储的值或抛出存储的异常
- 如果尚未就绪,当前调用线程会被阻塞。内部实现通常会使用条件变量让线程等待
- 当
Provider(如std::promise::set_value)设置了值或异常后,共享状态被标记为就绪,并通知(notify)等待的线程
此时,被get()阻塞的线程会被唤醒,继续执行
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
virtual _Ty& _Get_value(bool _Get_only_once) { unique_lock<mutex> _Lock(_Mtx); if (_Get_only_once && _Retrieved) { _Throw_future_error2(future_errc::future_already_retrieved); } if (_Exception) { _STD rethrow_exception(_Exception); } // TRANSITION: `_Retrieved` should be assigned before `_Exception` is thrown so that a `future::get` // that throws a stored exception invalidates the future (N4950 [futures.unique.future]/17) _Retrieved = true; _Maybe_run_deferred_function(_Lock); while (!_Ready) { _Cond.wait(_Lock); } if (_Exception) { _STD rethrow_exception(_Exception); } if constexpr (is_default_constructible_v<_Ty>) { return _Result; } else { return _Result._Held_value; } } |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
// 设置值 void _Set_value(const _Ty& _Val, bool _At_thread_exit) { // store a result unique_lock<mutex> _Lock(_Mtx); _Set_value_raw(_Val, &_Lock, _At_thread_exit); } void _Set_value_raw(const _Ty& _Val, unique_lock<mutex>* _Lock, bool _At_thread_exit) { // store a result while inside a locked block if (_Already_has_stored_result()) { _Throw_future_error2(future_errc::promise_already_satisfied); } _Emplace_result(_Val); _Do_notify(_Lock, _At_thread_exit); } virtual void _Do_notify(unique_lock<mutex>* _Lock, bool _At_thread_exit) { // notify waiting threads // TRANSITION, ABI: This is virtual, but never overridden. if constexpr (is_default_constructible_v<_Ty>) { _Has_stored_result = true; } if (_At_thread_exit) { // notify at thread exit _Cond._Register(*_Lock, &_Ready); } else { // notify immediately _Ready = true; _Cond.notify_all(); } } |
promise
概述
C++11中一个强大的线程间通信工具,它和std::future配合,像一个连接两个线程的单次通信管道,允许数据或异常安全地从“生产者”线程传递到“消费者”线程
组件
std::promise(生产者)- 承诺在未来提供结果的对象
set_value(),set_exception()
std::future(消费者)- 用于获取未来某个时刻的结果的对象
get(),wait()
- 共享状态 (通信桥梁)
- 内部数据结构,存储结果、异常及同步原语
核心机制:共享状态
std::promise和std::future并不直接对话,它们通过一个名为共享状态(Shared State)的内部数据结构进行通信- 这个共享状态是线程安全的,通常包含:
- 存储的值或异常:任务的计算结果或执行过程中抛出的异常
- 就绪状态标志:指示结果/异常是否已经准备好
- 同步原语:如互斥锁(
mutex)和条件变量(condition variable),用于管理线程间的等待和通知
线程同步与阻塞
- 当消费者线程调用
future.get()时,会检查共享状态是否就绪:- 如果已经就绪,则立即返回存储的值或抛出存储的异常
- 如果尚未就绪,当前调用线程会被阻塞。内部实现通常会使用条件变量让线程等待,直到
promise设置结果并发出通知
异常的安全传递
std::promise提供了一个安全的跨线程异常传递通道- 如果异步任务中抛出了未被捕获的异常,可以使用
set_exception将异常指针(std::exception_ptr)存储到共享状态中 - 当消费者线程调用
future.get()时,这个异常会在当前线程中被重新抛出,使得跨线程错误处理与本地异常处理一样自然
- 如果异步任务中抛出了未被捕获的异常,可以使用
注意
- 一个
std::promise只能设置一次值或异常。多次调用set_value会导致未定义行为- 同时,每个
promise只能调用一次get_future()方法,多次调用会抛出std::future_error异常
- 同时,每个
- 如果
std::promise对象在析构前都未设置任何值或异常(即承诺未被履行),那么与之关联的std::future在调用get()时会抛出std::future_error异常,其错误码为std::future_errc::broken_promise std::promise不可复制,但支持移动语义(移动构造函数和移动赋值操作符)std::promise提供了set_value_at_thread_exit函数- 与
set_value立即通知等待线程不同,它会在当前线程退出、所有线程局部存储对象被销毁后,才将共享状态标记为就绪 - 这可以避免某些情况下,等待线程因立即被唤醒而访问到尚未完全清理的线程局部资源
- 与
packaged_task
概述
C++11中一个非常实用的异步编程工具,它充当了可调用任务与未来结果之间的“绑定器”
组成
- 被包装的任务 (
Stored Task)- 存储用户提供的可调用对象(如函数、
lambda表达式),定义了需要异步执行的逻辑
- 存储用户提供的可调用对象(如函数、
- 共享状态 (
Shared State)- 一个线程安全的内部数据结构,作为中间媒介,用于存储任务执行后的返回值或抛出的异常
std::future关联接口- 通过
get_future()方法产生一个与共享状态绑定的std::future对象,是获取结果的唯一凭证
- 通过
核心机制
- 任务包装与对象创建
- 当你创建一个
std::packaged_task对象时,需要指定一个函数签名(例如int(int, int))并将一个可调用对象(如函数、lambda表达式)传递给它 - 此时,
packaged_task内部会存储这个任务,并初始化一个空的共享状态
- 当你创建一个
|
1 2 |
// 包装一个返回 int、接受两个int参数的 lambda 任务 std::packaged_task<int(int, int)> task([](int a, int b) { return a + b; }); |
- 获取未来凭证
- 通过调用
task.get_future()方法,你可以获得一个与该任务共享状态关联的std::future对象 - 一个
packaged_task只能调用一次get_future()
- 通过调用
|
1 |
std::future<int> result_future = task.get_future(); // 获取 future |
- 执行任务与结果传递
- 可以像调用普通函数一样调用
task(),或者在新的线程中执行它(由于packaged_task不可复制,传递到线程需要使用std::move) - 当任务被调用执行后,其返回值会被自动捕获并存储到共享状态中
- 如果任务执行过程中抛出了异常,该异常也会被捕获并存储到共享状态中,而不是立即终止程序
- 无论哪种情况,共享状态都会被标记为 就绪
- 可以像调用普通函数一样调用
- 获取结果
- 在需要结果的地方,调用
result_future.get()。这个操作会: - 如果共享状态已就绪,立即返回存储的值或重新抛出存储的异常
- 如果共享状态未就绪,则阻塞当前线程,直到任务执行完成
- 在需要结果的地方,调用
注意
std::packaged_task禁用了拷贝构造函数和拷贝赋值操作,但支持移动语义- 必须确保
std::packaged_task对象最终被执行。如果它在其生命周期内从未被调用(即承诺未履行),那么与之关联的std::future在调用get()时会抛出std::future_error异常,错误码为broken_promise - 一个
std::packaged_task对象通常只能执行一次- 如果需要再次执行相同逻辑的任务,需要创建新的对象
- 或者使用其
reset()方法(该方法会重置共享状态,但保留被包装的任务)
其他
promise对比async普通return返回
- 普通返回(
return)的时候,一定是线程结束的时候 promise通过set_value返回状态的时候,不需要是线程结束的时机,还可以在set_value之后继续处理其他任务
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
// 通过promise多阶段状态返回 #include <iostream> #include <future> #include <thread> #include <chrono> #include <vector> #include <random> // 模拟一个复杂的多阶段计算任务 void complexCalculation(std::promise<int>&& phase1_promise, std::promise<double>&& phase2_promise, std::promise<std::vector<int>>&& phase3_promise) { std::cout << "[计算线程] 开始复杂计算...\n"; // 第一阶段:数据准备和初步处理 std::cout << "[计算线程] 第一阶段进行中...\n"; std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作 int phase1_result = 1000; // 假设的初步计算结果 phase1_promise.set_value(phase1_result); // 设置第一阶段结果 std::cout << "[计算线程] 第一阶段完成,结果已设置。继续第二阶段...\n"; // 第二阶段:核心算法计算 std::cout << "[计算线程] 第二阶段进行中...\n"; std::this_thread::sleep_for(std::chrono::seconds(3)); double phase2_result = 42.5; // 假设的核心计算结果 phase2_promise.set_value(phase2_result); // 设置第二阶段结果 std::cout << "[计算线程] 第二阶段完成,结果已设置。继续第三阶段...\n"; // 第三阶段:结果整理和输出准备 std::cout << "[计算线程] 第三阶段进行中...\n"; std::this_thread::sleep_for(std::chrono::seconds(2)); std::vector<int> phase3_result = {1, 2, 3, 4, 5}; // 假设的最终结果集 phase3_promise.set_value(phase3_result); // 设置第三阶段结果 std::cout << "[计算线程] 第三阶段完成,所有计算结束。\n"; } int main() { // 为三个阶段分别创建promise和future std::promise<int> promise_phase1; std::future<int> future_phase1 = promise_phase1.get_future(); std::promise<double> promise_phase2; std::future<double> future_phase2 = promise_phase2.get_future(); std::promise<std::vector<int>> promise_phase3; std::future<std::vector<int>> future_phase3 = promise_phase3.get_future(); // 启动计算线程,将promise移动给新线程 std::thread calculation_thread(complexCalculation, std::move(promise_phase1), std::move(promise_phase2), std::move(promise_phase3)); // 主线程可以同时进行其他工作 std::cout << "[主线程] 计算任务已启动,我可以继续处理其他工作...\n"; // 等待并处理第一阶段的結果 int result1 = future_phase1.get(); // 会阻塞直到第一阶段完成 std::cout << "[主线程] 收到第一阶段结果: " << result1 << ",开始基于此结果进行预处理...\n"; // 模拟基于第一阶段结果的预处理 std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 等待并处理第二阶段的結果 double result2 = future_phase2.get(); // 会阻塞直到第二阶段完成 std::cout << "[主线程] 收到第二阶段结果: " << result2 << ",进行进一步分析...\n"; // 模拟进一步分析 std::this_thread::sleep_for(std::chrono::milliseconds(800)); // 等待并处理第三阶段的最终結果 std::vector<int> final_result = future_phase3.get(); std::cout << "[主线程] 收到最终结果,包含 " << final_result.size() << " 个元素: "; for (int val : final_result) { std::cout << val << " "; } std::cout << "\n[主线程] 所有阶段处理完成!\n"; calculation_thread.join(); return 0; } |
async,packaged_task,promise总结
async- 封装了
thread,自管理生命周期 - 两种模式,异步模式或延迟同步模式
- 内部会自动捕获返回值或异常,存储到
future里面 - 通过
future获取结果,阻塞(任务没完成)或不阻塞(任务已完成)当前线程
- 封装了
packaged_task- 封装可调用对象
- 内置
promise,自动捕获返回值或异常后通过set_value把返回值或异常存储到future - 通过
future获取结果,阻塞(任务没完成)或不阻塞(任务已完成)当前线程
promise- 随便一个时机都可以调用
set_value,不需要像普通返回那样return完线程就结束了 - 而且必须要调用
set_value - 通过
future获取结果,阻塞(另一线程还未调用set_value)或不阻塞(另一线程已调用set_value)当前线程
- 随便一个时机都可以调用
声明:本文为原创文章,版权归Aet所有,欢迎分享本文,转载请保留出处!
你可能也喜欢
- ♥ 打包_7z生成自解压打包exe07/11
- ♥ C++11_第五篇12/08
- ♥ STL_slist08/28
- ♥ C++编程规范101规则、准则与最佳实践 二01/07
- ♥ C++并发编程 _管理线程05/07
- ♥ breakpad记述:Windows下静态库的编译使用03/15