December 15, 2024
By: Kevin

C++新标准中的并发处理方法

  1. 引言
  2. 并发模型的限制
    1. 并发模型
  3. 线程管理
    1. std::jthread
    2. std::stop_token
  4. 线程同步
    1. std::latch
    2. std::barrier
    3. std::future
    4. std::condition_variable
  5. 总结

引言

C++20 引入了许多强大的并发编程工具, 包括 std::jthread, std::stop_token, std::latch, std::barrier 等.

本文将介绍这些工具的使用场景和最佳实践, 更好地利用多核硬件提升程序性能和实现关注点分离.

并发模型的限制

使用多线程提升程序性能, 可扩展性受制于Amdahl定律.

定律描述了在并行计算中, 程序的加速比受限于其串行部分的比例. 公式为:

$$ S = \frac{1}{(1 - P) + \frac{P}{N}} $$

其中:

  • S是加速比,
  • P是程序中可并行化的部分,
  • N是处理器的数量.

该定律表明, 即使增加处理器数量, 程序的加速比也会受到串行部分的限制.

并行化比例P处理器数量N加速比 S
0.5 (50%)11.00
0.5 (50%)21.33
0.5 (50%)41.60
0.5 (50%)81.78
0.5 (50%)161.88
0.9 (90%)11.00
0.9 (90%)21.82
0.9 (90%)43.08
0.9 (90%)84.71
0.9 (90%)166.40
0.99 (99%)11.00
0.99 (99%)21.98
0.99 (99%)43.88
0.99 (99%)87.48
0.99 (99%)1613.91

分析

  1. 低并行化比例(如 P = 0.5 ): 即使增加处理器数量, 加速比提升有限, 因为串行部分占比较大.
  2. 高并行化比例(如 P = 0.9 ) 或 ( P = 0.99 ): 加速比显著提升, 但随着处理器数量增加, 加速比增速减缓, 最终受限于串行部分.

结论

即使有大量处理器, 程序的加速比仍受限于其串行部分. 因此, 优化串行部分对提升整体性能至关重要.

并发模型

  • 并行算法: 适用于处理大量数据, 且数据处理操作可以彼此独立地执行.
  • 线程池: 适用于将工作分解成独立的任务, 并高效利用多个 CPU 核心.
  • 专用线程: 适用于实现关注点分离, 例如处理长时间运行的后台任务.

线程管理

std::jthread

std::jthread 是 C++20 中推荐的线程管理类, 相比 std::thread 有以下优势:

  • 自动资源管理: 析构函数会自动请求线程停止并等待线程结束.
  • 协作式取消: 与 std::stop~token~ 紧密集成, 支持优雅的线程取消.

以下是一个使用 std::jthread 的简单示例, 展示了其自动资源管理和协作式取消的特性:

#include <iostream>
#include <thread>
#include <chrono>

