C++并发编程 - 同步并发操作

举报
C语言与CPP编程 发表于 2022/08/31 00:32:54 2022/08/31
【摘要】 C++并发编程 - 同步并发操作 前言  一个好的程序员是那种过单行线马路都要往两边看的人。   线程同步的本质是防止临界区(公共资源)并发操作,即多个线程禁止同时操作临界区。为此,在程序中以某种手段,将多个线程按照先后顺序访问临界区。   临界区的操作一直要保持谨慎。多线程访问临界区,同时读还好;假如一个写的同时...

C++并发编程 - 同步并发操作

前言

 一个好的程序员是那种过单行线马路都要往两边看的人。

  线程同步的本质是防止临界区(公共资源)并发操作,即多个线程禁止同时操作临界区。为此,在程序中以某种手段,将多个线程按照先后顺序访问临界区。

  临界区的操作一直要保持谨慎。多线程访问临界区,同时读还好;假如一个写的同时,另一个读,那这个读的值可能不是确定的,有可能是写之前的也有可能是写之后的,这可能会引发重大bug,且难以排查。

  本文主要介绍如何使用c++11中条件变量以及期望来设计并发操作。

条件变量 std::condition_variable

  在多线程任务中,线程通常使用条件变量阻塞自身,直至条件发生。即A线程阻塞等待某个条件变量,B线程通知条件变量变化解除A线程阻塞。为了防止竞争,条件变量的使用总是和互斥锁结合在一起。

  C++标准库对条件变量有两套实现: std::condition_variablestd::condition_variable_any。其中前者仅限于与std:mutex配合使用,后者可以和任何满足最低标准的互斥量一起使用。

  从体积、性能以及系统资源的使用方面,推荐使用 std::condition_variable。仅当对灵活性有硬性要求的情况下,才会选择 std::condition_variable_any

std::condition_variable 实例:


   
  1. static std::mutex mut;
  2. static bool cond_value = false;
  3. static std::condition_variable cond;
  4. // 阻塞线程
  5. void wait_thread()
  6. {
  7.     std::unique_lock<std::mutex> lk(mut); //a
  8.     cond.wait(
  9.         lk, []{ return cond_value; }); 
  10.     //lk.unlock();    // b
  11.     //...  //c
  12. }
  13. // 通知线程
  14. void notify()
  15. {
  16.     std::lock_guard<std::mutex> lk(mut); 
  17.     cond_value = true;
  18.     cond.notify_one();
  19. }
  • 流程:
      step 1. wait_thread先获取锁,然后查询cond_value为false时,先解锁,然后阻塞线程,等待其他线程通知。
      step2. notify()获取锁,更改cond_value为true,并通过notify_one()通知阻塞的线程,并解锁。
      step3. wait_thread接收到通知,重新获取锁,检查cond_value为true,从wait()返回(解除阻塞)。返回时仍处于持有锁状态,直至互斥锁被析构或者手动解锁。

  在多线程中持有锁时间过长是一件糟糕的事情,当处理完与互斥锁相关的共享数据时,就应该立刻解锁。故上述c处还有其他业务时,b处有必要解锁。

  另外,还存在多个线程等待同一事件。此种场景,可通过notify_all()通知所有阻塞的线程检查条件。

「注」wait()会去检查这些条件(通过调用所提供的lambda函数), 当条件满足(lambda函数返回true)时返回。如果条件不满足(lambda函数返回false), wait()函数将解锁互斥量, 并且将这个线程(上段提到的处理数据的线程)置于阻塞或等待状态。当准备数据的线程调用notify_one()通知条件变量时, 处理数据的线程从睡眠状态中苏醒, 重新获取互斥锁, 并且对条件再次检查,在条件满足的情况下, 从wait()返回并继续持有锁。当条件不满足时, 线程将对互斥量解锁,并且重新开始等待。

