C++17 STL Cook Book
  • Introduction
  • 前言
  • 关于本书
  • 各章梗概
  • 第1章 C++17的新特性
    • 使用结构化绑定来解包绑定的返回值
    • 将变量作用域限制在if和switch区域内
    • 新的括号初始化规则
    • 构造函数自动推导模板的类型
    • 使用constexpr-if简化编译
    • 只有头文件的库中启用内联变量
    • 使用折叠表达式实现辅助函数
  • 第2章 STL容器
    • 擦除/移除std::vector元素
    • 以O(1)的时间复杂度删除未排序std::vector中的元素
    • 快速或安全的访问std::vector实例的方法
    • 保持对std::vector实例的排序
    • 向std::map实例中高效并有条件的插入元素
    • 了解std::map::insert新的插入提示语义
    • 高效的修改std::map元素的键值
    • std::unordered_map中使用自定义类型
    • 过滤用户的重复输入,并以字母序将重复信息打印出——std::set
    • 实现简单的逆波兰表示法计算器——std::stack
    • 实现词频计数器——std::map
    • 实现写作风格助手用来查找文本中很长的句子——std::multimap
    • 实现个人待办事项列表——std::priority_queue
  • 第3章 迭代器
    • 建立可迭代区域
    • 让自己的迭代器与STL的迭代器兼容
    • 使用迭代适配器填充通用数据结构
    • 使用迭代器实现算法
    • 使用反向迭代适配器进行迭代
    • 使用哨兵终止迭代
    • 使用检查过的迭代器自动化检查迭代器代码
    • 构建zip迭代适配器
  • 第4章 Lambda表达式
    • 使用Lambda表达式定义函数
    • 使用Lambda为std::function添加多态性
    • 并置函数
    • 通过逻辑连接创建复杂谓词
    • 使用同一输入调用多个函数
    • 使用std::accumulate和Lambda函数实现transform_if
    • 编译时生成笛卡尔乘积
  • 第5章 STL基础算法
    • 容器间相互复制元素
    • 容器元素排序
    • 从容器中删除指定元素
    • 改变容器内容
    • 在有序和无序的vector中查找元素
    • 将vector中的值控制在特定数值范围内——std::clamp
    • 在字符串中定位模式并选择最佳实现——std::search
    • 对大vector进行采样
    • 生成输入序列的序列
    • 实现字典合并工具
  • 第6章 STL算法的高级使用方式
    • 使用STL算法实现单词查找树类
    • 使用树实现搜索输入建议生成器
    • 使用STL数值算法实现傅里叶变换
    • 计算两个vector的误差和
    • 使用ASCII字符曼德尔布罗特集合
    • 实现分割算法
    • 将标准算法进行组合
    • 删除词组间连续的空格
    • 压缩和解压缩字符串
  • 第7章 字符串, 流和正则表达
    • 创建、连接和转换字符串
    • 消除字符串开始和结束处的空格
    • 无需构造获取std::string
    • 从用户的输入读取数值
    • 计算文件中的单词数量
    • 格式化输出
    • 使用输入文件初始化复杂对象
    • 迭代器填充容器——std::istream
    • 迭代器进行打印——std::ostream
    • 使用特定代码段将输出重定向到文件
    • 通过集成std::char_traits创建自定义字符串类
    • 使用正则表达式库标记输入
    • 简单打印不同格式的数字
    • 从std::iostream错误中获取可读异常
  • 第8章 工具类
    • 转换不同的时间单位——std::ratio
    • 转换绝对时间和相对时间——std::chrono
    • 安全的标识失败——std::optional
    • 对元组使用函数
    • 使用元组快速构成数据结构
    • 将void*替换为更为安全的std::any
    • 存储不同的类型——std::variant
    • 自动化管理资源——std::unique_ptr
    • 处理共享堆内存——std::shared_ptr
    • 对共享对象使用弱指针
    • 使用智能指针简化处理遗留API
    • 共享同一对象的不同成员
    • 选择合适的引擎生成随机数
    • 让STL以指定分布方式产生随机数
  • 第9章 并行和并发
    • 标准算法的自动并行
    • 让程序在特定时间休眠
    • 启动和停止线程
    • 打造异常安全的共享锁——std::unique_lock和std::shared_lock
    • 避免死锁——std::scoped_lock
    • 同步并行中使用std::cout
    • 进行延迟初始化——std::call_once
    • 将执行的程序推到后台——std::async
    • 实现生产者/消费者模型——std::condition_variable
    • 实现多生产者/多消费者模型——std::condition_variable
    • 并行ASCII曼德尔布罗特渲染器——std::async
    • 实现一个小型自动化并行库——std::future
  • 第10章 文件系统
    • 实现标准化路径
    • 使用相对路径获取规范的文件路径
    • 列出目录下的所有文件
    • 实现一个类似grep的文本搜索工具
    • 实现一个自动文件重命名器
    • 实现一个磁盘使用统计器
    • 计算文件类型的统计信息
    • 实现一个工具:通过符号链接减少重复文件,从而控制文件夹大小
