Linux-生产者消费者模型
@TOC
生产者消费者模型概念
生产者消费者模型又称有限缓冲问题,是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。
如下图:
生产者消费者模型特点
生产者消费者是多线程同步于互斥的经典场景,其特点如下:
3种关系:
生产者VS生产者(互斥关系)、生产者VS消费者(同步关系)、消费者VS消费者(互斥关系)2种角色:
生产者和消费者1个交易场所:
通常指内存中的一段缓冲区
我们用代码实现的话就要维护上面的3种关系即可。
生产者和生产者、消费者和消费者为什么要互斥?
当1个生产者开始往缓冲区里写入数据时当然不希望有其他的生产者进入缓冲区,所以要在缓冲区用互斥锁保护起来。同样,1个消费者读取数据也要避免其他的消费者干扰,所以在1个消费者读取缓冲区数据时用互斥锁保护起来。
生产者和消费者同步关系?
当生产者把缓冲区数据加满后,再生产数据就会失败。消费者把缓冲区数据消费完再消费也会失败。
所以让生产者在缓冲区满时休眠,等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。
生产者消费者模型优点
支持并发
由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区作为桥梁连接,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区了拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
解耦
支持忙时不均
如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。 等生产者的制造速度慢下来,消费者再慢慢处理掉。
基于BlockingQueue的生产者消费者模型
基于阻塞队列的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
和普通队列的区别:
- 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素
- 当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
模拟实现生产者消费者模型
我们先模拟1个单生产者和单消费者
这个BlockQueue我们就用C++STL中的Queue即可。
整体的逻辑:
- 现在实现单生产者和单消费者,那我们维护生产者和消费者的同步关系即可
- 设置BlockQueue的存储数据上限,不能让生产者一直生产下去,具体的实现根据业务的需求
- BlockQueue是临界资源,所以需要1把互斥锁进行保护
- 当生产者向阻塞队列中放入数据时,若此时阻塞队列满了,那生产者此时要
先通知消费者
开始消费数据,自己挂起 - 当消费者取阻塞队列中的数据时,若此时阻塞队列为空了,那消费者
先通知生产者
开始生产数据,自己挂起 - 因此我们还需要2个条件变量,1个当阻塞队列满时,生产者需要在此条件变量下等待。1个条件变量当阻塞队列为空时,消费者在此条件变量下等待
- 无论是生产者还是消费者都要进入临界区判断条件是否满足,若条件不满足就要被挂起,此时它们中的1个是拿着锁在等待的,为了避免死锁的问题,在调用
pthread_cond_wait
时要传入互斥锁,当被挂起时释放手中的锁资源 - 主函数创建2个线程,1个线程不断生产数据,另一个线程不断的读取数据即可
代码如下:
BlockQueue:
1 #include<iostream>
2 #include<pthread.h>
3 #include<queue>
4 using namespace std;
5
6
7 class BlockQueue
8 {
9 private:
10 pthread_mutex_t mutex;
11 pthread_cond_t c_cond; // 消费者在次条件变量下等
12 pthread_cond_t p_cond; // 生产者在此条件变量下等
13 size_t cap;
14 queue<int> q;
15 public:
16 BlockQueue(int _cap = 5):cap(_cap)
17 {}
18 void get_init()
19 {
20 pthread_mutex_init(&mutex,nullptr);
21 pthread_cond_init(&c_cond,nullptr);
22 pthread_cond_init(&p_cond,nullptr);
23 }
24 void LockQueue()
25 {
26 pthread_mutex_lock(&mutex);
27 }
28 void UnLockQueue()
29 {
30 pthread_mutex_unlock(&mutex);
31 }
32 void ProductWait()
33 {
34 pthread_cond_wait(&p_cond,&mutex);
35 }
36 void ConsumerWait()
36 void ConsumerWait()
37 {
38 pthread_cond_wait(&c_cond,&mutex);
39 }
40 void WakeUpConsumer()
41 {
42 pthread_cond_signal(&c_cond);
43 }
44 void WakeUpProduct()
45 {
46 pthread_cond_signal(&p_cond);
47 }
48 bool IsFull()
49 {
50 return cap == q.size();
51 }
52 bool IsEmpty()
53 {
54 return q.empty();
55 }
56 void Push(int &in)
57 {
58 LockQueue();
59 while(IsFull())
60 {
61 //队列满了,先唤醒消费者,生产者挂起
62 WakeUpConsumer();
63 ProductWait();
64 }
65 q.push(in);
66 UnLockQueue();
67 }
68 void Pop(int &out)
69 {
70 LockQueue();
71 while(IsEmpty())
72 {
73 //队列为空,先唤醒生产者,消费者挂起
74 WakeUpProduct();
75 ConsumerWait();
76 }
77 out = q.front();
78 q.pop();
79 UnLockQueue();
80 }
81
82 ~BlockQueue()
83 {
84 pthread_mutex_destroy(&mutex);
85 pthread_cond_destroy(&c_cond);
86 pthread_cond_destroy(&p_cond);
87 }
88 };
mian:
1 #include"BlockQueue.hpp"
2
3
4 void* Consumer(void* arg)
5 {
6 BlockQueue* bq = (BlockQueue*)arg;
7 while(true)
8 {
9 int data;
10 bq->Pop(data);
11 cout<<"Consumer data done:"<<data<<endl;
12 }
13 }
14 void* Product(void* arg)
15 {
16 BlockQueue* bq = (BlockQueue*)arg;
17 while(true)
18 {
19 int data = rand() % 10;
20 bq->Push(data);
21 cout<<"Product data done:"<<data<<endl;
22 }
23 }
24 int main()
25 {
26 BlockQueue* bq=new BlockQueue();
27 bq->get_init();
28 pthread_t c,p;
29 pthread_create(&c,nullptr,Consumer,(void*)bq);
30 pthread_create(&c,nullptr,Product,(void*)bq);
31
32
33 pthread_join(c,nullptr);
34 pthread_join(p,nullptr);
35 delete bq;
36 return 0;
}
先让生产者慢点生产。因为我是让队列满了,生产者唤醒消费者消费,生产者等待。这种情况应该是生产者先执行,生产者满了通知消费者,消费者是一瞬间就消费完了。
跟我们预期的结果一样,生产者生产满了后消费者一瞬间消费完。
让消费者慢,生产者快,一瞬间队列里数据满了,生产者通知消费者消费数据,生产者等待
生产者一下就生产完了,消费者是慢慢的在消费。
计算任务的生产者和消费者模型
上面打印数字是为了好理解现象,我们再封装1个Task的任务类,该类中包含1个run函数,该函数处理计算的任务。
6 class Task
7 {
8 public:
9 int x;
10 int y;
11 public:
12 Task(int _x=0,int _y=0)
13 :x(_x),y(_y)
14 {}
15 int run()
16 {
17 return x + y;
18 }
19
20 };
生产者放入阻塞队列里的就是Task对象,消费者拿出队列中的任务,调用tun函数处理任务
5 void* Consumer(void* arg)
6 {
7 BlockQueue* bq = (BlockQueue*)arg;
8 // pthread_mutex_lock(&c_mutex);
9 while(true)
10 {
11 sleep(1);
12 Task t;
13 bq->Pop(t);
14 cout<<" Consumer :"<<t.x<<"+"<<t.y<<"="<<t.run()<<endl;
//cout<<" Consumer [:"<<pthread_self()<<"]"<<t.x<<"+"<<t.y<<"="<<t.run()<<endl;
15 }
16 // pthread_mutex_unlock(&c_mutex);
17 }
18 void* Product(void* arg)
19 {
20 BlockQueue* bq = (BlockQueue*)arg;
21 //pthread_mutex_lock(&p_mutex);
22 while(true)
23 {
24 //sleep(1);
25 int x= rand() % 10+1;
26 int y= rand() % 100+1;
27 Task t(x,y);
28 bq->Push(t);
29 // cout<<"Product data done:"<<data<<endl;
//cout<<" Product [:"<<pthread_self()<<"]"<<x<<"+" << y<<"=?"<<endl;
30 cout<<" Product :"<<x<<"+" << y<<"=?"<<endl;
31 }
32 // pthread_mutex_unlock(&p_mutex);
33 }
这次可以让阻塞队列中的任务大于队列容量的一半就唤醒消费者自己挂起等待,让消费者消费到队列容量的一半唤醒生产者自己挂起等待。并让生产者快,消费者慢
74 void Push(Task &t)
75 {
76 LockQueue();
77 while(IsFull())
78 {
79 //队列满了,生产者挂起,通知消费者
80 //WakeUpConsumer();
81 ProductWait();
82 }
83 q.push(t);
84 if(q.size() >= _cap / 2)
85 {
86
87 WakeUpConsumer();
88
89 }
90 UnLockQueue();
91 }
92 void Pop(Task &t)
93 {
94 LockQueue();
95 while(IsEmpty())
96 {
97 //队列为空,消费者挂起,通知生产者
98 //WakeUpProduct();
99 ConsumerWait();
100 }
101 t = q.front();
102 q.pop();
103 if(q.size() <= _cap / 2)
104 {
105
106 WakeUpProduct();
107 }
108 UnLockQueue();
109 }
我们可以看到生产者瞬间生产满了,消费者开始消费了3个后,生产者又开始生产了。
多生产者和多消费者
多生产者多消费者在单生产者单消费者的基础上再加2把互斥锁,让生产者自己在组内竞争,竞争赢的先去生产,消费者也是同样的道理,先让消费者自己组内竞争,赢的先去处理任务。BlockQueue和单生产者的一样,我是直接定义了全局的2把锁。当然也可以封装在BlockQueue。
1 #include"BlockQueue.hpp"
2 pthread_mutex_t c_mutex;
3 pthread_mutex_t p_mutex;
4
5 void* Consumer(void* arg)
6 {
7 BlockQueue* bq = (BlockQueue*)arg;
8 //pthread_mutex_lock(&c_mutex);
9 while(true)
10 {
11 pthread_mutex_lock(&c_mutex);
12 Task t;
13 bq->Pop(t);
14 cout<<" Consumer [:"<<pthread_self()<<"]"<<t.x<<"+"<<t.y<<"="<<t.run()<<endl;
15 pthread_mutex_unlock(&c_mutex);
16 sleep(1);
17 }
18 //pthread_mutex_unlock(&c_mutex);
19 }
20 void* Product(void* arg)
21 {
22 BlockQueue* bq = (BlockQueue*)arg;
23 //pthread_mutex_lock(&p_mutex);
24 while(true)
25 {
26 pthread_mutex_lock(&p_mutex);
27 int x= rand() % 10+1;
28 int y= rand() % 100+1;
29 Task t(x,y);
30 bq->Push(t);
31 cout<<" Product [:"<<pthread_self()<<"]"<<x<<"+" << y<<"=?"<<endl;
32 pthread_mutex_unlock(&p_mutex);
33 sleep(1);
34 }
35 //pthread_mutex_unlock(&p_mutex);
36 }
37 int main()
38 {
39 pthread_mutex_init(&c_mutex,nullptr);
40 pthread_mutex_init(&p_mutex,nullptr);
41 BlockQueue* bq=new BlockQueue(15);
42 pthread_t c,p,c2,p2,p3;
43 pthread_create(&c,nullptr,Consumer,(void*)bq);
44 pthread_create(&c2,nullptr,Consumer,(void*)bq);
45 pthread_create(&p,nullptr,Product,(void*)bq);
46 pthread_create(&p2,nullptr,Product,(void*)bq);
47 pthread_create(&p3,nullptr,Product,(void*)bq);
48
49
50 pthread_join(c,nullptr);
51 pthread_join(c2,nullptr);
52 pthread_join(p,nullptr);
53 pthread_join(p2,nullptr);
54 pthread_join(p3,nullptr);
55 pthread_mutex_destroy(&c_mutex);
56 pthread_mutex_destroy(&p_mutex);
57 delete bq;
58 return 0;
59 }
让生产者和消费者一起工作:
有1个消费者的竞争力比较强,生产者刚生产1个就被消费了,其余的都是3个生产者生产,2个消费者处理任务。这就是多生产者和多消费者。
- 点赞
- 收藏
- 关注作者
评论(0)