. 1 2 3 4 5 6 int num2{3 }; auto n2 = std::ref (num2); auto ttc = n2; std::cout << ttc << std::endl; ttc += 12 ; std::cout << n2 << std::endl;
join与detach detach() 是线程分离,线程对象放弃了线程资源的所有权,此时thread根本没有关联任何线程.调用 join() 是:“阻塞当前线程直至 *this 所标识的线程结束其执行”,线程对象都没有线程,就不需要阻塞了.
不能拷贝构造/赋值与转移所有权 传入可调用对象以及参数,构造 std::thread
对象,启动线程,而线程对象拥有了线程的所有权,线程是一种系统资源,所以可称作“线程资源 ”.
std::thread 不可复制.两个 std::thread 对象不可表示一个线程,std::thread 对线程资源是独占所有权.而移动 操作可以将一个 std::thread
对象的线程资源所有权转移给另一个 std::thread
对象.
1 2 3 4 5 6 7 8 9 10 int main () { std ::thread t{ [] { std ::cout << std ::this_thread::get_id() << '\n' ; } }; std ::cout << t.joinable() << '\n' ; std ::thread t2{ std ::move(t) }; std ::cout << t.joinable() << '\n' ; t2.join(); }
1 2 3 4 5 6 7 8 9 std::thread f () { std::thread t{ [] {} }; return t; } int main () { std::thread rt = f (); rt.join (); }
return t
重载决议 [1] 选择到了移动构造 ,将 t
线程资源的所有权转移给函数调用 f()
返回的临时 std::thread
对象中,然后这个临时对象再用来初始化 rt
,临时对象是右值表达式,这里一样选择到移动构造 ,将临时对象的线程资源所有权移交给 rt
.此时 rt
具有线程资源的所有权,由它调用 join()
正常析构
1 2 3 4 5 6 7 8 9 void f(std::thread t){ t.join(); } int main(){ std::thread t{ [] {} }; f(std::move(t)); f(std::thread{ [] {} }); }
std::move
将 t 转换为了一个右值表达式,初始化函数f
形参 t
,选择到了移动构造转移线程资源的所有权,在函数中调用 t.join()
后正常析构.std::thread{ [] {} }
构造了一个临时对象,本身就是右值表达式,初始化函数f
形参 t
,移动构造转移线程资源的所有权到 t
,t.join()
后正常析构.
std::this_thread
get_id
sleep_for
yield
sleep_until
数据竞争 当某个表达式的求值写入某个内存位置,而另一求值读或修改同一内存位置时 ,称这些表达式冲突. 拥有两个冲突的求值的程序就有数据竞争,除非
两个求值都在同一线程上,或者在同一信号处理函数中执行,或 两个冲突的求值都是原子操作(见 std::atomic),或 一个冲突的求值发生早于 另一个(见 std::memory_order) 如果出现数据竞争,那么程序的行为未定义.
互斥量 互斥量用于保护多线程下的共享数据的读写
互斥量(Mutex),又常被称为互斥锁、互斥体(或者直接被称作“锁”),是一种用来保护临界区的特殊对象,其相当于实现了一个公共的“标志位 ”.它可以处于锁定(locked)状态,也可以处于解锁(unlocked)状态:
如果互斥量是锁定的,通常说某个特定的线程正持有这个锁. 如果没有线程持有这个互斥量,那么这个互斥量就处于解锁状态. 1 2 3 4 std::mutex m{}; m.lock (); m.unlock ()
如果多个线程中,其中一个线程在执行互斥区操作,其他线程执行到了m.lock()时会阻塞直到m.unlock释放.
1 2 3 4 std::mutex m{}; m.try_lock (); m.unlock ()
try_lock不会阻塞,而是会返回一个bool值,如果失败了就返回false,上锁成功返回true
如何管理互斥量 使用std::lock_guard
与std::scoped_lock
1 2 3 4 5 6 7 8 9 10 11 12 13 class MyLockGuard { public : using mutext_type = T; explicit MyLockGuard (T t_mutex) : _MyMutex(t_mutex) { _MyMutex.lock (); } MyLockGuard (T& _Mtx, std::adopt_lock_t ) noexcept : _MyMutex(_Mtx) {} MyLockGuard (const MyLockGuard&) = delete ; MyLockGuard& operator =(const MyLockGuard&) = delete ; ~MyLockGuard () noexcept { _MyMutex.unlock (); } private : T& _MyMutex; };
提供便利 RAII 风格机制的互斥包装器,它在作用域块的存在期间占有一或多个互斥体.不可复制、移动.当创建对象时,它尝试取得给定互斥体的所有权.当控制离开创建对象的作用域时,析构并释放互斥体
1 2 3 4 5 6 7 8 9 10 11 12 13 void foor_guard () { std::lock_guard<std::mutex> lock{m}; std::cout << "foor" << std::endl; } void add_to_list (int n, std::list<int >& list) { std::vector<int > numbers (n + 1 ) ; std::iota (numbers.begin (), numbers.end (), 0 ); int sum = std::accumulate (numbers.begin (), numbers.end (), 0 ); { std::lock_guard<std::mutex> lc{m}; list.push_back (sum); } }
scoped_lock
类类似,它在作用域块的存在期间占有一或多个互斥体.
互斥量保护数据的问题 当使用lock_guard
时,如果将指针或者引用传递给外部值,这样就脱离mutex管理了.
简而言之:切勿将受保护数据的指针或引用传递到互斥量作用域之外 ,不然保护将形同虚设共享数据 | 现代C++并发编程教程 (mq-b.github.io)
互斥可能导致的死锁 当有多个互斥量时可能遇到死锁.避免死锁的一般建议是让两个互斥量以相同的顺序上锁,总在互斥量 B 之前锁住互斥量 A,就通常不会死锁
1 2 3 4 5 6 7 8 9 10 11 12 13 std::mutex m1,m2; std::size_t n{}; void f () { std::lock_guard<std::mutex> lc1{ m1 }; std::lock_guard<std::mutex> lc2{ m2 }; ++n; } void f2 () { std::lock_guard<std::mutex> lc1{ m2 }; std::lock_guard<std::mutex> lc2{ m1 }; ++n; }
上面代码就有可能死锁. 修改上锁顺序即可.
但是即使上锁顺序相同,也有可能导致死锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 struct X { X (const std::string& str) :object{ str } {} friend void swap (X& lhs, X& rhs) ; private : std::string object; std::mutex m; }; void swap (X& lhs, X& rhs) { if (&lhs == &rhs) return ; std::lock_guard<std::mutex> lock1{ lhs.m }; std::lock_guard<std::mutex> lock2{ rhs.m }; swap (lhs.object, rhs.object); }
1 2 3 X a {"a "},b {"b "}; std::thread t{[&]{swap (a,b);}}; std::thread t2{[&]{swap (b,a);}};
解决方法是是使用std::lock
,可以同时对多个互斥量上锁,如果已经上锁会抛出异常并unlock解锁这些互斥量,或者使用刚才的std::scoped_lock,提供与lock_guard同样的RAII包装.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void swap (Xa& lhs, Xa& rhs) { if (&lhs == &rhs) return ; std::lock (lhs.m, rhs.m); std::lock_guard<std::mutex> lock1{lhs.m,std::adopt_lock}; std::lock_guard<std::mutex> lock2{rhs.m,std::adopt_lock}; swap (lhs.object, rhs.object); } void swap (Xa& lhs, Xa& rhs) { if (&lhs == &rhs) return ; std::scoped_lock (lhs.m, rhs.m); swap (lhs.object, rhs.object); }
使用 std::scoped_lock
可以将所有 std::lock
替换掉,减少错误发生
Tips for avoiding dead lock 避免嵌套锁 线程获取一个锁时,就别再获取第二个锁.每个线程只持有一个锁,自然不会产生死锁.如果必须要获取多个锁,使用 std::lock
或std::scoped_lock
避免在持有锁时调用外部代码 因为代码是外部提供的,所以没办法确定外部要做什么.外部程序可能做任何事情,包括获取锁.在持有锁的情况下,如果用外部代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时这是无法避免的)
使用固定顺序获取锁 避免死锁
std::unique_lock unique_lock
更加灵活,它不能拷贝,内部有一个_Owns
变量表明是否有锁的拥有权(或者说是否已经上锁),默认构造函数调用时owns为false并上锁
1 2 3 4 5 6 _NODISCARD_CTOR_LOCK explicit unique_lock (_Mutex& _Mtx) : _Pmtx(_STD addressof(_Mtx)), _Owns(false) { _Pmtx->lock (); _Owns = true ; }
此外还有std::defer_lock
和std::adopt_lock
分别表示没有上锁(_Owns为false),构造函数中不会上锁和已经上锁(_Owns为true),,构造函数中不会 上锁
1 2 3 4 5 unique_lock (_Mutex& _Mtx, defer_lock_t ) noexcept : _Pmtx(_STD addressof (_Mtx)), _Owns(false ) {} lock_guard (_Mutex& _Mtx, adopt_lock_t ) noexcept : _MyMutex(_Mtx) {}
unique_lock
类中也有lock和unlock方法,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void lock () { _Validate(); _Pmtx->lock (); _Owns = true ; } void unlock () { if (!_Pmtx || !_Owns) { _Throw_system_error(errc::operation_not_permitted); } _Pmtx->unlock (); _Owns = false ; }
简而言之:
使用 std::defer_lock
构造函数不上锁,要求构造之后上锁 使用 std::adopt_lock
构造函数不上锁,要求在构造之前互斥量上锁 默认构造会上锁,要求构造函数之前和构造函数之后都不能再次上锁 通常建议优先 std::lock_guard
,当无法满足你的需求或者显得代码非常繁琐,那么可以考虑使用 std::unique_lock
std::mutex是不能拷贝也不能移动的量 ,在unique_lock中保存了指向它的指针,而unique_lock是可以移动的,所以可以利用unique_lock转移互斥量(准确地说需要互斥量在这些作用域内存活并通过移动构造、赋值进行转移)
保护共享数据初始化 一些数据在多线程环境下进行初始化时可能会导致多次初始化,也是数据竞争的行为. 可以采用其他方式进行保护.
1 2 3 4 5 6 7 8 9 10 11 struct Data {}; std::once_flag flag; std::shared_ptr<Data> data; void init_resouce () { data.reset (new Data); } void foo () { std::call_once (flag,init_resouce); }
std::call_once
可以接收可调用对象,传入flag表明之及逆行一次初始化,使得线程安全. 静态局部变量初始化在 C++11 是线程安全
读写锁保护不常更新的数据结构 有时多线程环境下,一个线程基本只用来写,其他线程用来读. 可以使用std::shared_mutex
保证写线程独占权和读线程的访问权.
shared_mutex
类是一个同步原语,可用于保护共享数据不被多个线程同时访问.与便于独占访问的其他互斥体类型不同,shared_mutex 拥有两个访问级别:
共享 - 多个线程能共享同一互斥体的所有权.
独占 - 仅一个线程能占有互斥.
若一个线程已获取独占 锁(通过 lock、try_lock,则无其他线程能获取该锁(包括共享 的).
若一个线程已获取共享 锁(通过 lock_shared、try_lock_shared),则无其他线程能获取独占 锁,但可以获取共享 锁.
仅当任何线程均未获取独占 锁时,共享 锁能被多个线程获取.
在一个线程内,同一时刻只能获取一个锁(共享 或独占 )
std::shared_lock
主要是提供了lock_shared的作用,区分了共享锁和独占锁,如果有独占锁,多个线程不管使用共享锁还是独占锁都需要阻塞,反之如果全是共享锁则不会阻塞(因为全是读操作)
1 2 3 4 5 6 7 8 9 10 11 std::map<std::string, std::string> data_; std::shared_mutex mtx; void writeData () { std::lock_guard lg{mtx}; data_["fa" ] = "af" ; } void readData () { std::shared_lock sl{mtx}; auto it = data_.find ("aa" ); std::cout << (it == data_.end () ? it->second : "" ); }
std::recursive_mutex 在同一线程多次lock一个普通mutex,是未定义行为. 如果在一个线程多次lock,另一个线程就一直无法拿到锁了. 使用std::recursive_mutex使得同一线程在lock和unlock次数一样的情况下才会真正释放锁.
它允许同一线程多次锁定同一个互斥量,而不会造成死锁.当同一线程多次对同一个 std::recursive_mutex
进行锁定时,只有在解锁与锁定次数相匹配时,互斥量才会真正释放 .但它并不影响不同线程对同一个互斥量进行锁定的情况.不同线程对同一个互斥量进行锁定时,会按照互斥量的规则进行阻塞
在使用迭代函数中使用锁时可以使用这个互斥量
通常不直接调用 unlock()
,而是使用std::unique_lock与std::lock_guard管理排他性锁定.
调用lock
时所有权层数+1,调用unlock
时,如果所有权层数为1,解锁互斥量,否则-1.
new、delete的线程安全性 如果标准达到 C++11 ,要求下列函数 是线程安全的:
new
和 delete
运算符的库 版本全局 new
运算符和 delete
运算符的用户替换版本 std::calloc、std::malloc、std::realloc、std::aligned_alloc](C++17 起)、std::free 内存分配、释放操作是线程安全 ,构造和析构不涉及共享资源.而局部对象 p
对于每个线程来说是独立的 .换句话说,每个线程都有其自己的 p
对象实例,因此它们不会共享同一个对象,自然没有数据竞争
1 2 3 4 5 T* p = nullptr ; void f () { p = new T{}; delete p; }
如果 p
是全局对象(或者外部的,只要可被多个线程读写),多个线程同时对其进行访问和修改时,就可能会导致数据竞争和未定义行为.因此,确保全局对象的线程安全访问通常需要额外的同步措施,比如互斥量或原子操作 .
1 2 3 4 5 6 7 8 9 10 11 12 int n = 1 ;struct X { X (int v){ ::n += v; } }; void f () { X* p = new X{ 1 }; delete p; }
C++ 只保证了 operator new
、operator delete
这两个方面的线程安全
new
表达式线程安全要考虑三方面:operator new
、构造函数、修改指针.
delete
表达式线程安全考虑两方面:operator delete
、析构函数
线程存储期 线程存储期的对象在线程开始时分配,并在线程结束时释放. 使用thread_local
声明变量,声明线程存储期的对象,每一个线程都有独立的 thread_local
对象
1 2 3 4 5 6 7 8 9 10 int global_counter = 0 ; thread_local int thread_local_counter=0 ; void print_counters () { std::cout<<"global: " <<global_counters++<<'\n' ; std::cout<<"thread_local" <<thread_local_counter<<'\n' ; } int main () { std::thread{print_counters}.join (); std::thread{print_counters}.join (); }
同步操作 条件变量 条件变量有std::condition_variable
和std::condition_variable_any
,
std::condition_variable是与std::mutex 一起使用的同步原语,它能用于阻塞一个线程,或同时阻塞多个线程,直至另一线程修改共享变量(条件 )并通知std::condition_variable
条件变量用于同步,可以阻塞线程并使用notify_one
让解除相关线程阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void wait_for_flag () { std::unique_lock ul{cv_mutex}; cv.wait (ul, [] { return flag; }); std::cout << "arrived\n" ; } void set_flag_true () { { std::lock_guard lg{cv_mutex}; std::cout << "set_flag_true\n" ; std::this_thread::sleep_for (1 s); flag = true ; } cv.notify_all (); } int main () { std::jthread t1{wait_for_flag}, t2{wait_for_flag}, t3{set_flag_true}; return 0 ; }
此外还有std::condition_variable_any
,相对于只在 std::unique_lock上工作的 std::condition_variable,condition_variable_any
能在任何满足可基本锁定要求的锁上工作(只需要lock和unlock方法)
future获得线程结果 如果要获得一个线程处理后的结果,可以通过使用condition_variable同步,cv.notify(),cv.wait(). 但是更好的方式是通过future获取返回值
类模板 std::future
提供访问异步操作结果的机制:
(通过std::async,std::packaged_task或 std::promise创建的)异步操作能提供一个 std::future
对象给该异步操作的创建者.
然后,异步操作的创建者可以使用多个方法查询、等待或从 std::future
提取值.若异步操作尚未提供值,则这些方法可能阻塞.
当异步操作准备好发送结果给创建者时,它可以修改与创建者的 std::future
相链接的共享状态
std::thread
没提供直接从线程获取返回值的机制.所以可以使用 std::async 函数模板,使用async与thread类似,默认按值赋值,内部将参数副本转换为右值. future的get和wait方法也是用于同步的,
1 2 3 4 5 6 void f (const int & p) {}void f2 (int & p ) {}int n = 0 ;std::async (f, n); std::async (f2, n);
async接受所有可调用对象(函数,类成员方法,仿函数类,lambda),与thread类似,其有不同的异步执行策略,std::launch::defered与std::launch::async
std::launch::async
在不同线程上 执行异步任务.std::launch::deferred
惰性求值,不创建线程 ,等待 future
对象调用 wait
或 get
成员函数的时候执行任务. 如果从 std::async
获得的 std::future
没有被移动或绑定到引用,那么在完整表达式结尾, std::future
的析构函数将阻塞,直到到异步任务完成 .因为临时对象的生存期就在这一行,而对象生存期结束就会调用调用析构函数.
被移动的 std::future
没有所有权,失去共享状态,不能调用 get
、wait
成员函数. 此外还有valid
检查 future 当前是否关联共享状态,即是否当前关联任务.还未关联,或者任务已经执行完(调用了 get()、set()),都会返回 false
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 struct X { int operator () (int n) const { return n * n; } }; struct Y { int f (int n) const { return n * n; } }; void f (int & p) { std::cout << &p << '\n' ; }int main () { Y y; int n = 0 ; auto t1 = std::async (X{}, 10 ); auto t2 = std::async (&Y::f,&y,10 ); auto t3 = std::async ([] {}); auto t4 = std::async (f, std::ref (n)); std::cout << &n << '\n' ; }
得到的future使用wait
同步等待处理,或者使用get
获得结果.
packaged_task 类模板 std::packaged_task
包装任何可调用目标(函数、lambda 表达式、bind 表达式或其他函数对象),使得能异步调用它.其返回值或所抛异常被存储于能通过 std::future对象访问的共享状态中.
其重载了()
操作符,所以本身也是一个可调用目标,可以传给一个线程. 如果不使用多线程
1 2 3 4 5 6 7 8 void f (int ) {};int main () { auto task_2{std::packaged_task<int (int )>{f}}; auto fut_2{task_2.get_future ()}; task_2 (10 ); std::cout << fut_2.get () << '\n' ; }
上面任务并不会在线程中执行,所以并没有并发、异步. 所以需要结合多线程,但注意packaged_task不能拷贝,只能移动.
1 2 3 4 5 6 7 auto task_2{std::packaged_task<int (int )>{foo}};auto fut_2{task_2.get_future ()};std::thread fut_t {std::move (task_2), 10 }; fut_t .join ();std::cout << fut_2.get () << '\n' ;
std::packaged_task
也可以在线程中传递,在需要的时候获取返回值,而非将它自己作为可调用对象. 也就是说thread启动一个可调用对象,这个可调用对象中会调用这个packaged_task,可以通过future获得值.
使用std::promise设置值 如果要设置一个值,可以传递reference_wrapper\然后join线程即可获取值. 1 2 3 4 void set_val (int &n) { n = 20 ; }int tn{0 };std::jthread jd{set_val, std::ref (tn)}; jd.join ();
但是考虑到这样也许并不好,不仅必须传入通过传入引用、指针,控制颗粒度也不够.
类模板 std::promise
提供用以存储一个值或一个异常,之后通过 std::promise
对象所创建的std::future
对象异步获得.注意 std::promise
只应当使用一次.
每个promise都与一个共享状态 关联,其中含有一些状态信息和一个结果 ,它可能尚未求值、已求值为一个值(可能为 void
),或者求值为一个异常.promise可以对共享状态做三件事:
使就绪 :promise存储结果或异常于共享状态.标记共享状态为就绪,并除阻在该共享状态所关联的未来体上等待的任何线程.释放 :promise放弃其对共享状态的引用.若这是最后一个这种引用,则销毁共享状态.除非这是 std::async所创建的未就绪的共享状态,否则此操作不阻塞.抛弃 :promise存储以 std::future_errc::broken_promise 为错误码的 std::future_error 类型的异常,令共享状态为就绪 ,然后释放 它 1 2 3 4 5 6 7 void f (std::promise<int > obj,int num) { obj.set_value (num*num); } std::promise<int > p; auto fut = p.get_future ();std::jthread t{f,std::move (p),3 }; int result = p.get ();
std::promise
只能移动,不可复制,所以需要使用std::move
.
在主线程中通过与其关联的 future 对象的 get()
成员函数获取这个值,如果promise
的值还没有被设置,那么将阻塞当前线程
除了返回一般值外还可以设置异常,但一个promise只能要么设置异常要么设置值,如果设置异常,则通过在promise所在函数中使用try与get进行捕获.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void calculate_square (std::promise<int > promiseObj, int num) { std::this_thread::sleep_for (100 ms); if (!num) { promiseObj.set_value (num * num); } else { promiseObj.set_exception (std::current_exception ()); } } std::thread t1t (calculate_square, std::move(promise), num) ; try { std::cout << "等待线程执行...\n" ; int result = fut_3.get (); std::cout << "Result is " << result << std::endl; } catch (std::exception &e) { std::cerr << "来自线程的异常" << e.what () << '\n' ; } t1t.join ();
future的状态变化 future保有共享状态,只能移动,调用 get
函数后,future对象会失去共享状态,std::future
所引用的共享状态不与另一异步返回对象共享
移动语义 :因为移动操作标志着所有权的转移 ,意味着 future
不再拥有共享状态.get
和 wait
函数要求 future
对象拥有共享状态,否则会抛出异常.共享状态失效 :调用 get
成员函数时,future
对象必须拥有共享状态,但调用完成后,它就会失去共享状态 ,不能再次调用 get
. future 是一次性的 ,它的结果只能被一个线程获取.get()
成员函数只能调用一次,当结果被某个线程获取后,std::future
就无法再用于其他线程.
使用shared_future共享状态 目前shared_xx学习到的有,shared_ptr,shared_mutex,shared_lock,现在又有了shared_future. unique_ptr与unique_lock,future都表示独占所有权(只能移动),而shared_xx本身可以复制,并且可以共享.
主要用于在不同线程中共享一个任务/线程中的数据,它也通过wait和get获取数据.
通过future.share或直接通过future移动构造shared_future
具体使用通过传入shared_future的拷贝
1 2 3 4 5 6 7 8 9 10 11 12 13 14 int task (int num) { return num*num;}int main () { auto fut = std::async (task,10 ); std::shared_future<int > fut_shared = fut.share (); std::jthread thread1{[fut_shared]{ int result = fut_shared.get (); return result*2 ; }} std::jthread thread2{[fut_shared]{ int result = fut_shared.get (); return result*2 ; }} }
按复制捕获 std::shared_future
对象,每个线程都有一个 shared_future 的副本,这样不会出现数据竞争问题.
限时等待 使用wait_for
和wait_until
进行限时等待,可以通过future或者条件变量等, 限时等待用于在while循环中等待,可以判断结果,与std::future_status
,std::cv_status
Concurrency in C++20 信号量 C++ 提供了两个信号量类型:std::counting_semaphore
与std::binary_semaphore
信号量是更轻量的同步原语.
mutex,条件变量都是同步原语. 但mutex常用于互斥解决数据竞争,而条件
提供release
和acquire
两种方法,分别增加内部计数器并解除获得者以及减少内部计数器或阻塞到直至能如此
信号量常用于发信/提醒 而非互斥,通过初始化该信号量为 0 从而阻塞尝试 acquire() 的接收者,直至提醒者通过调用 release(n) “发信”.在此方面可把信号量当作条件变量的替代品 ,通常它有更好的性能
counting_semaphore能设置最大信号量值,binary_semaphore最大值就是1,相当于控制了同时访问者.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 std::counting_semaphore<3 > semaphore{3 }; void handle_request (int request_id) { std::cout << "进入handle_request尝试获取信号量\n" ; semaphore.acquire (); std::cout << "成功获取信号量\n" ; std::this_thread::sleep_for (1 s); std::random_device rd; std::mt19937 gen{rd ()}; std::uniform_int_distribution dis{1 , 10 }; int process_time = dis (gen); std::this_thread::sleep_for (std::chrono::seconds{process_time}); std::cout << std::format("请求 {} 已被处理\n" , request_id); semaphore.release (); }
std::latch 信号量方便同步,与条件变量类似. 而latch与barrier是线程协调机制,阻塞已知大小的线程组直至该组中的所有线程到达该屏障.
允许任何数量的线程阻塞直至期待数量的线程到达 . latch是单次使用的线程屏障,latch不能重复使用,它会等到需要的线程的数量.
latch
类维护着一个 std::ptrdiff_t 类型的计数,且只能减少计数,无法增加计数.在创建对象的时候初始化计数器的值.线程可以阻塞,直到 latch 对象的计数减少到零.由于无法增加计数,这使得 latch
成为一种**单次使用的屏障
1 2 3 4 5 6 7 8 9 std::latch work_start{3 }; void work () { std::cout << "等待其他线程执行\n" ; work_start.wait (); std::cout << "任务开始执行\n" ; } std::jthread thread{work}; std::this_thread::sleep_for (1 s); work_start.count_down ();
count_down默认将值-1,直到为0时,wait解除阻塞. 此外也有arrive_and_wait
相当于count_down(n);wait();
,这样在可调用对象内部直接使用
std::barrier 可复用的线程屏障,可以在阶段完成之后将计数重置为构造时传递的值.
不同于 std::latch,屏障是可重用的:一旦到达的线程组被解除阻塞,即可重用同一屏障.与 std::latch 不同,会在线程解除阻塞前执行一个可能为空的可调用对象.
barrier也有wait与arrive,它能够多次使用,也就是说如果创建10个线程,每个线程的可调用对象使用barrier值设置为3,那么就会有阻塞,因为9个线程结束,最后一个线程减少barrier设置的值,但依旧不为0. 如果是latch,当内部计数值为0时还调用count_down是未定义行为
arrive_and_wait()
会在期待计数减少至 0
时调用我们构造 barrier 对象时传入的 lambda 表达式,并解除所有在阶段同步点上阻塞的线程.之后重置期待计数为构造中指定的值.屏障的一个阶段就完成了. 还有arrive_and_drop
会将当前与最大的计数均-1.
std::barrier
要求其函数对象类型必须是不抛出异常的.
内存模型与原子操作 内存模型定义了多线程程序中,读写操作如何在不同线程之间可见,以及这些操作在何种顺序下执行.内存模型确保程序的行为在并发环境下是可预测的. 原子操作即不可分割的操作.系统的所有线程,不可能观察到原子操作完成了一半 std::atomic c++标准定义了原子类型,这些类型的操作都是原子的,语言定义中只有这些类型的操作是原子的,虽然也可以用互斥量来模拟原子操作.
每个 std::atomic
模板的实例化和全特化均定义一个原子类型.如果一个线程写入原子对象,同时另一线程从它读取,那么行为有良好定义(使用load和store),std::atomic
既不可复制也不可移动.
标准原子类型的实现通常包括一个 is_lock_free()
成员函数,允许用户查询特定原子类型的操作是否是通过直接的原子指令实现(返回 true),还是通过锁来实现(返回 false)
也可以通过is_always_lock_free和一些宏来检查
1 2 3 4 std::atomic<int > aint = 10 ; aint.is_lock_free (); aint.is_always_lock_free; std::cout<<ATOMIC_INT_LOCK_FREE;
always_lock_free意味着一定无锁,ATOMIC_INT_LOCK_FREE的值若为0则一定有锁,为1则有时无锁,为2则一定无锁.
在实际应用中,如果一个类型的原子操作总是无锁的,可以更放心地在性能关键的代码路径中使用它.
如果发现某些原子类型在目标平台上是有锁的,我们可以考虑以下优化策略:
使用不同的数据结构 :有时可以通过改变数据结构来避免对原子操作的依赖.减少原子操作的频率 :通过批处理等技术,减少对原子操作的调用次数.使用更高效的同步机制 :在一些情况下,其它同步机制(如读写锁)可能比原子操作更高效. 其实很多时候根本没这种性能的担忧,很多时候使用原子对象只是为了简单方便,比如 std::atomic<bool>
表示状态、std::atomic<int>
进行计数等.即使它们是用了锁,那也是封装好了的,起码用着方便,而不需要在代码中引入额外的互斥量来保护,更加简洁.这也是很正常的需求,各位不但要考虑程序的性能,同时也要考虑代码的简洁性、易用性.即使使用原子类型无法带来效率的提升,那也没有负提升.
常用的atomic特化有int,bool,flag,等
1 2 3 4 5 6 7 8 9 10 11 using atomic_char = atomic<char >;using atomic_schar = atomic<signed char >;using atomic_uchar = atomic<unsigned char >;using atomic_short = atomic<short >;using atomic_ushort = atomic<unsigned short >;using atomic_int = atomic<int >;using atomic_uint = atomic<unsigned int >;using atomic_long = atomic<long >;using atomic_ulong = atomic<unsigned long >;using atomic_llong = atomic<long long >;using atomic_ullong = atomic<unsigned long long >;
原子类型常用方法包括load
,store
,exchange
等,不同特化也有不同方法. 可以为自定义类型创建atomic,需要满足可复制构造,可复制赋值以及可平凡复制
1 2 3 4 5 static_assert (std::is_trivially_copyable<trivial_type>::value, "" );static_assert (std::is_copy_constructible<trivial_type>::value, "" );static_assert (std::is_move_constructible<trivial_type>::value, "" );static_assert (std::is_copy_assignable<trivial_type>::value, "" );static_assert (std::is_move_assignable<trivial_type>::value, "" );
原子类型的操作函数有一个内存序参数,对原子对象的访问可以建立线程间同步,并按std::memory_order
对非原子内存访问定序. 任何 std::atomic
类型,初始化不是原子操作,其他方法是原子操作.与大多数赋值运算符不同,原子类型的赋值运算不返回到它的左侧参数的引用.它们会返回存储值的副本
std::atomic_flag std::atomic_flag
是一种原子布尔类型.与所有std::atomic的特化不同,它保证是无锁的.与std::atomic\ 不同,std::atomic_flag
不提供加载或存储操作. 1 2 std::atomic_flag flag{}; bool r = flag.test_and_set ();
当标志对象已初始化,它只能做三件事情:销毁、清除、设置 .这些操作对应的函数分别是:
clear()
(清除):将标志对象的状态原子地更改为清除(false)test_and_set
(测试并设置):将标志对象的状态原子地更改为设置(true),并返回它先前保有的值.销毁:对象的生命周期结束时,自动调用析构函数进行销毁操作. 适合使用atomic_flag做一个自旋锁,也就是通过while
忙等
1 2 3 4 5 6 7 8 9 10 11 12 class spinlock_mutex { std::atomic_flag flag{}; public : spinlock_mutex ()noexcept = default ; void lock () noexcept { while (flag.test_and_set (std::memory_order_acquire)); } void unlock () noexcept { flag.clear (std::memory_order_release); } };
std::atomic\布尔原子类型,但比atomic_flag多load,store等方法,
1 2 3 4 std::atomic<bool > b{true }; b.load (true ); b = false ; auto value = (b = false );
exchange
以 desired 原子地替换底层值.操作为读-修改-写操作.根据 order 的值影响内存
1 2 3 4 std::atomic<bool > b{true }; bool x = b.load ();b.store (true ); x = b.exchange (false );
compare_exchange_weak
和compare_exchange_strong
原子地比较 this 和 expected 的对象表示(C++20 前),值表示 (C++20 起).如果它们逐位相等,那么以 desired 替换前者(进行读修改写操作).否则,将 this 中的实际值加载进 expected(进行加载操作).
也就是当前值与预期一致时,存储新值否则得到当前值存储在expected中
compare_exchange_weak :尝试将原子对象的当前值与预期值进行比较,如果相等则将其更新为新值并返回 true
;否则,将原子对象的值加载进 expected(进行加载操作)并返回 false
.此操作可能会由于某些硬件的特性而出现假失败 ,需要在循环中重试
1 2 3 4 std::atomic<bool > flag{ false }; bool expected = false ;while (!flag.compare_exchange_weak (expected, true ));
compare_exchange_strong :类似于 compare_exchange_weak
,但不会出现假失败,因此不需要重试 .适用于需要确保操作成功的场合.
1 2 3 4 5 6 7 8 9 10 11 12 std::atomic<bool > flag{ false }; bool expected = false ;void try_set_flag () { if (flag.compare_exchange_strong (expected, true )) { std::cout << "flag 为 false,设为 true.\n" ; } else { std::cout << "flag 为 true, expected 设为 true.\n" ; } }
compare_exchange_weak
和 compare_exchange_strong
允许指定成功和失败情况下的内存序.这意味着你可以根据成功或失败的情况,为原子操作指定不同的内存序
std::atmocstd::atomic<T*>
是一个原子指针类型,T
是指针所指向的对象类型.操作是针对 T
类型的指针进行的.虽然 std::atomic<T*>
不能被拷贝和移动,但它可以通过符合类型的指针进行构造和赋值.
除了常见的load
,store
,exchange
等,还有fetch_add
,fetch_sub
等,确保多线程下的指针操作.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 struct Foo { };Foo array[5 ]{}; std::atomic<Foo*> p{ array }; Foo* x = p.fetch_add (2 ); assert (x == array);assert (p.load () == &array[2 ]);x = (p -= 1 ); assert (x == &array[1 ]);assert (p.load () == &array[1 ]);p.fetch_add (3 , std::memory_order_release);
std::atomic\若多个执行线程不同步地同时访问同一 std::shared_ptr
对象,且任何这些访问使用了 shared_ptr 的非 const 成员函数,则将出现数据竞争, 除非通过 std::atomic<std::shared_ptr>
的实例进行所有访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 std::atomic<std::shared> data{}; void writer () { for (int i=0 ;i<10 ;++i) { std::shared_ptr<Data> new_data = std::make_shared<Data>(i); data.store (new_data); } } void reader () { for (int i = 0 ; i < 10 ; ++i) { if (auto sp = data.load ()) { std::cout << "读取线程值: " << sp->get_value () << std::endl; } else { std::cout << "没有读取到数据" << std::endl; } std::this_thread::sleep_for (10 ms); } }
最后原子类型还提供了wait
和notify_xx
的方法,wait
进行原子等待操作,如果值与this->load()值表示相同,则阻塞直到 this 被 notify_one() 或 notify_all() 提醒,如果不同直接返回. notif_xx
进行原子提醒操作,如果有线程被 this 上的原子等待操作(即 wait())阻塞,那么解除锁定这种线程;否则不做任何事
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 std::atomic<std::shared_ptr<int >> ptr = std::make_shared<int >(); void wait_for_wake_up () { std::osyncstream{ std::cout } << "线程 " << std::this_thread::get_id () << " 阻塞,等待更新唤醒\n" ; ptr.wait (ptr.load ()); std::osyncstream{ std::cout } << "线程 " << std::this_thread::get_id () << " 已被唤醒\n" ; } void wake_up () { std::this_thread::sleep_for (5 s); ptr.store (std::make_shared<int >(10 )); ptr.notify_one (); }
std::atomic_ref std::atomic_ref
类模板对它引用的对象应用原子操作.在 std::atomic_ref
对象的生存期中,认为它引用的对象是原子对象.如果一个线程写入原子对象,同时另一线程从它读取,那么其行为有良好定义.另外,对原子对象的访问可以建立线程间同步,和按 std::memory_order 所指定定序非原子内存访问.
对象的生存期必须超出所有引用该对象的 std::atomic_ref
的生存期.任何 std::atomic_ref
实例所引用的对象仍存在时,必须只通过这些 std::atomic_ref
实例排他地访问该对象.std::atomic_ref
对象所引用对象的任何子对象均不可同时被任何其他 std::atomic_ref
对象引用.
通过 std::atomic_ref
应用到对象的原子操作,相对于通过任何其他引用同一对象的 std::atomic_ref
应用的操作来说都是原子的.
1 2 3 std::atomic<std::shared_ptr<int >> ptr = std::make_shared<int >(10 ); std::atomic_ref<int > ref{ *ptr.load () }; ref = 100 ;
std::memory_order std::memory_order
指定内存访问,包括常规的非原子内存访问,如何围绕原子操作排序.在没有任何约束的多处理器系统上,多个线程同时读或写数个变量时,一个线程能观测到变量值更改的顺序不同于另一个线程写它们的顺序.实际上,更改的顺序甚至能在多个读取线程间相异.一些类似的效果还能在单处理器系统上出现,因为内存模型允许编译器进行变换.
库中所有原子操作的默认行为提供序列一致定序 .该默认行为可能有损性能,不过可以给予库的原子操作额外的 std::memory_order
实参,以指定确切的约束,在原子性外,编译器和处理器还必须强制该操作.
std::memory_order
是一个枚举类型,用来指定原子操作的内存顺序,影响这些操作的行为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 typedef enum memory_order { memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel, memory_order_seq_cst } memory_order; enum class memory_order : { relaxed, consume, acquire, release, acq_rel, seq_cst }; inline constexpr memory_order memory_order_relaxed = memory_order::relaxed;inline constexpr memory_order memory_order_consume = memory_order::consume;inline constexpr memory_order memory_order_acquire = memory_order::acquire;inline constexpr memory_order memory_order_release = memory_order::release;inline constexpr memory_order memory_order_acq_rel = memory_order::acq_rel;inline constexpr memory_order memory_order_seq_cst = memory_order::seq_cst;
这 6 个常量,每一个常量都表示不同的内存次序
大体来说可以将它们分为三类.
memory_order_relaxed
宽松定序:不是定序约束,仅对此操作要求原子性 .
memory_order_seq_cst
序列一致定序,这是库中所有原子操作的默认行为 ,也是最严格的内存次序 ,是绝对安全 的.
参考资料 使用线程 | 现代C++并发编程教程 (mq-b.github.io) C++ Concurrency in Action -------------本文结束 感谢您的阅读-------------
感谢阅读.
打赏 微信支付