C++并发编程(IV)

Posted on Sun 28 February 2021 in programming

条件变量

在C++中,条件变量是一种同步原语,用于在多线程编程环境中实现线程之间的同步。使用条件变量,一个线程可以使自身进入等待状态,直到另一个线程发出某种信号,这通常是表示某种条件已经变为真。

C++中的条件变量通常与互斥量(mutex)一起使用,以确保在检查条件和进入等待状态之间不存在竞态条件。通常情况下,用法如下:

  1. 锁定互斥量来保护条件的检查。

  2. 使用std::condition_variable对象的wait()方法使线程等待。这将阻塞当前线程,直到另一个线程调用notify_one()或notify_all()来唤醒至少一个等待的线程。

  3. 一旦线程被唤醒,它将重新锁定互斥量并检查条件。如果条件不满足,线程可能会再次等待。

通过这种方式,条件变量在以下几个方面很有用:

· 帮助避免忙等待,等待的线程会被挂起,不会占用处理器时间。 · 允许线程有选择地在特定条件下执行。 · 允许多个线程同步它们对数据的访问。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_ready() {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, []{return ready;});  // 阻塞当前线程,直到ready为true
    std::cout << "ready!" << std::endl;
}

void set_ready() {
    std::unique_lock<std::mutex> lck(mtx);
    ready = true;
    cv.notify_one();  // 唤醒一个等待的线程
}

int main() {
    std::thread t1(print_ready);
    std::thread t2(set_ready);
    t1.join();
    t2.join();
    return 0;
}
#include <queue>
#include <chrono>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>


int main() {
    std::queue<int> produced_nums;
    std::mutex mtx;
    std::condition_variable cv;
    bool notified = false;  // 通知信号

    // 生产者
    auto producer = [&]() {
        for (int i = 0; ; i++) {
            std::this_thread::sleep_for(std::chrono::milliseconds(900));
            std::unique_lock<std::mutex> lock(mtx);
            std::cout << "producing " << i << std::endl;
            produced_nums.push(i);
            notified = true;
            cv.notify_all(); // 此处也可以使用 notify_one
        }
    };
    // 消费者
    auto consumer = [&]() {
        while (true) {
            std::unique_lock<std::mutex> lock(mtx);
            while (!notified) {  // 避免虚假唤醒
                cv.wait(lock);
            }
            // 短暂取消锁,使得生产者有机会在消费者消费空前继续生产
            lock.unlock();
            // 消费者慢于生产者
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            lock.lock();
            while (!produced_nums.empty()) {
                std::cout << "consuming " << produced_nums.front() << std::endl;
                produced_nums.pop();
            }
            notified = false;
        }
    };

    // 分别在不同的线程中运行
    std::thread p(producer);
    std::thread cs[2];
    for (int i = 0; i < 2; ++i) {
        cs[i] = std::thread(consumer);
    }
    p.join();
    for (int i = 0; i < 2; ++i) {
        cs[i].join();
    }
    return 0;
}

原子操作与内存模型

在 C++ 中,原子操作是一种特殊类型的操作,它们以不可中断的方式执行。这意味着原子操作在执行过程中不会被其他线程干扰。如果多个线程同时尝试执行原子操作(在同一对象上),则只有一个线程会成功,并且结果是确定的。这使得原子操作成为了线程同步和数据一致性的关键工具。

为了支持这种行为,C++11 标准库引入了  头文件,该文件提供了一组类模板和函数模板,用于执行原子操作。这些模板被设计为在原始内存上执行与线程相关的底层操作。原子操作包括:

· 比较和交换(compare-and-swap) · 增加(fetch-and-add) · 减少(fetch-and-sub) · 存储和加载等

#include <atomic>
#include <iostream>

std::atomic<int> counter(0);

void increment() {
    ++counter;
}

int main() {
    increment();
    increment();
    std::cout << counter << '\n';  // 输出: 2
}

在这个例子中,counter是一个原子整数。每次当increment()函数被调用时,计数器都会增加 1,但不用担心线程安全问题。只有一个线程可以增加计数器,其他线程会等待,直到操作完成。最后,主线程打印计数器的值。

注意,在一些情况下,原子操作可能带来一些性能开销,这是因为为了保证操作的原子性,可能需要硬件和/或编译器提供额外的支持。所以,当使用原子操作时,应尽量减少并发冲突,合理使用。

内存顺序

  1. 宽松模型:在此模型下,单个线程内的原子操作都是顺序执行的,不允许指令重排,但不同线程间原子操作的顺序是任意的。类型通过 std::memory_order_relaxed 指定。我们来看一个例子:
std::atomic<int> counter = {0};
std::vector<std::thread> vt;
for (int i = 0; i < 100; ++i) {
    vt.emplace_back([&](){
        counter.fetch_add(1, std::memory_order_relaxed);
    });
}

for (auto& t : vt) {
    t.join();
}
std::cout << "current counter:" << counter << std::endl;
  1. 释放/消费模型: 在C++的原子操作中,我们常常会看到”memory_order”的概念。memory_order用于指定原子操作读取和写入的内存顺序。其中,memory_order_consume是其中一种类型。

