C++并发编程(IV)
Posted on Sun 28 February 2021 in programming
条件变量
在C++中,条件变量是一种同步原语,用于在多线程编程环境中实现线程之间的同步。使用条件变量,一个线程可以使自身进入等待状态,直到另一个线程发出某种信号,这通常是表示某种条件已经变为真。
C++中的条件变量通常与互斥量(mutex)一起使用,以确保在检查条件和进入等待状态之间不存在竞态条件。通常情况下,用法如下:
-
锁定互斥量来保护条件的检查。
-
使用std::condition_variable对象的wait()方法使线程等待。这将阻塞当前线程,直到另一个线程调用notify_one()或notify_all()来唤醒至少一个等待的线程。
-
一旦线程被唤醒,它将重新锁定互斥量并检查条件。如果条件不满足,线程可能会再次等待。
通过这种方式,条件变量在以下几个方面很有用:
· 帮助避免忙等待,等待的线程会被挂起,不会占用处理器时间。 · 允许线程有选择地在特定条件下执行。 · 允许多个线程同步它们对数据的访问。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void print_ready() {
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, []{return ready;}); // 阻塞当前线程,直到ready为true
std::cout << "ready!" << std::endl;
}
void set_ready() {
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_one(); // 唤醒一个等待的线程
}
int main() {
std::thread t1(print_ready);
std::thread t2(set_ready);
t1.join();
t2.join();
return 0;
}
#include <queue>
#include <chrono>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>
int main() {
std::queue<int> produced_nums;
std::mutex mtx;
std::condition_variable cv;
bool notified = false; // 通知信号
// 生产者
auto producer = [&]() {
for (int i = 0; ; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(900));
std::unique_lock<std::mutex> lock(mtx);
std::cout << "producing " << i << std::endl;
produced_nums.push(i);
notified = true;
cv.notify_all(); // 此处也可以使用 notify_one
}
};
// 消费者
auto consumer = [&]() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
while (!notified) { // 避免虚假唤醒
cv.wait(lock);
}
// 短暂取消锁,使得生产者有机会在消费者消费空前继续生产
lock.unlock();
// 消费者慢于生产者
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
lock.lock();
while (!produced_nums.empty()) {
std::cout << "consuming " << produced_nums.front() << std::endl;
produced_nums.pop();
}
notified = false;
}
};
// 分别在不同的线程中运行
std::thread p(producer);
std::thread cs[2];
for (int i = 0; i < 2; ++i) {
cs[i] = std::thread(consumer);
}
p.join();
for (int i = 0; i < 2; ++i) {
cs[i].join();
}
return 0;
}
原子操作与内存模型
在 C++ 中,原子操作是一种特殊类型的操作,它们以不可中断的方式执行。这意味着原子操作在执行过程中不会被其他线程干扰。如果多个线程同时尝试执行原子操作(在同一对象上),则只有一个线程会成功,并且结果是确定的。这使得原子操作成为了线程同步和数据一致性的关键工具。
为了支持这种行为,C++11 标准库引入了
· 比较和交换(compare-and-swap) · 增加(fetch-and-add) · 减少(fetch-and-sub) · 存储和加载等
#include <atomic>
#include <iostream>
std::atomic<int> counter(0);
void increment() {
++counter;
}
int main() {
increment();
increment();
std::cout << counter << '\n'; // 输出: 2
}
在这个例子中,counter是一个原子整数。每次当increment()函数被调用时,计数器都会增加 1,但不用担心线程安全问题。只有一个线程可以增加计数器,其他线程会等待,直到操作完成。最后,主线程打印计数器的值。
注意,在一些情况下,原子操作可能带来一些性能开销,这是因为为了保证操作的原子性,可能需要硬件和/或编译器提供额外的支持。所以,当使用原子操作时,应尽量减少并发冲突,合理使用。
内存顺序
- 宽松模型:在此模型下,单个线程内的原子操作都是顺序执行的,不允许指令重排,但不同线程间原子操作的顺序是任意的。类型通过 std::memory_order_relaxed 指定。我们来看一个例子:
std::atomic<int> counter = {0};
std::vector<std::thread> vt;
for (int i = 0; i < 100; ++i) {
vt.emplace_back([&](){
counter.fetch_add(1, std::memory_order_relaxed);
});
}
for (auto& t : vt) {
t.join();
}
std::cout << "current counter:" << counter << std::endl;
- 释放/消费模型:
在C++的原子操作中,我们常常会看到”memory_order”的概念。memory_order用于指定原子操作读取和写入的内存顺序。其中,
memory_order_consume是其中一种类型。
memory_order_consume是一种更为松散的内存顺序,它只保证对依赖于原子变量a的普通非原子操作b的顺序:如果b依赖于a,那么在a后面出现的b操作,不会被重排到a之前。也就是说,如果有两个操作A和B,其中B依赖于A的结果,那么B不能在A之前执行。
然而,从C++17起,出于复杂性和可移植性考虑,memory_order_consume已被几乎所有的编译器实现降级为memory_order_acquire,恢复到了严格的内存顺序。
在C++中,不建议直接使用memory_order_consume,推荐使用memory_order_acquire和memory_order_release来对原子变量进行更严格的同步。
- 释放/获取模型:在此模型下,我们可以进一步加紧对不同线程间原子操作的顺序的限制,在释放
std::memory_order_release和获取std::memory_order_acquire之间规定时序,即发生在释放(release)操作之前的所有写操作,对其他线程的任何获取(acquire)操作都是可见的,亦即发生顺序(happens-before)。
memory_order_acquire是一种防止”读”操作的重排。它常常被用在消费者线程中,用来读取生产者线程中原子变量的最新值,并确保在这之后的所有操作都能看到最新的值。
让我们通过一个简单的例子详细解释下: 假设我们在做一个简单的邮件传递的系统。我们有一个处理数据(”邮件”)的生产者线程和一个消费者线程,生产者在数据完成后触发信号。
std::atomic<bool> data_ready(false); // 原子变量作为触发信号
std::string data; // 数据(本例中的"邮件")
void produce_data()
{
data = "Hello World!"; // 填写邮件
data_ready.store(true, std::memory_order_release); // 触发信号,邮件已经准备好
}
void consume_data()
{
while(!data_ready.load(std::memory_order_acquire)); // 等待邮件
// 这时,我们确信 data(邮件)现在已经准备好了
std::cout << "Got mail: " << data << std::endl;
//data_ready.store(false, std::memory_order_release); // 重置条件
}
在这个示例中,生产者线程调用produce_data函数,填写数据,并在完成后设置data_ready标识为true。消费者线程在consume_data函数中持续等待,直到data_ready为true。 直到消费者看到data_ready为true,它将开始处理数据,然后可能将信号重置为false以等待下一个信号。
memory_order_acquire 确保了:一旦消费者线程看到 data_ready 为 true,那么 data 的值必定是 "Hello World!",即使这两个操作在不同的线程中进行。
// a kind of fix-num thread pool
#include <vector>
#include <queue>
#include <thread>
#include <future>
#include <functional>
#include <utility>
#include <iostream>
/*
ThreadPool p(4); // 指定四个工作线程
// 将任务在池中入队,并返回一个 std::future
auto f = pool.enqueue([](int life) {
return meaning;
}, 42);
// 从 future 中获得执行结果
std::cout << f.get() << std::endl;
*/
class ThreadPool {
public:
ThreadPool(int num_threads) {
for (size_t i=0; i<num_threads; ++i) {
this->threads.emplace_back(
[this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->mtx);
this->cv.wait(lock, [this]{
return this->stop || !this->tasks.empty();
});
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
}
template <class F, class... Args>
auto enqueue(F&& f, Args&& ...args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(mtx);
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]{ (*task)(); });
}
cv.notify_one();
return res;
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(mtx);
stop = true;
}
cv.notify_all();
for (std::thread &worker: threads)
worker.join();
}
private:
std::vector<std::thread> threads;
std::queue<std::function<void()>> tasks;
std::mutex mtx;
std::condition_variable cv;
bool stop;
};
int main() {
ThreadPool pool(4); // 创建一个包含四个线程的线程池
// 创建一个任务
auto task = [](int x) {
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
return x*x;
};
// 将任务加入到线程池,并获取一个 future 来获取结果
std::future<int> result = pool.enqueue(task, 10);
// 获取任务的结果
std::cout << result.get() << std::endl; // 输出 100
return 0;
}
更多的练习:
练习1:添加一个将线程池的大小动态调整的函数到上述线程池类。这个函数应该可以增加或减少线程池中线程的数量,但请注意在减少线程数量时,你可能需要思考一下如何处理正在执行任务的线程。 练习2:添加一个取消特定任务的功能到你的线程池类。你可能需要给每个任务一个唯一的ID,然后提供一个使用这个ID来取消任务的功能。 练习3:在你的线程池中实现优先级队列,而不是标准队列,以允许某些任务比其他任务有更高的执行优先级。你可能需要使用一个以priority_queue来替代现有的任务队列。 练习4:添加错误处理机制,确保当一个任务抛出了一个未捕获的异常时,线程池可以正常工作。 练习5:添加对返回结果的支持,如支持任务返回一个自定义的类或者结构。
Reference: 1. (现代C++教程)[https://changkun.de/modern-cpp/zh-cn/07-thread/#7-3-%E6%9C%9F%E7%89%A9]