C++新标准中的并发处理方法
引言
C++20 引入了许多强大的并发编程工具, 包括 std::jthread,
std::stop_token, std::latch, std::barrier 等.
本文将介绍这些工具的使用场景和最佳实践, 更好地利用多核硬件提升程序性能和实现关注点分离.
并发模型的限制
使用多线程提升程序性能, 可扩展性受制于Amdahl定律.
定律描述了在并行计算中, 程序的加速比受限于其串行部分的比例. 公式为:
$$ S = \frac{1}{(1 - P) + \frac{P}{N}} $$
其中:
- S是加速比,
- P是程序中可并行化的部分,
- N是处理器的数量.
该定律表明, 即使增加处理器数量, 程序的加速比也会受到串行部分的限制.
| 并行化比例P | 处理器数量N | 加速比 S |
|---|---|---|
| 0.5 (50%) | 1 | 1.00 |
| 0.5 (50%) | 2 | 1.33 |
| 0.5 (50%) | 4 | 1.60 |
| 0.5 (50%) | 8 | 1.78 |
| 0.5 (50%) | 16 | 1.88 |
| 0.9 (90%) | 1 | 1.00 |
| 0.9 (90%) | 2 | 1.82 |
| 0.9 (90%) | 4 | 3.08 |
| 0.9 (90%) | 8 | 4.71 |
| 0.9 (90%) | 16 | 6.40 |
| 0.99 (99%) | 1 | 1.00 |
| 0.99 (99%) | 2 | 1.98 |
| 0.99 (99%) | 4 | 3.88 |
| 0.99 (99%) | 8 | 7.48 |
| 0.99 (99%) | 16 | 13.91 |
分析
- 低并行化比例(如 P = 0.5 ): 即使增加处理器数量, 加速比提升有限, 因为串行部分占比较大.
- 高并行化比例(如 P = 0.9 ) 或 ( P = 0.99 ): 加速比显著提升, 但随着处理器数量增加, 加速比增速减缓, 最终受限于串行部分.
结论
即使有大量处理器, 程序的加速比仍受限于其串行部分. 因此, 优化串行部分对提升整体性能至关重要.
并发模型
- 并行算法: 适用于处理大量数据, 且数据处理操作可以彼此独立地执行.
- 线程池: 适用于将工作分解成独立的任务, 并高效利用多个 CPU 核心.
- 专用线程: 适用于实现关注点分离, 例如处理长时间运行的后台任务.
线程管理
std::jthread
std::jthread 是 C++20 中推荐的线程管理类, 相比 std::thread
有以下优势:
- 自动资源管理: 析构函数会自动请求线程停止并等待线程结束.
- 协作式取消: 与 std::stop~token~ 紧密集成, 支持优雅的线程取消.
以下是一个使用 std::jthread 的简单示例,
展示了其自动资源管理和协作式取消的特性:
#include <iostream>
#include <thread>
#include <chrono>
void worker(std::stop_token stopToken) {
while (!stopToken.stop_requested()) {
std::cout << "Working..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Thread stopped." << std::endl;
}
int main() {
std::jthread t(worker); // 自动启动线程
std::this_thread::sleep_for(std::chrono::seconds(3)); // 主线程等待3秒
// 不需要显式调用 join(), jthread 析构时会自动请求停止并等待线程结束
return 0;
}
#+results[4082202189785567a8c1a6ce1c8c54e0a61a8fc7]:
Working...
Working...
Working...
Thread stopped.
- 自动资源管理:
std::jthread在析构时会自动请求线程停止并等待线程结束, 无需手动调用join(). - 协作式取消: 通过
std::stop_token, 线程可以检查是否被请求停止, 从而实现优雅的线程取消.
在这个例子中, 主线程等待3秒后, std::jthread 对象 t
析构时会自动请求线程停止, 并等待线程结束.
std::stop_token
std::stop_token 用于实现协作式线程取消, 线程可以定期检查 stop~token~
以确定是否收到停止请求.
std::stop_token 是 C++20 引入的一个工具, 用于协作式线程取消.
它允许线程定期检查是否收到了停止请求. 以下是一个简单的例子, 展示如何使用
std::stop_token 和 std::jthread 来实现线程的协作式取消:
#include <iostream>
#include <thread>
#include <stop_token>
#include <chrono>
void worker(std::stop_token stop_token) {
while (!stop_token.stop_requested()) {
std::cout << "Working..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Stopped!" << std::endl;
}
int main() {
std::jthread t(worker); // jthread 自动管理线程的生命周期
std::this_thread::sleep_for(std::chrono::seconds(5));
t.request_stop(); // 请求停止线程
// jthread 在析构时会自动 join
return 0;
}
#+results[15ee58c676e8e3e26173d938870bdfe8c016a0f2]:
Working...
Working...
Working...
Working...
Working...
Stopped!
std::jthread类似于std::thread, 但它在析构时会自动调用join(), 并且支持std::stop_token.std::stop_token: 用于检查是否收到了停止请求.request_stop(): 请求线程停止执行.
在这个例子中, worker 函数会每秒打印一次 "Working…", 直到收到停止请求.
主线程在 5 秒后调用 request_stop(), worker 线程检测到停止请求后退出.
线程同步
std::latch
std::latch 用于线程间的一次性同步,
适用于等待多个线程完成特定操作后再继续执行.
#include <iostream>
#include <thread>
#include <latch>
int main() {
std::latch latch(3); // 创建一个初始计数值为 3 的 Latch 对象
auto thread1 = std::jthread([&latch]() {
std::cout << "线程 1 完成任务" << std::endl;
latch.count_down();
});
auto thread2 = std::jthread([&latch]() {
std::cout << "线程 2 完成任务" << std::endl;
latch.count_down();
});
auto thread3 = std::jthread([&latch]() {
std::cout << "线程 3 完成任务" << std::endl;
latch.count_down();
});
latch.wait();
std::cout << "所有线程完成任务" << std::endl;
return 0;
}
#+results[1ceecf8905ca9d41b0294476224df949a2e9940c]:
线程 1 完成任务
线程 2 完成任务线程 3 完成任务
所有线程完成任务
std::barrier
std::barrier 用于协调多个线程的同步, 适用于循环迭代或分阶段处理.
#include <iostream>
#include <thread>
#include <barrier>
#include <vector>
int main() {
const int num_threads = 3;
std::barrier barrier(num_threads, []() {
std::cout << "所有线程已完成当前阶段的任务" << std::endl;
});
// 先把所有线程对象存起来, 循环完了再让它们析构
std::vector<std::jthread> threads;
threads.reserve(num_threads);
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back([&barrier, i]() {
for (int j = 0; j < 3; ++j) {
std::cout << "线程 " << i << " 完成阶段 " << j << " 的任务" << std::endl;
barrier.arrive_and_wait();
}
});
}
// main函数return前, threads里的jthread才会陆续析构,
// 每个析构时会发stop_request并join, 届时线程已全部执行完成.
return 0;
}
#+results[61506ddb78395f693768cc6123ed5abb1d5379bb]:
线程 线程 1 完成阶段 00 完成阶段 0 的任务
的任务
线程 2 完成阶段 0 的任务
所有线程已完成当前阶段的任务
线程 2 完成阶段 1线程 0 完成阶段 1 的任务线程
的任务1 完成阶段 1 的任务
所有线程已完成当前阶段的任务
线程 1 完成阶段 2 的任务
线程 2 完成阶段 2 的任务
线程 0 完成阶段 2 的任务
所有线程已完成当前阶段的任务
std::barrier 是 C++20 引入的同步原语, 用于协调多个线程的同步.
在这个例子中, std::barrier 会确保所有线程在完成当前阶段的任务后,
才会继续执行下一阶段的任务.
代码的执行流程如下:
- 创建 3 个线程, 每个线程执行 3 个阶段的任务.
- 在每个阶段, 线程完成任务后会调用
barrier.arrive_and_wait(), 等待其他线程也完成当前阶段的任务. - 当所有线程都到达屏障时, 会执行屏障的回调函数, 输出 "所有线程已完成当前阶段的任务".
- 然后所有线程继续执行下一阶段的任务, 直到所有阶段完成.
std::future
std::future 用于管理异步操作的结果,
适用于从异步任务中获取结果或异常信息.
#include <iostream>
#include <future>
#include <thread>
int main() {
std::promise<int> promise;
std::future<int> future = promise.get_future();
std::thread thread([&promise]() {
int data = 10;
promise.set_value(data);
});
int result = future.get();
std::cout << "结果: " << result << std::endl;
thread.join();
return 0;
}
#+results[2d011822d5af327927ae887bce8c8488078e319e]:
结果: 10
std::condition_variable
std::condition_variable 用于线程间同步, 适用于等待特定条件的发生.
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <optional>
std::mutex mtx;
std::condition_variable cv;
std::optional<int> data;
void producer() {
int value = 10;
{
std::lock_guard<std::mutex> lock(mtx);
data = value;
}
cv.notify_one();
}
void consumer() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return data.has_value(); });
std::cout << "数据: " << *data << std::endl;
}
int main() {
std::thread producer_thread(producer);
std::thread consumer_thread(consumer);
producer_thread.join();
consumer_thread.join();
return 0;
}
#+results[67148ef8c6d93517cb923cc3623ca3383cdc1d8b]:
数据: 10
总结
C++20 提供了丰富的并发编程工具, 通过选择合适的并发模型和合理管理线程, 可以有效地利用多核硬件提升程序性能或实现关注点分离. 建议优先使用更高级的同步原语, 例如并行算法, 线程池, 期值, 闩锁和屏障, 因为它们更易于使用和维护.