引言
线程屏障(Thread Barrier)是一种同步机制,用于让一组线程在某个点上等待,直到所有线程都到达该点后再继续执行。屏障通常用于多线程编程中,确保所有线程在某个阶段完成后再进入下一个阶段。
std::latch
在了解barrier之间,让我们先来看一看std::latch吧。
-
构造函数explicit latch(std::ptrdiff_t expected); 传入一个参数,指定计数器初始的值
-
成员函数 count_down(std::ptrdiff_t n = 1); 每次执行让计数器减去n,默认为1
-
成员函数 wait(); 等待计数器减为0
-
成员函数 try_wait(); 非阻塞检查计数器是否减为0,为0则返回true,极少数情况下也会返回false
-
在多线程环境中,如果多个线程同时操作 latch 的计数器,可能会出现竞争条件。例如:
- 线程 A 调用
count_down() 将计数器减到 0。
- 线程 B 在计数器减到 0 的瞬间调用
try_wait(),但由于线程调度或内存可见性问题,线程 B 看到的计数器值可能仍然不为 0。
这种情况下,try_wait 可能会返回 false,即使计数器实际上已经减到 0。
-
代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
#include <iostream>
#include <latch>
#include <thread>
#include <vector>
#include <chrono>
#include <random>
void func(std::latch& latch) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 10);
std::this_thread::sleep_for(std::chrono::seconds(dis(gen)));
std::cout << " finish work " << std::endl;
latch.count_down();
}
int main() {
std::latch latch(3);
std::vector<std::thread> threads;
for (int i = 0; i < 3; i++) {
threads.emplace_back(func, std::ref(latch));
}
latch.wait(); // 会等待latch减为0,才会继续运行下去
std::cout << "all work is done" << std::endl;
for (auto& thread : threads) {
thread.join();
}
return 0;
}
|
std::barrier
std::barrier 是 C++20 中引入的一个同步原语,用于让一组线程在某个点上等待,直到所有线程都到达该点后再继续执行。与 std::latch 不同,std::barrier 是 可重用的,适用于多阶段任务的同步。
| 类别 |
函数签名 |
描述 |
| 构造函数 |
explicit barrier(std::ptrdiff_t count); |
构造一个 barrier,指定需要等待的线程数 count。 |
|
template<class CompletionFunction> barrier(std::ptrdiff_t count, CompletionFunction&& completion); |
构造一个 barrier,指定线程数 count 和完成回调函数 completion。 |
| 成员函数 |
arrival_token arrive( std::ptrdiff_t n = 1 ); |
通知 barrier 当前线程已到达,update 指定减少的计数(默认 1)。 |
|
void wait( arrival_token&& arrival ) const; |
等待计数器减为0 |
|
void arrive_and_wait(); |
通知 barrier 当前线程已到达,并阻塞直到所有线程到达。 |
|
void arrive_and_drop(); |
通知 barrier 当前线程已到达,并减少 barrier 的计数(线程退出)。 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
#include <iostream>
#include <thread>
#include <vector>
#include <barrier>
#include <latch>
#include <chrono>
#include <random>
void func(std::barrier<> & barrier) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 10);
std::this_thread::sleep_for(std::chrono::seconds(dis(gen)));
std::cout << " start work" << std::endl;
barrier.arrive_and_wait();
std::this_thread::sleep_for(std::chrono::seconds(dis(gen)));
std::cout << " end work" << std::endl;
barrier.arrive_and_wait();
}
int main() {
std::barrier<> barrier(4);
std::vector<std::thread> threads;
for (int i = 0; i < 3; i++) {
threads.emplace_back(func, std::ref(barrier));
}
barrier.arrive_and_wait();
std::cout << "all people ready" << std::endl;
barrier.wait(barrier.arrive()); // 相当于arrive_and_wait()
std::cout << "all work is done" << std::endl;
for (auto& thread : threads) {
thread.join();
}
return 0;
}
|