memory_order_consume是一种更为松散的内存顺序,它只保证对依赖于原子变量a的普通非原子操作b的顺序:如果b依赖于a,那么在a后面出现的b操作,不会被重排到a之前。也就是说,如果有两个操作A和B,其中B依赖于A的结果,那么B不能在A之前执行。

然而,从C++17起,出于复杂性和可移植性考虑,memory_order_consume已被几乎所有的编译器实现降级为memory_order_acquire,恢复到了严格的内存顺序。

在C++中,不建议直接使用memory_order_consume,推荐使用memory_order_acquirememory_order_release来对原子变量进行更严格的同步。

  1. 释放/获取模型:在此模型下,我们可以进一步加紧对不同线程间原子操作的顺序的限制,在释放 std::memory_order_release和获取std::memory_order_acquire之间规定时序,即发生在释放(release)操作之前的所有写操作,对其他线程的任何获取(acquire)操作都是可见的,亦即发生顺序(happens-before)。

memory_order_acquire是一种防止”读”操作的重排。它常常被用在消费者线程中,用来读取生产者线程中原子变量的最新值,并确保在这之后的所有操作都能看到最新的值。

让我们通过一个简单的例子详细解释下: 假设我们在做一个简单的邮件传递的系统。我们有一个处理数据(”邮件”)的生产者线程和一个消费者线程,生产者在数据完成后触发信号。

std::atomic<bool> data_ready(false); // 原子变量作为触发信号
std::string data; // 数据(本例中的"邮件")

void produce_data()
{
    data = "Hello World!"; // 填写邮件
    data_ready.store(true, std::memory_order_release); // 触发信号,邮件已经准备好
}

void consume_data()
{
    while(!data_ready.load(std::memory_order_acquire)); // 等待邮件

    // 这时,我们确信 data(邮件)现在已经准备好了
    std::cout << "Got mail: " << data << std::endl; 

    //data_ready.store(false, std::memory_order_release); // 重置条件
}

在这个示例中,生产者线程调用produce_data函数,填写数据,并在完成后设置data_ready标识为true。消费者线程在consume_data函数中持续等待,直到data_readytrue。 直到消费者看到data_readytrue,它将开始处理数据,然后可能将信号重置为false以等待下一个信号。 memory_order_acquire 确保了:一旦消费者线程看到 data_readytrue,那么 data 的值必定是 "Hello World!",即使这两个操作在不同的线程中进行。

// a kind of fix-num thread pool
#include <vector>
#include <queue>
#include <thread>
#include <future>
#include <functional>
#include <utility>
#include <iostream>

/*
ThreadPool p(4); // 指定四个工作线程

// 将任务在池中入队,并返回一个 std::future
auto f = pool.enqueue([](int life) {
    return meaning;
}, 42);

// 从 future 中获得执行结果
std::cout << f.get() << std::endl;

*/

class ThreadPool {
public:
    ThreadPool(int num_threads) {
        for (size_t i=0; i<num_threads; ++i) {
            this->threads.emplace_back(
                [this] {
                    while (true) {
                        std::function<void()> task;
                        {
                            std::unique_lock<std::mutex> lock(this->mtx);
                            this->cv.wait(lock, [this]{
                                return this->stop || !this->tasks.empty();
                            });

                            if (this->stop && this->tasks.empty())
                                return;

                            task = std::move(this->tasks.front());
                            this->tasks.pop();
                        }
                        task();
                    }
                }    
            );
        }
    }

    template <class F, class... Args>
    auto enqueue(F&& f, Args&& ...args)
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(mtx);

            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            tasks.emplace([task]{ (*task)(); });
        }
        cv.notify_one();
        return res;
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }

        cv.notify_all();
        for (std::thread &worker: threads)
            worker.join();
    }

private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex mtx;
    std::condition_variable cv;
    bool stop;
};


int main() {
   ThreadPool pool(4); // 创建一个包含四个线程的线程池

// 创建一个任务
auto task = [](int x) {
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
    return x*x;
};

// 将任务加入到线程池,并获取一个 future 来获取结果
std::future<int> result = pool.enqueue(task, 10);

// 获取任务的结果
std::cout << result.get() << std::endl;  // 输出 100
    return 0;
}

更多的练习:

练习1:添加一个将线程池的大小动态调整的函数到上述线程池类。这个函数应该可以增加或减少线程池中线程的数量,但请注意在减少线程数量时,你可能需要思考一下如何处理正在执行任务的线程。 练习2:添加一个取消特定任务的功能到你的线程池类。你可能需要给每个任务一个唯一的ID,然后提供一个使用这个ID来取消任务的功能。 练习3:在你的线程池中实现优先级队列,而不是标准队列,以允许某些任务比其他任务有更高的执行优先级。你可能需要使用一个以priority_queue来替代现有的任务队列。 练习4:添加错误处理机制,确保当一个任务抛出了一个未捕获的异常时,线程池可以正常工作。 练习5:添加对返回结果的支持,如支持任务返回一个自定义的类或者结构。

Reference: 1. (现代C++教程)[https://changkun.de/modern-cpp/zh-cn/07-thread/#7-3-%E6%9C%9F%E7%89%A9]