期望 std::future

  是指某个线程只等待一个特定的一次性事件。C++标准库将这种一次性事件称为“期望” (future)。

  当一个线程需要等待一个特定的一次性事件时,在某种程度上来说它就需要知道这个事件在未来的表现形式。之后,这个线程会周期性(较短的周期)的等待或检查,事件是否触发(检查信息板);在检查期间也会执行其他任务。另外,在等待任务期间它可以先执行另外一些任务,直到对应的任务触发,而后等待期望的状态会变为“就绪”(ready)。

  在C++标准库中, 有两种“期望”, 使用两种类型模板实现, 声明在头文件中: 唯一期望(uniquefutures)( std::future<> )和共享期望(shared futures)( std::shared_future<> )。这是仿照 std::unique_ptr 和 std::shared_ptr 。std::future 的实例只能与一个指定事件相关联,而 std::shared_future 的实例就能关联多个事件。后者的实现中, 所有实例会在同时变为就绪状态, 并且他们可以访问与事件相关的任何数据。

「注」: 以上两段描述,摘抄于《C++并发编程实战》

std::future并非单独使用,在C++标准库std::asyncstd::packaged_taskstd::promise关联了std::future。即std::asyncstd::packaged_taskstd::promise会返回std::future类型,线程通过std::future获取任务执行的结果。

  了解这些,我们可以使用std::future程序上实现业务与任务的分离。即业务线程只负责处理逻辑,任务线程负责任务执行,业务线程又能获取到任务执行的结果或其他的设计。

std::async 带返回值的后台任务

  当需要执行一个耗时的任务,在不阻塞主线程的条件下,还需要关心这个任务执行的结果时(例如是获取计算结果)。在不考虑使用全局变量情况下(全局变量,代码大忌),std::thread就不能满足这个需求了,此时std::async可以满足需求。

std::async使用案例

std::async可启动一个异步任务。与std::thread对象等待运行方式的不同,std::async会返回一个std::future对象, 这个对象持有最终计算出来的结果。当需要这个结果时,只需要调用这个对象的get()成员函数;并且直到“期望”状态为就绪的情况下,线程才会阻塞;之后,返回计算结果。

「std::async 实例」:


   
  1. void async_test()
  2. {
  3.     struct SThreadParam
  4.     {
  5.         int func(int val1, int val2)
  6.         {
  7.             LOGD("async func start...\n");
  8.             sleep(2);           // Simulated 2S time consumption
  9.             return val1 + val2;
  10.         }
  11.     };
  12.     SThreadParam paramFunc;
  13.     LOGD("create async...\n");
  14.     std::future<int> f4 = std::async(std::launch::async, &SThreadParam::func, &paramFunc, 10, 12);
  15.     LOGD("async debug...\n");
  16.     //sleep(2);
  17.     LOGD("async return %d.\n", f4.get());
  18. }

「执行结果」


   
  1. 2022-08-27 00:46:44.951  179 D: create async...
  2. 2022-08-27 00:46:44.951  181 D: async debug...
  3. 2022-08-27 00:46:44.951  173 D: async func start...
  4. 2022-08-27 00:46:46.953  183 D: async return 22.

  一般情况下,std::async()任务函数默认情况下在创建时启动,在调用f4.get()时,若线程未执行完,则阻塞至执行完;否则,立刻返回结果(主线程sleep(2)是否注释,后两条打印都相差2s)。当然也有不一般的情况,见下文「注」

指定async任务执行时机

std::async()任务执行时机可在创建时传入额外参数指定,这个参数的类型是std::launch,有以下含义:

  • std::launch::deferred: 函数调用被延迟到wait()或get()函数调用时才执行

  • std::launch::async: 函数必须在其 所在的独立线程上执行

  • std::launch::deferred | std::launch::async: 表明实现可以选择这两种方式的一种


   
  1. auto f6=std::async(std::launch::async,Y(),1.2); // 在新线程上执行
  2. auto f7=std::async(std::launch::deferred, funcstd::ref(x)); // 在wait()或get()调用时执行
  3. auto f8=std::async(
  4.         std::launch::deferred | std::launch::async,
  5.         funcstd::ref(x)); // 实现选择执行方式
  6. auto f9=std::async(funcstd::ref(x));
  7. f7.wait(); // 调用延迟函数

「注」
std::asyncstd::thread作用相似,两者最明显的区别在于async采用默认启动策略时并不一定创建新的线程。
  如果系统资源紧张,std::thread创建线程可能失败,系统报告异常。而std::async则不会,它在无法创建新线程时,会将任务分配给后续调用future.get()函数的线程,并以同步的方式执行(即不创建新线程)。

