Featured image of post 线程屏障

线程屏障

线程屏障

引言

线程屏障(Thread Barrier)是一种同步机制,用于让一组线程在某个点上等待,直到所有线程都到达该点后再继续执行。屏障通常用于多线程编程中,确保所有线程在某个阶段完成后再进入下一个阶段。

std::latch

​ 在了解barrier之间,让我们先来看一看std::latch吧。

  • 构造函数explicit latch(std::ptrdiff_t expected); 传入一个参数,指定计数器初始的值

  • 成员函数 count_down(std::ptrdiff_t n = 1); 每次执行让计数器减去n,默认为1

  • 成员函数 wait(); 等待计数器减为0

  • 成员函数 try_wait(); 非阻塞检查计数器是否减为0,为0则返回true,极少数情况下也会返回false

    • 在多线程环境中,如果多个线程同时操作 latch 的计数器,可能会出现竞争条件。例如:

      • 线程 A 调用 count_down() 将计数器减到 0。
      • 线程 B 在计数器减到 0 的瞬间调用 try_wait(),但由于线程调度或内存可见性问题,线程 B 看到的计数器值可能仍然不为 0。

      这种情况下,try_wait 可能会返回 false,即使计数器实际上已经减到 0。

  • 代码示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <iostream>
#include <latch>
#include <thread>
#include <vector>
#include <chrono>
#include <random>

void func(std::latch& latch) {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(1, 10);
    std::this_thread::sleep_for(std::chrono::seconds(dis(gen)));
    std::cout << " finish work " << std::endl;
    latch.count_down();
}

int main() {
    std::latch latch(3);

    std::vector<std::thread> threads;
    for (int i = 0; i < 3; i++) {
        threads.emplace_back(func, std::ref(latch));
    }

    latch.wait();		// 会等待latch减为0,才会继续运行下去
    std::cout << "all work is done" << std::endl;
    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}

std::barrier

std::barrier 是 C++20 中引入的一个同步原语,用于让一组线程在某个点上等待,直到所有线程都到达该点后再继续执行。与 std::latch 不同,std::barrier可重用的,适用于多阶段任务的同步。

类别 函数签名 描述
构造函数 explicit barrier(std::ptrdiff_t count); 构造一个 barrier,指定需要等待的线程数 count
template<class CompletionFunction> barrier(std::ptrdiff_t count, CompletionFunction&& completion); 构造一个 barrier,指定线程数 count 和完成回调函数 completion
成员函数 arrival_token arrive( std::ptrdiff_t n = 1 ); 通知 barrier 当前线程已到达,update 指定减少的计数(默认 1)。
void wait( arrival_token&& arrival ) const; 等待计数器减为0
void arrive_and_wait(); 通知 barrier 当前线程已到达,并阻塞直到所有线程到达。
void arrive_and_drop(); 通知 barrier 当前线程已到达,并减少 barrier 的计数(线程退出)。
  • 代码案例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <iostream>
#include <thread>
#include <vector>
#include <barrier>
#include <latch>

#include <chrono>
#include <random>

void func(std::barrier<> & barrier) {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(1, 10);
    std::this_thread::sleep_for(std::chrono::seconds(dis(gen)));
    std::cout << " start work" << std::endl;
    barrier.arrive_and_wait();

    std::this_thread::sleep_for(std::chrono::seconds(dis(gen)));
    std::cout << " end work" << std::endl;
    barrier.arrive_and_wait();
}

int main() {
    std::barrier<> barrier(4);
    std::vector<std::thread> threads;
    for (int i = 0; i < 3; i++) {
        threads.emplace_back(func, std::ref(barrier));
    }
    barrier.arrive_and_wait();
    std::cout << "all people ready" << std::endl;


    barrier.wait(barrier.arrive());		// 相当于arrive_and_wait()
    std::cout << "all work is done" << std::endl;
    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}