现代cpp多线程与并发初探

在现代c++(c++20)中,有了jthread和协程的概念,使得我们编写并发程序更加方便. 这里作简单学习.

前言知识

多线程编程

std::thread 用于创建一个执行的线程实例,所以它是一切并发编程的基础,使用时需要包含 <thread> 头文件, 它提供了很多基本的线程操作,例如 get_id() 来获取所创建线程的线程 ID,使用 join() 来等待这个线程结束(与该线程汇合)等等

std::mutex 是 C++11 中最基本的互斥量类,可以通过构造 std::mutex 对象创建互斥量, 而通过其成员函数 lock() 可以进行上锁,unlock() 可以进行解锁. 但是在实际编写代码的过程中,最好不去直接调用成员函数, 因为调用成员函数就需要在每个临界区的出口处调用 unlock()

std::unique_lock 则是相对于 std::lock_guard 出现的,std::unique_lock 更加灵活, std::unique_lock 的对象会以独占所有权(没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权) 的方式管理 mutex 对象上的上锁和解锁的操作.所以在并发编程中,推荐使用 std::unique_lock.

如果用到了条件变量 std::condition_variable::wait 则必须使用 std::unique_lock 作为参数.

条件变量 std::condition_variable 是为了解决死锁而生,当互斥操作不够用而引入的. 比如,线程可能需要等待某个条件为真才能继续执行, 而一个忙等待循环中可能会导致所有其他线程都无法进入临界区使得条件为真时,就会发生死锁. 所以,condition_variable 对象被创建出现主要就是用于唤醒等待线程从而避免死锁. std::condition_variablenotify_one() 用于唤醒一个线程; notify_all() 则是通知所有线程

std::future,它提供了一个访问异步操作结果的途径

试想,如果主线程 A 希望新开辟一个线程 B 去执行某个我们预期的任务,并返回一个结果. 而这时候,线程 A 可能正在忙其他的事情,无暇顾及 B 的结果, 所以我们会很自然的希望能够在某个特定的时间获得线程 B 的结果.

在 C++11 的 std::future 被引入之前,通常的做法是: 创建一个线程 A,在线程 A 里启动任务 B,当准备完毕后发送一个事件,并将结果保存在全局变量中. 而主函数线程 A 里正在做其他的事情,当需要结果的时候,调用一个线程等待函数来获得执行的结果

而 C++11 提供的 std::future 简化了这个流程,可以用来获取异步任务的结果. 自然地,我们很容易能够想象到把它作为一种简单的线程同步手段,即屏障(barrier)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <future>
#include <thread>

int main() {
// 将一个返回值为7的 lambda 表达式封装到 task 中
// std::packaged_task 的模板参数为要封装函数的类型
std::packaged_task<int()> task([](){return 7;});
// 获得 task 的期物
std::future<int> result = task.get_future(); // 在一个线程中执行 task
std::thread(std::move(task)).detach();
std::cout << "waiting...";
result.wait(); // 在此设置屏障,阻塞到期物的完成
// 输出执行结果
std::cout << "done!" << std:: endl << "future result is "
<< result.get() << std::endl;
return 0;
}

promise packaged_task async 解决异步

也许你在Python,Js中已经享受到了异步的好处,在c++中也可以,不过可能会稍微麻烦一些.

async

std::async函数可以接受几个不同的启动策略,这些策略在std::launch枚举中定义

使用 std::launch::async 时,std::async 会立即在一个新线程中启动任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <future>
#include <chrono>

// 定义一个异步任务
std::string fetchDataFromDB(std::string query) {
// 模拟一个异步任务,比如从数据库中获取数据
std::this_thread::sleep_for(std::chrono::seconds(5));
return "Data: " + query;
}

int main() {
// 使用 std::async 异步调用 fetchDataFromDB
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");

// 在主线程中做其他事情
std::cout << "Doing something else..." << std::endl;

// 从 future 对象中获取数据
std::string dbData = resultFromDB.get();
std::cout << dbData << std::endl;

return 0;
}

packaged_task

std::packaged_task是一个可调用目标,它包装了一个任务,该任务可以在另一个线程上运行.它可以捕获任务的返回值或异常,并将其存储在std::future对象中,以便以后使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int my_task() {
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "my task run 5 s" << std::endl;
return 42;
}