std::async避坑

接下说的很重要!!!
  在std::async时,一定要处理其返回的std::future。否则,async会以同步的方式执行任务,即当前线程会阻塞到任务执行完。

「错误的示例」


   
  1. void async_case()
  2. {
  3.     struct SThreadParam
  4.     {
  5.         int func(int val1, int val2)
  6.         {
  7.             LOGD("async func start...\n"); // a
  8.             sleep(2);           // Simulated 2S time consumption
  9.             return val1 + val2;
  10.         }
  11.     };
  12.     SThreadParam paramFunc;
  13.     LOGD("create async...\n"); // b
  14.     //std::future<int> f4 = std::async(&SThreadParam::func, &paramFunc, 10, 12);
  15.     std::async(std::launch::async, &SThreadParam::func, &paramFunc, 10, 12);
  16.     LOGD("async debug...\n"); // c
  17.     //sleep(2);
  18.     //LOGD("async return %d.\n", f4.get());
  19. }

「执行结果」


   
  1. 2022-08-27 00:55:15.886  179 D: create async...
  2. 2022-08-27 00:55:15.886  173 D: async func start...
  3. 2022-08-27 00:55:17.887  182 D: async debug...

  设计上我们希望async不会阻塞,即SThreadParam::func应该在另一个线程执行,b和c的打印时间应该接近相同。但实际上却相差2s,刚好是SThreadParam::func的执行时间。

  这是很隐蔽且很重大的bug,假设任务时间更长,会导致主线长时间阻塞。这个bug网上大佬们已经给出很多解释了,这里不再赘诉,可参考https://blog.csdn.net/weixin_44537992/article/details/108438379。

  这就警告我们在使用新的接口时,还是要搞清楚其原型及原理,规范地使用未知的接口。说到这里我也慢慢懂了,为什么有些接口明明部分参数可以缺省,但是大佬们在使用时还是会明确指定这些参数值,虽然使用起来更复杂,但是也更安全。

std::packaged_task 任务与期望

std::packaged_task<>对一个函数或可调用对象, 绑定一个期望。当std::packaged_task<>对象被调用,它就会调用相关函数或可调用对象,将期望状态置为就绪,返回值也会被存储为相关数据。

  与std::async不同的是,std::packaged_task<>可以将任务与期望打包,移动到指定线程显示调用packaged_task,future才会就绪;而std::async()不能指定线程运行任务。这是两者最显著的差异。

「packaged_task实例」


   
  1. void task_execute_thread()
  2. {
  3.     std::thread th5([](const char *name) {
  4.         static std::packaged_task<int(int)> task;
  5.         
  6.         while(!packaged_shutdown) {
  7.             std::unique_lock<std::mutex> lk(package_mut);
  8.             LOG("%s: wait task...\n", name);
  9.             pacakge_cond.wait(lk, [](){
  10.                 LOG("check queue %ld.\n", task_queue.size());
  11.                 return !task_queue.empty();
  12.             });
  13.             lk.unlock();
  14.             LOG("%s: task execute.\n", name);
  15.             task = std::move(task_queue.front()); // 5
  16.             task_queue.pop_front();
  17.             task(2); // a. 执行任务,参数未使用
  18.         }
  19.     }, "thread5");
  20.     th5.detach();
  21. }
  22. void task_post()
  23. {
  24.     static int i = 0;
  25.     auto f = [](int num){
  26.         return num;
  27.     };
  28.     std::packaged_task<int(int)> task(std::bind(f, ++i));
  29.     std::future<int> fut = task.get_future();
  30.     std::unique_lock<std::mutex> lk(package_mut);
  31.     
  32.     task_queue.push_back(std::move(task));
  33.     lk.unlock();
  34.     pacakge_cond.notify_one();
  35.     LOG("task return %d.\n", fut.get()); // b. 阻塞至a执行完成
  36. }

「执行结果」


   
  1. input: f
  2. thread5: wait task...
  3. check queue 0.
  4. Input: g
  5. check queue 1.
  6. thread5: task execute 0.
  7. thread5: wait task...
  8. check queue 0.
  9. task return 1.