Powered by GitBook
On this page
  • How to do it...
  • How it works...

Was this helpful?

  1. 第9章 并行和并发

实现多生产者/多消费者模型——std::condition_variable

让我们再来回顾一下生产者/消费者问题,这要比上一节的问题更加复杂。我们创建了多个生成这和多个消费者。并且,我们定义的队列没有限制上限。

当队列中没有商品时,消费者会处于等待状态,而当队列中没有空间可以盛放商品时,生产者也会处于等待状态。

我们将会使用多个std::condition_variable对象来解决这个问题,不过使用的方式与上节有些不同。

How to do it...

本节中,我们将实现一个类似于上节的程序,这次我们有多个生产者和消费者:

  1. 包含必要的头文件,并声明所使用的命名空间:

    #include <iostream>
    #include <iomanip>
    #include <sstream>
    #include <vector>
    #include <queue>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <chrono>
    
    using namespace std;
    using namespace chrono_literals;
  2. 接下来从本章的其他小节中拿过来一个同步打印的辅助类型,因为其能帮助我们在大量并发时进行打印:

    struct pcout : public stringstream {
        static inline mutex cout_mutex;
        ~pcout() {
            lock_guard<mutex> l {cout_mutex};
            cout << rdbuf();
        }
    };
  3. 所有生产者都会将值写入到同一个队列中,并且所有消费者也会从这个队列中获取值。对于这个队列,我们需要使用一个互斥量对队列进行保护,并设置一个标识,其会告诉我们生产者是否已经停止生产:

    queue<size_t> q;
    mutex q_mutex;
    bool production_stopped {false};
  4. 我们将在程序中使用两个condition_variable对象。单生产者/消费者的情况下,只需要一个condition_variable告诉我们队列上面摆放了新商品。本节的例子中,我们将来处理更加复杂的情况。我们需要生产者持续生产,以保证队列上一直有可消费的商品存在。如果商品囤积到一定程度,则生产者休息。go_consume变量就用来提醒消费者消费的,而go_produce则是用来提醒生产者进行生产的:

    condition_variable go_produce;
    condition_variable go_consume;
  5. 生产者函数能够接受一个生产者ID,所要生产的商品数量,以及囤积商品值的上限。然后,生产者就会进入循环生产阶段。这里,首先其会对队列的互斥量进行上锁,然后在通过调用go_produce.wait对互斥量进行解锁。队列上的商品数量未达到囤积阈值时,满足等待条件:

    static void producer(size_t id, size_t items, size_t stock)
    {
        for (size_t i = 0; i < items; ++i) {
            unique_lock<mutex> lock(q_mutex);
            go_produce.wait(lock,
                [&] { return q.size() < stock; });
  6. 生产者开始生产后,就会有商品放入队列中。队列商品的表达式为id * 100 + i。因为百位和线程id强相关,这样我们就能了解到哪些商品是哪些生产者生产的。我们也能将生产事件打印到终端上。格式看起来可能有些奇怪,不过其会与消费者打印输出对齐:

            q.push(id * 100 + i);
    
            pcout{} << " Producer " << id << " --> item "
                    << setw(3) << q.back() << '\n';
  7. 生产商品之后,我们叫醒沉睡中的消费者。每个睡眠周期为90毫秒,这用来模拟生产者生产商品的时间:

            go_consume.notify_all();
            this_thread::sleep_for(90ms);
        }
    
        pcout{} << "EXIT: Producer " << id << '\n';
    }
  8. 现在来实现消费者函数,其只接受一个消费者ID作为参数。当生产者停止生产,或是队列上没有商品,消费者就会继续等待。队列上没有商品时,生产者还在生产的话,那么可以肯定的是,队列上肯定会有新商品的产生:

    static void consumer(size_t id)
    {
        while (!production_stopped || !q.empty()) {
            unique_lock<mutex> lock(q_mutex);
  9. 对队列的互斥量上锁之后,我们将会在等待go_consume事件变量时对互斥量进行解锁。Lambda表达式表明,当队列中有商品的时候我们就会对其进行获取。第二个参数1s说明,我们并不想等太久。如果等待时间超过1秒,我们将不会等待。当谓词条件达成,则wait_for函数返回;否则就会因为超时而放弃等待。如果队列中有新商品,我们将会获取这个商品,并进行相应的打印:

            if (go_consume.wait_for(lock, 1s,
                    [] { return !q.empty(); })) {
                pcout{} << " item "
                        << setw(3) << q.front()
                        << " --> Consumer "
                        << id << '\n';
                q.pop();
  10. 商品被消费之后,我们将会提醒生产者,并进入130毫秒的休眠状态,这个时间用来模拟消费时间:

                go_produce.notify_all();
                this_thread::sleep_for(130ms);
            }
        }
    
        pcout{} << "EXIT: Producer " << id << '\n';
    }
  11. 主函数中,我们对工作线程和消费线程各自创建一个vector:

    int main()
    {
        vector<thread> workers;
        vector<thread> consumers;
  12. 然后,我们创建3个生产者和5个消费者:

        for (size_t i = 0; i < 3; ++i) {
            workers.emplace_back(producer, i, 15, 5);
        }
    
        for (size_t i = 0; i < 5; ++i) {
            consumers.emplace_back(consumer, i);
        }
  13. 我们会先让生产者线程终止。然后返回,并对production_stopped标识进行设置,这将会让消费者线程同时停止。然后,我们要将这些线程进行回收,然后退出程序:

        for (auto &t : workers) { t.join(); }
        production_stopped = true;
        for (auto &t : consumers) { t.join(); }
    }
  14. 编译并运行程序,我们将获得如下的输出。输出特别长,我们进行了截断。我们能看到生产者偶尔会休息一下,并且消费者会消费掉对应的商品,直到再次生产。若是将生产者/消费者的休眠时间进行修改,则会得到完全不一样的结果:

    $ ./multi_producer_consumer
    Producer 0 --> item 0
    Producer 1 --> item 100
    item 0 --> Consumer 0
    Producer 2 --> item 200
    item 100 --> Consumer 1
    item 200 --> Consumer 2
    Producer 0 --> item 1
    Producer 1 --> item 101
    item 1 --> Consumer 0
    ...
    Producer 0 --> item14
    EXIT: Producer 0
    Producer 1 --> item 114
    EXIT: Producer 1
    item14 --> Consumer 0
    Producer 2 --> item 214
    EXIT: Producer 2
    item 114 --> Consumer 1
    item 214 --> Consumer 2
    EXIT: Consumer 2
    EXIT: Consumer 3
    EXIT: Consumer 4
    EXIT: Consumer 0
    EXIT: Consumer 1