void use_package() {
// 创建一个包装了任务的 std::packaged_task 对象
std::packaged_task<int()> task(my_task);

// 获取与任务关联的 std::future 对象
std::future<int> result = task.get_future();

// 在另一个线程上执行任务
std::thread t(std::move(task));
t.detach(); // 将线程与主线程分离,以便主线程可以等待任务完成

// 等待任务完成并获取结果
int value = result.get();
std::cout << "The result is: " << value << std::endl;

}

promise

std::promise用于在某一线程中设置某个值或异常,而std::future则用于在另一线程中获取这个值或异常

1
2
3
4
5
6
7
8
9
void set_value(std::promise<int> prom) { prom.set_value(10); }
int main(){
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t(set_value, std::move(prom));
int value2 = fut.get();
std::cout << "result" << value2 << std::endl;
t.join();
}

面向c++20

C++11 引入了 std::thread 类型,其与操作系统提供的线程对应,但该类型有一个严重的设计缺陷: 不是 RAII 类型. std::thread 要求在其生命周期结束时,若表示正在运行的线程,则调用 join()(等待线程结束) 或 detach()(让线程在后台运行)

若两者都没有调用,析构函数会立即导致异常的程序终止 (在某些系统上导致段错误).

不能“杀死”已 经启动的线程.线程不是进程,线程只能通过结束自身或结束整个程序来结束. 因此,在调用 join() 之前,应该确保等待的线程将取消其执行.不过,对于 std::thread,没有这 样的机制,必须自己实现取消请求和对它的响应

std::jthread

std::jthread 解决了这些问题,它是 RAII 类型.若线程是可汇入的 (“j”代表“汇入”),析构函数会自动调用 join()

  1. 自动加入std::jthread 在析构时会自动调用 join,确保线程在销毁前完成执行.这避免了 std::thread 在析构时未调用 joindetach 导致的未定义行为.
  2. 停止标记std::jthread 支持停止标记(stop token),允许线程协作地停止执行.这使得线程可以优雅地处理停止请求.
  3. 异常安全std::jthread 在异常处理方面更加安全,确保在异常抛出时线程能够正确地清理资源.

使用 std::jthread 就不再存在导致异常程序终止的危险,也不需要异常处理.为了支持尽可能容易地切换到 std::jthread 类,该类提供了与 std::thread 相同的 API,包括:

  • jthread():默认构造函数,创建一个未关联任何线程的 jthread.
  • jthread(Function&& f, Args&&... args):构造函数,启动一个新的线程来执行给定的函数.
  • join():等待线程完成执行.
  • detach():分离线程,使其独立运行.
  • get_id():获取线程的标识符.
  • joinable():检查线程是否可以加入.
  • swap(jthread& other):交换两个 jthread 对象.
  • request_stop():请求线程停止执行.
  • stop_requested():检查是否请求了停止.
  • get_stop_token():获取线程的停止标记
  • get_stop_source:获取线程停止源

引入停止请求与停止回调

std::jthread 提供了一种协作机制来表示线程不应该再运行.它是“协作的”,因为该机制 不会杀死正在运行的线程 (因为 C++ 线程根本不支持杀死线程,杀死线程的操作可能很容易使程序处于损坏状态).

为了响应停止请求,已启动的线程必须声明std::stop_token作为附加的第一个参数, 并使用它不时的检查是否应该继续运行

1
2
3
4
5
6
void task(std::stop_token st, std::string s, double value) {
while(!st.stop_requested()){ # 检查是否停止了请求

}

} // t.request_sop() 显式请求task()停止执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <thread>
#include <chrono>