void worker(std::stop_token stopToken) {
    while (!stopToken.stop_requested()) {
        std::cout << "Working..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::cout << "Thread stopped." << std::endl;
}

int main() {
    std::jthread t(worker);  // 自动启动线程

    std::this_thread::sleep_for(std::chrono::seconds(3));  // 主线程等待3秒

    // 不需要显式调用 join(), jthread 析构时会自动请求停止并等待线程结束
    return 0;
}
#+results[4082202189785567a8c1a6ce1c8c54e0a61a8fc7]: 
Working...
Working...
Working...
Thread stopped.
  1. 自动资源管理: std::jthread 在析构时会自动请求线程停止并等待线程结束, 无需手动调用 join().
  2. 协作式取消: 通过 std::stop_token, 线程可以检查是否被请求停止, 从而实现优雅的线程取消.

在这个例子中, 主线程等待3秒后, std::jthread 对象 t 析构时会自动请求线程停止, 并等待线程结束.

std::stop_token

std::stop_token 用于实现协作式线程取消, 线程可以定期检查 stop~token~ 以确定是否收到停止请求.

std::stop_token 是 C++20 引入的一个工具, 用于协作式线程取消. 它允许线程定期检查是否收到了停止请求. 以下是一个简单的例子, 展示如何使用 std::stop_tokenstd::jthread 来实现线程的协作式取消:

#include <iostream>
#include <thread>
#include <stop_token>
#include <chrono>

void worker(std::stop_token stop_token) {
    while (!stop_token.stop_requested()) {
        std::cout << "Working..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::cout << "Stopped!" << std::endl;
}

int main() {
    std::jthread t(worker);  // jthread 自动管理线程的生命周期

    std::this_thread::sleep_for(std::chrono::seconds(5));
    t.request_stop();  // 请求停止线程

    // jthread 在析构时会自动 join
    return 0;
}
#+results[15ee58c676e8e3e26173d938870bdfe8c016a0f2]: 
Working...
Working...
Working...
Working...
Working...
Stopped!
  1. std::jthread 类似于 std::thread, 但它在析构时会自动调用 join(), 并且支持 std::stop_token.
  2. std::stop_token : 用于检查是否收到了停止请求.
  3. request_stop(): 请求线程停止执行.

在这个例子中, worker 函数会每秒打印一次 "Working…", 直到收到停止请求. 主线程在 5 秒后调用 request_stop(), worker 线程检测到停止请求后退出.

线程同步

std::latch

std::latch 用于线程间的一次性同步, 适用于等待多个线程完成特定操作后再继续执行.

#include <iostream>
#include <thread>
#include <latch>

int main() {
  std::latch latch(3); // 创建一个初始计数值为 3 的 Latch 对象

  auto thread1 = std::jthread([&latch]() {
    std::cout << "线程 1 完成任务" << std::endl;
    latch.count_down();
  });

  auto thread2 = std::jthread([&latch]() {
    std::cout << "线程 2 完成任务" << std::endl;
    latch.count_down();
  });

  auto thread3 = std::jthread([&latch]() {
    std::cout << "线程 3 完成任务" << std::endl;
    latch.count_down();
  });

  latch.wait();
  std::cout << "所有线程完成任务" << std::endl;

  return 0;
}
#+results[1ceecf8905ca9d41b0294476224df949a2e9940c]: 
线程 1 完成任务
线程 2 完成任务线程 3 完成任务

所有线程完成任务

std::barrier

std::barrier 用于协调多个线程的同步, 适用于循环迭代或分阶段处理.

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

int main() {
    const int num_threads = 3;
    std::barrier barrier(num_threads, []() {
        std::cout << "所有线程已完成当前阶段的任务" << std::endl;
    });

    // 先把所有线程对象存起来, 循环完了再让它们析构
    std::vector<std::jthread> threads;
    threads.reserve(num_threads);

    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&barrier, i]() {
            for (int j = 0; j < 3; ++j) {
                std::cout << "线程 " << i << " 完成阶段 " << j << " 的任务" << std::endl;
                barrier.arrive_and_wait();
            }
        });
    }

    // main函数return前, threads里的jthread才会陆续析构,
    // 每个析构时会发stop_request并join, 届时线程已全部执行完成.
    return 0;
}
#+results[61506ddb78395f693768cc6123ed5abb1d5379bb]: 
线程 线程 1 完成阶段 00 完成阶段 0 的任务
 的任务
线程 2 完成阶段 0 的任务
所有线程已完成当前阶段的任务
线程 2 完成阶段 1线程 0 完成阶段 1 的任务线程
 的任务1 完成阶段 1 的任务

所有线程已完成当前阶段的任务
线程 1 完成阶段 2 的任务
线程 2 完成阶段 2 的任务
线程 0 完成阶段 2 的任务
所有线程已完成当前阶段的任务

std::barrier 是 C++20 引入的同步原语, 用于协调多个线程的同步. 在这个例子中, std::barrier 会确保所有线程在完成当前阶段的任务后, 才会继续执行下一阶段的任务.

代码的执行流程如下:

  1. 创建 3 个线程, 每个线程执行 3 个阶段的任务.
  2. 在每个阶段, 线程完成任务后会调用 barrier.arrive_and_wait(), 等待其他线程也完成当前阶段的任务.
  3. 当所有线程都到达屏障时, 会执行屏障的回调函数, 输出 "所有线程已完成当前阶段的任务".
  4. 然后所有线程继续执行下一阶段的任务, 直到所有阶段完成.

std::future

std::future 用于管理异步操作的结果, 适用于从异步任务中获取结果或异常信息.

#include <iostream>
#include <future>
#include <thread>

int main() {
  std::promise<int> promise;
  std::future<int> future = promise.get_future();

  std::thread thread([&promise]() {
    int data = 10;
    promise.set_value(data);
  });

  int result = future.get();
  std::cout << "结果: " << result << std::endl;

  thread.join();
  return 0;
}
#+results[2d011822d5af327927ae887bce8c8488078e319e]: 
结果: 10

std::condition_variable

std::condition_variable 用于线程间同步, 适用于等待特定条件的发生.

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <optional>

std::mutex mtx;
std::condition_variable cv;
std::optional<int> data;

void producer() {
  int value = 10;
  {
    std::lock_guard<std::mutex> lock(mtx);
    data = value;
  }
  cv.notify_one();
}

void consumer() {
  std::unique_lock<std::mutex> lock(mtx);
  cv.wait(lock, [] { return data.has_value(); });
  std::cout << "数据: " << *data << std::endl;
}

int main() {
  std::thread producer_thread(producer);
  std::thread consumer_thread(consumer);

  producer_thread.join();
  consumer_thread.join();

  return 0;
}
#+results[67148ef8c6d93517cb923cc3623ca3383cdc1d8b]: 
数据: 10

总结

C++20 提供了丰富的并发编程工具, 通过选择合适的并发模型和合理管理线程, 可以有效地利用多核硬件提升程序性能或实现关注点分离. 建议优先使用更高级的同步原语, 例如并行算法, 线程池, 期值, 闩锁和屏障, 因为它们更易于使用和维护.

Tags: c++