How it works...

这节可以作为之前章节的扩展。与单生产者和消费者不同,我们实现了M个生产者和N个消费者之间的同步。因此,程序中不是消费者因为队列中没有商品而等待,就是因为队列中囤积了太多商品让生产者等待。

当有多个消费者等待同一个队列中出现新的商品时,程序的模式就又和单生产者/消费者工作的模式相同了。当有一个线程对保护队列的互斥量上锁时,然后对货物进行添加或减少,这样代码就是安全的。这样的话,无论有多少线程在同时等待这个锁,对于我们来说都无所谓。生产者也同理,其中最重要的就是,队列不允许两个及两个以上的线程进行访问。

比单生产者/消费者原理复杂的原因在于,当商品的数量在队列中囤积到一定程度,我们将会让生产者线程停止。为了迎合这个需求,我们使用两个不同的condition_variable:

  1. go_produce表示队列没有被填满,并且生产者会继续生产,而后将商品放置在队列中。

  2. go_consume表示队列已经填满,消费者可以继续消费。

这样,生产者会将队列用货物填满,并且go_consume会用如下代码,提醒消费者线程:

if (go_consume.wait_for(lock, 1s, [] { return !q.empty(); })) {
    // got the event without timeout
}

生产者也会进行等待,直到可以再次生产:

go_produce.wait(lock, [&] { return q.size() < stock; });

还有一个细节就是我们不会让消费者线程等太久。在对go_consume.wait_for的调用中,我们添加了超时参数,并且设置为1秒。这对于消费者来说是一种退出机制:当队列为空的状态持续多于1秒,那么就可能没有生产者在工作。

这个处理起来很简单,代码会尽可能让队列中商品的数量达到阈值的上限。更复杂的程序中,当商品的数量为阈值上限的一半时,消费者线程会对生产者线程进行提醒。这样生产者就会在队列为空前继续生产。

condition_variable帮助我们完美的解决了一个问题:当一个消费者触发了go_produce的提醒,那么将会有很多生产者竞争的去生产下一个商品。如果只需要生产一个商品,那么只需要一个生产者就好。当go_produce被触发时,所有生产者都争相生产这一个商品,我们将会看到的情况就是商品在队列中的数量超过了阈值的上限。

我们试想一下这种情况,我们有(max - 1)个商品在队列中,并且想在要一个商品将队列填满。不论是一个消费者线程调用了go_produce.notify_one()(只叫醒一个等待线程)或go_produce.notify_all()(叫醒所有等待的线程),都需要保证只有一个生产者线程调用了go_produce.wait,因为对于其他生成线程来说,一旦互斥锁解锁,那么q.size() < stock(stock货物阈值上限)的条件将不复存在。

Previous实现生产者/消费者模型——std::condition_variableNext并行ASCII曼德尔布罗特渲染器——std::async

Last updated 6 years ago

Was this helpful?