void worker(std::stop_token st) {
while (!st.stop_requested()) {
std::cout << "Worker thread is running..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Worker thread is stopping..." << std::endl;
}

int main() {
std::jthread jt(worker);

// 主线程继续执行其他任务
std::cout << "Main thread is doing other work..." << std::endl;

// 模拟一些工作
std::this_thread::sleep_for(std::chrono::seconds(5));

// 请求工作线程停止
jt.request_stop();

// 等待工作线程完成
jt.join();

return 0;
}

还有另一种对停止请求作出反应的方法: 可以为std::stop_token注册回调,该回调将在请求停止时自动调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// std::stop_callback cb{st,[](){}};
#include <iostream>
#include <thread>
#include <chrono>
#include <stop_token>

void worker(std::stop_token st) {
std::stop_callback sc(st, []{
std::cout << "Stop request received, cleaning up..." << std::endl;
});

while (!st.stop_requested()) {
std::cout << "Worker thread is running..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Worker thread is stopping..." << std::endl;
}

int main() {
std::jthread jt(worker);

// 主线程继续执行其他任务
std::cout << "Main thread is doing other work..." << std::endl;

// 模拟一些工作
std::this_thread::sleep_for(std::chrono::seconds(5));

// 请求工作线程停止
jt.request_stop();

// 等待工作线程完成
jt.join();

return 0;
}

std::jthread 的主要优点是会自动建立停止信号的机制.启动线程的构造函数将创建一个停止源,将其存储为线程对象的成员,并将相应的停止令牌传递给被调用的函数,以避免该函数将额外的 stop_token 作为第一个参数

停止来源

std::stop_source 是 C++20 引入的一个类,用于管理停止请求.它与 std::stop_tokenstd::jthread 一起使用

1
2
3
4
5
6
7
8
9
std::stop_source ssrc;
std::stop_token stok{ssrc.get_token()};
std::stop_callback cb{stok, []() { syncOut() << "stop requested\n"; }};
auto fut1 = std::async(std::launch::async, [stok] {
for (int i = 0; i < 10; ++i) {
stop_callback_task(stok, i);
}
});
ssrc.request_stop();

停止来源可以request_stop,jthread对象也可以request_stop. jthread对象可以可以获得停止源和stop_token

若启动多个 jthread,每个线程都有自己的停止令牌,这可能会导致停止所有线程的时间可能比预期的要长

可能还需要使用相同的停止令牌为多个线程请求停止,只需自己创建停止令牌,或者从已经启 动的第一个线程中获取停止令牌,并将此停止令牌作为第一个参数启动线程

std::latch锁存器与std::barrier

std::latch 是一个一次性的同步点,类似于倒计时事件.它允许多个线程等待某个计数器达到零,然后所有等待的线程同时继续执行.一旦计数器达到零,std::latch 就不能再被重置.

主要特点

  • 一次性:一旦计数器达到零,std::latch 就不能再被重置.
  • 倒计时:多个线程可以通过调用 count_down 方法减少计数器的值.
  • 等待:线程可以调用 wait 方法等待计数器达到零.

成员方法

  • latch(count_t count):构造函数,初始化计数器.
  • count_down(count_t n = 1):减少计数器的值.
  • count_down_and_wait(count_t n = 1):减少计数器的值并等待计数器达到零.
  • wait():等待计数器达到零.
  • try_wait():尝试等待计数器达到零,如果计数器尚未达到零则立即返回 false.
  • arrive_and_wait(count_t n = 1):等同于 count_down_and_wait(n)

锁存器是用于并发执行的一种新的同步机制,支持单次使用异步倒计时.从初始整数值开始, 各种线程可以自动将该值计数到零.当计数器达到零时,等待此倒计时的所有线程继续运行

image-20240928144118863

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>
#include <latch>
#include <stop_token>
#include <thread>
#include <vector>
using namespace std::literals;

void worker(std::latch &l) {
std::this_thread::sleep_for(1s);
l.count_down();
std::cout << "Worker thread finished and counted down\n";
}
int main() {
const int num_threads{5};
std::latch l(num_threads);
std::vector<std::jthread> threads;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(worker, std::ref(l));
}
l.wait();
for (auto &t : threads) {
t.join();
}

std::latch allReady{10};
std::vector<std::jthread> threads2;
for (int i = 0; i < num_threads; ++i) {
std::jthread t{[i, &allReady] {
std::this_thread::sleep_for(2s);
allReady.arrive_and_wait();
for (int j = 0; j < i + 5; ++j) {
std::cout.put(static_cast<char>('A' + j));
std::this_thread::sleep_for(100ms);
}
}};
threads2.push_back(std::move(t));
}
}

barrier是用于并发执行的新的同步机制,允许多次同步多个异步任务.设置初始计数后,多个线程可以对其进行计数,并等待计数器达到零.与锁存器相比,当达到零时,将调用一个 (可选的) 回调,计数器将重新初始化为初始计数

image-20240928145528067

arrive() 函数会返回一个类型为 std::barrier::arrival_token 的到达令牌,以确保 barrier 知道要等待哪个线程.

信号量

C++20 引入了处理信号量的新类型.信号量是轻量级同步原语,允许同步或限制对一个或一组资源的访问

C++ 标准库提供了两种信号量类型:

  • std::counting_semaphore<> 将多个资源的使用限制在最大值
  • std::binary_semaphore<> 限制对单一资源的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <iostream>
#include <mutex>
#include <queue>
#include <semaphore>
#include <thread>

using namespace std::chrono_literals;
int main() {
std::queue<char> values;
std::mutex valuesMx;

for (int i = 0; i < 1000; ++i) {
values.push(static_cast<char>('a' + (i % ('z' - 'a'))));
}
constexpr int numThreads = 10;
std::counting_semaphore<numThreads> enabled{0};
std::vector<std::jthread> pool;
for (int idx = 0; idx < numThreads; ++idx) {
std::jthread t{[idx, &enabled, &values, &valuesMx](std::stop_token st) {
while (!st.stop_requested()) {
enabled.acquire();
char val;
{
std::lock_guard lg{valuesMx};
val = values.front();
values.pop();
}
for (int i = 0; i < 10; i++) {
std::cout.put(val);
auto dur = 130ms * ((idx % 3) + 1);
std::this_thread::sleep_for(dur);
}
enabled.release();
}
}};
pool.push_back(std::move(t));
}
std::cout << "====================" << std::endl;
std::this_thread::sleep_for(2s);

std::cout << "====================" << std::endl;
enabled.release(3);
std::this_thread::sleep_for(2s);

for(auto& t:pool){
t.request_stop();
}

启动了 10 个线程,但限制了允许其中多少线程主动运行和处理数据,因此将信号量 初始化为最大数量 (10) 和初始资源数量 (0)

因为信号量初始化为零,所以最初的情况是阻塞,因此没有可用的资源.使用release控制最大资源,acquire获得资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int sharedData = 0;
std::binary_semaphore dataReady{0};
std::binary_semaphore dataDone{0};
std::jthread process{[&](std::stop_token st) {
while (!st.stop_requested()) {
if (dataReady.try_acquire_for(1s)) {
int data = sharedData;
std::cout << "[process] read " << data << std::endl;
std::this_thread::sleep_for(data * .5s);

dataDone.release();
} else {
std::cout << "[process] no data to process\n";
}
}
}};
for (int i = 9; i < 10; ++i) {
std::cout << "[main] write " << i << std::endl;
sharedData = i;
dataReady.release();
dataDone.acquire();
std::cout << "[main] data processed\n";
}

原子操作

  • std::atomic_ref:用于在现有对象上提供原子操作,适用于非侵入性地将非原子类型包装成原子类型.
  • std::atomic<std::shared_ptr<T>>:用于在多线程环境中安全地管理共享指针,确保指针的引用计数和其他操作是线程安全的.

流的并发输出会导致未定义行为 (这是数据竞争,指具有未定义行为的竞争条件).

同步并发输出流

1
2
3
4
#include <syncstream>
inline auto syncOut(std::ostream& strm=std::cout){
return std::osyncstream{strm};
}

c++中的协程

C++20 引入了对协程的支持,协程 是可以挂起的函数(类似于python中yield的生成器?)

调用普通函数 (或过程) 时,然后运行到它们的结束 (或直到到达返回语句或抛出异常),而协程 是可以分多个步骤运行的函数

某些时刻,可以挂起一个协程,所以该函数暂停其计算,直到恢复.挂起可能是因为函数必须 等待某些东西,有其他 (更重要的) 事情要做,或者有一个中间结果要给调用者. 因此,启动协程意味着启动另一个函数,直到它的一部分完成.调用函数和协程都在它们的两 条执行路径之间来回切换.

注意,这两个函数不是并行运行的,我们用控制流来打乒乓球:

  • 函数可以通过开始或继续协程的语句来决定启动或恢复其当前控制流.
  • 当协程运行时,协程可以决定挂起或结束其执行,启动或恢复协程的函数将继续执行其控制流.

image-20240928155220941

协程的最简单形式中,主控制流和协程的控制流都在同一个线程中运行.不需要使用多线程, 也不需要处理并发访问,但可以在不同的线程中运行协程.甚至可以在不同的线程上将协程恢复到 先前挂起的位置.协程有一种正交特性,但其可以与多个线程一起使用.甚至可以在不同的线程上 将协程恢复到先前挂起的位置

使用协程就像在后台有一个函数,可以不时地启动和继续.然而,由于协程的生命周期超出了嵌套作用域,因此协程也是一个将其状态存储在某些内存中并提供处理状态的 API.

  • 协程通常返回一个对象,作为调用者的协程接口.根据协程的目的和用途,该对象可以表示 一个不时挂起或切换上下文的正在运行的任务,不时产生值的生成器,或者一个按需惰性地 返回一个或多个值的工厂
  • 协程无堆栈.不挂起外部协程的情况下,无法挂起在外部协程中调用的内部协程,只能将外 部协程作为一个整体挂起. 当协程挂起时,协程的状态作为一个整体被存储在与堆栈分开的对象中,以便它可以在完全 不同的上下文中 (在不同的调用堆栈中,在另一个线程中等) 恢复.

只需在函数中使用以下关键字即可隐式定义协程:

  • co_await
  • co_yield
  • co_return

若这些关键字在协程中都不没有,则必须显式地使用 co_return; 语句.

协程具有以下属性和限制:

  • 协程不允许有返回语句.
  • 协程不能是 constexpr 或 consteval
  • 协程不能有返回类型 auto 或其他占位符类型.
  • main() 不能是协程.
  • 构造函数或析构函数不可为协程
  • 协程可以是静态
  • 协程若不是构造函数或析构函数,可以是成员函数
  • 协程甚至可以是 Lambda,但在这种情况下,必须谨慎使用

实现协程接口与句柄

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <coroutine>

class CoroTask {
public:
struct promise_type;
using CoroHdl = std::coroutine_handle<promise_type>;

private:
CoroHdl hdl;

public:
CoroTask(auto h) : hdl{h} {}
~CoroTask() {
if (hdl)
hdl.destroy();
}
CoroTask(const CoroTask &) = delete;
CoroTask &operator=(const CoroTask &) = delete;

bool resume() const {
if (!hdl || hdl.done())
return false;
hdl.resume();
return !hdl.done();
}
};

struct CoroTask::promise_type {
auto get_return_object() { return CoroTask{CoroHdl::from_promise(*this)}; }
auto initial_suspend() { return std::suspend_always{}; }
void return_void() {}
void unhandled_exception() {}
auto final_suspend() { return std::suspend_always{}; }
};

处理 C++ 中的协程,需要做两件事:

  • promise 类型 此类型用于定义处理协同例程的某些自定义点,特定的成员函数定义了在特定情况下调用的 回调函数.
  • std::coroutine_handle<> 类型的内部协程句柄 此对象在调用协程时创建 (使用上述 promise 类型的标准回调之一),可以通过提供一个底层接 口来恢复协程以及处理协程的结束,从而用于管理协程的状态

引入 promise_type(每个协程类型都必须拥有),并声明本地协程句柄 hdl,它管理协程的状态. 原生协程句柄 std::coroutine_handle<> 的类型是用 promise 类型参数化的,存储在 promise 中的任何 数据都是句柄的一部分,promise 中的函数可以通过句柄访问

resume方法在协程挂起时恢复协程,其或多或少地将恢复请求传播到原生协程句 柄,其返回表示是否有必要再次恢复协程.

函数检查是否有句柄,或者协程是否已经结束. 尽管在这个实现中协程接口总是有一个句柄,但这是一个必要的检查,例如,若接口支持移动 语义. 只有当协程挂起且尚未结束时才允许调用 resume(),所以检查是否 done() 是必要的.调用本身恢复挂起的协程并阻塞,直到下一个挂起点或结束

针对co_await等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <CoroTask.hpp>
#include <iostream>

Coro coro(int max) {
std::cout << "CORO" << max << " start\n";
for (int val = 1; val <= max; ++val) {
std::cout << "CORO" << max << " " << val << "\n";
co_await std::suspend_always{};
}
}
int main() {
auto coroTask = coro(3);
std::cout << "coro() started\n";
while (coroTask.resume()) {
std::cout << "coro() suspended";
}
std::cout << "coro() done\n";
}

定义promise 类型目的是:

  • 定义如何创建或获取协程的返回值 (通常包括创建协程句柄)
  • 决定协同程序是应该在开始还是结束时挂起
  • 处理协程调用者与协程之间交换的值
  • 处理未处理的异常
1
2
3
4
5
6
7
8
9
10
11
12
13
struct CoroTask::promise_type {
// init and return coroutine interface
// 创建协程接口 创建对象
auto get_return_object() { return CoroTask{CoroHdl::from_promise(*this)}; }
// 主动启动还是需要resume
auto initial_suspend() { return std::suspend_always{}; }
// 到达结束时的操作
void return_void() {}
// 异常处理
void unhandled_exception() {}
// 最终是否挂起
auto final_suspend() { return std::suspend_always{}; }
};

针对co_yield返回值

在promise_type中定义值,并定义yield_value用于保存值,在协程返回对象中定义getValue方便访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

class CoroTask {
public:
struct promise_type {
int coroValue = 0;
...
auto yield_value(int val) {
coroValue = val;
return std::suspend_always{};
}
...
};
using CoroHdl = std::coroutine_handle<promise_type>;

private:
CoroHdl hdl;

public:
CoroTask(auto h) : hdl{h} {}
~CoroTask() {
if (hdl)
hdl.destroy();
}
CoroTask(const CoroTask &) = delete;
CoroTask &operator=(const CoroTask &) = delete;

bool resume() const {
if (!hdl || hdl.done())
return false;
hdl.resume();
return !hdl.done();
}
int getValue() const { return hdl.promise().coroValue; }
};

针对co_return结束并返回值

通过使用 co_return,协程可以在其结束时向调用者返回结果. 实现方式是在promise_type中增加return_value,然后依旧在协程接口对象中声明一个方法访问coroutine_handle.

若协程以有时可能返回值,有时可能不返回值的方式实现,则这是未定义行为.则这个协程无效. 也就是return_value和return_void也不能同时声明

实现Awaitable对象

协程必须)提供Awaitables(实现方式是使用 Awaiter)

Awaitables 是 co_await 需要作为其操作数的术语,所以 awaitables 是 co_await 可以处理的所有对象.

Awaiter 是实现 Awaitables 的一种特定 (和典型) 方式的术语.

其必须提供三个特定的成员函数来处理协程的暂停和恢复

image-20240928181236087

await_ready() 在协程被挂起之前调用,提供挂起.若它返回 true,则协程根本不会挂起. 这个函数通常只返回 false

auto await_suspend(awaitHdl)

协程挂起后立即为协程调用此函数.参数 awaitHdl 是被挂起的协程的句柄,其类型是 std::coroutine_handle. 这个函数中,可以指定下一步要做什么,包括立即恢复挂起的或等待的协程.

auto await_resume()成功挂起后恢复协程时,将为协程调用此函数.可以返回一个值,这个值就是 co_await 表达式产生的值

1
2
3
4
5
6
7
8
9
10
11
12
#include <iostream>
class Awaiter {
public:
bool await_ready() const noexcept {
std::cout << " await_ready\n";
return false;
}
void await_suspend(auto hdl) const noexcept {
std::cout << " await_suspend\n";
}
void await_resume() const noexcept { std::cout << " await_resume\n"; }
};

使用Awaiter

1
2
3
4
5
6
7
8
9
10
CoroTask coro(int max) {
std::cout << "CORO" << max << " start\n";
for (int val = 1; val <= max; ++val) {
std::cout << "CORO" << max << " " << val << "\n";
co_await std::suspend_always{};
co_await Awaiter{};
// co_yield val;
// co_return 1;
}
}

有两个标准Awaiters,`std::suspend_alwaysstd::suspend_never. 差别就是await_ready的返回值

若在 await_ready() 中返回 false(而在 await_suspend() 中没有返回任何值),则 suspend_always 接受每个挂起,并将协程返回给其调用者.

若在 await_ready() 中返回 true,则 suspend_never 永远不会接受任何挂起,则协程继续 (永远不会调用 await_suspend()).

std::suspend_always通常用作 co_await 的基本 awaiter.

挂起后将值传递回协程

协程接口的 promise 类型是协程与调用者共享和交换数据的最佳位置,在promse_type的yield_value中返回一个awaiter,这个awaiter能够修改协程接口中的promise并在挂起恢复时设置值.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void setBackValue(const auto &val) { hdl.promise().backValue = val; } // 在协程接口中写一个方法能够修改返回值
auto yield_value(int val) {
coroValue = val;
backValue.clear();
// return std::suspend_always{};
return BackAwaiter<CoroHdl>{};
}
template <typename Hdl> class BackAwaiter {
Hdl hdl = nullptr;

public:
BackAwaiter() = default;
bool await_ready() const noexcept {
std::cout << "BackAwaiter await_ready\n";
return false;
}
void await_suspend(Hdl h) noexcept { hdl = h; }
auto await_resume() { return hdl.promise().backValue; }
};

参考资料

  1. 第 7 章 并行与并发 现代 C++ 教程: 高速上手 C++ 11/14/17/20 - Modern C++ Tutorial: C++ 11/14/17/20 On the Fly (changkun.de)
  2. C++ 并发三剑客future, promise和async · 恋恋风辰的编程笔记 (llfc.club)
  3. C++ Concurrency In Action
-------------本文结束感谢您的阅读-------------
感谢阅读.

欢迎关注我的其它发布渠道