「注」 std::future只适用于一次期望。即获取的future只允许使用一次get。std::shared_future似乎可以解决这个问题,未确认。

std::promises 承诺与期望

std::promise对象可以保存某一类型 T 的值,该值可被 future 对象读取(可能在另外一个线程中),因此 promise 也提供了一种线程同步的手段。在 promise 对象构造时可以和一个共享状态(通常是std::future)相关联,并可以在相关联的共享状态(std::future)上保存一个类型为 T 的值。

  与std::packaged_task差异在于,std::promise可在线程运行时,通过set_value()向阻塞线程传递值。即A线程通过std::promise::set_value()传递某类型T的值,B线程std::future::get()会解除阻塞,同时获取到A线程传的T类型的值。

「std::promise实例」


   
  1. void pomise_case()
  2. {
  3.     std::promise<int> promise;  // save type is int
  4.     std::future<int> future = promise.get_future(); // from promise
  5.     LOGD("-> create thread6.\n");
  6.     std::thread th6([](const char *name, std::promise<int> &prm) {
  7.         static int value = 1024;
  8.         std::this_thread::sleep_for(std::chrono::seconds(2)); // To highlight the delay of the effect
  9.         prm.set_value(++value);  // a. refer step b
  10.         LOGD("%s: set value: %d.\n", name, value);
  11.     }, "thread6", std::ref(promise));
  12.     
  13.     LOGD("-> create thread7.\n"); 
  14.     std::thread th7([](const char *name, std::future<int> &fut) {
  15.         auto value = fut.get(); // b. wait step a set_value
  16.         LOGD("%s: get value: %d.\n", name, value);
  17.     }, "thread7", std::ref(future));
  18.     th6.join();
  19.     th7.join();
  20. }

「执行效果」


   
  1. 2022-08-27 02:24:08.826  233 D: -> create thread6.
  2. 2022-08-27 02:24:08.826  242 D: -> create thread7.
  3. 2022-08-27 02:24:10.827  239 D: thread6: set value: 1025.
  4. 2022-08-27 02:24:10.827  246 D: thread7: get value: 1025.

std::promise还是挺有意思的,可以实现线程间的值传递且无并发问题。原来我们通过全局变量实现线程间通信的方法,还要考虑上锁,以后就可以通过promise来实现了。

总结

  • 本文主要总结了同步并发常用的一些技巧,其中条件变量使用被广泛使用,而c++11新引进的std::future相对来说很少被用到。

  • 大致概括一下std::asyncstd::packaged_taskstd::promise使用的场景:
    std::async: 另起线程异步处理耗时的任务,并返回处理结果。std::packaged_task: 将任务函数打包丢到指定线程处理,并返回处理结果。std::promise: 实现两个线程间的数据传递。

  • 刚开始看std::asyncstd::packaged_taskstd::promise,多少有一点抵触。一个同步搞这么多接口,仔细慢看发现每一种都使用都是针对不同的场景。

  • std::async在使用时一定主要处理其返回的std::future,否则会引发隐蔽且严重的bug。详见「std::async避坑章节」

  • 针对std::async可能引发的问题,告诫程序员们,一定要规范的使用每一个接口,缺省的参数及返回值不处理的话可能让代码裂成“东非大裂谷”。对于使用到的接口的参数和返回值,不管有没有必要,都要处理。

  • 同步并发问题一直是程序中隐蔽而又严重的bug,不到万不得已尽量不要使用。同步并发bug就像是程序里的"蒙娜丽莎",它在对你笑,你却一脸懵逼。

最后

用心感悟,认真记录,写好每一篇文章,分享每一框干货。

93e4620e1c4b8b371f6339f550bb1b31.gif

5bf38a4e3f780568fcad1ac385674c0f.png

d5fdeb7f4a59e4065c50a39b37e75833.gif

49edd3a040f6e28fbc7ce14bb7e6fadf.png

一键四连

c9fe6648494f6564ee70b336a658af90.gif

df5ab3800ebec2c0a81db85e0ae016dc.png

ee98a201d84ac4e8217b89524737f2cf.gif

e5ce493a5c6f96ac06dddb515c7040c4.png

文章来源: blog.csdn.net,作者:程序员编程指南,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/weixin_41055260/article/details/126595635

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。