Linux多线程(线程同步与条件变量)
一、线程同步的必要性
在线程的互斥中,我们解决了临界资源的多线程访问问题,引入了线程锁的概念,它使得每个线程访问临界资源的时候具有原子性。
我们还需要考虑一种情况,当一个线程访问了临界资源后,释放了它的锁,同时立刻参与到了锁的竞争中,如果它又拿到了锁。那么其他线程就会由于长时间得不到锁访问不了临界资源而造成线程饥饿问题。
线程同步的目的就在于让线程有序地访问临界资源。
二、条件变量
1.四个函数
我们通过条件变量来实现线程的同步。我们希望让线程有序地访问临界资源,就必须知道临界资源的状态,在只有锁的情况下,我们其实是不知道临界资源处于何种状态的。(是否正在被某个线程访问)。因此我们引入了条件变量。
我们需要了解两个函数和一个类型。
(1)pthread_cond_wait
pthread_cond_wait是线程等待函数,它的第一个参数是一个条件变量类型,类型为pthread_cond_t。它的第二个参数是一个锁的类型。当调用该函数时,线程会自动释放锁,并将自己挂起等待。返回的时候会首先自动竞争锁,竞争到锁后再返回。
(2)pthread_cond_signal
它的参数是一个条件变量,用于唤醒线程。
(3)pthread_cond_init与pthread_cond_destroy
这两个函数是初始化和销毁条件变量的函数。与锁的初始化和销毁使用方法是一样的,它们的参数都是条件变量。属性参数设置为空即可。
2.控制线程
下面我们使用这两个函数来实现一个用线程控制线程的程序:
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<string.h>
using namespace std;
#define NUM 5
pthread_mutex_t mtx;
pthread_cond_t cond;
void* ctrl(void* args)
{
string name=(char*)args;
while(true)
{
cout<<"master say:begin work"<<endl;
pthread_cond_signal(&cond);//唤醒一个在该条件变量下等待的线程,下次唤醒的不是该线程
sleep(2);
}
}
void* work(void* args)
{
int number=*(int*)args;
delete (int*)args;
while(true)
{
pthread_cond_wait(&cond,&mtx);
cout<<"worker:"<<number<<" is working"<<endl;
}
}
int main()
{
pthread_mutex_init(&mtx,nullptr);
pthread_cond_init(&cond,nullptr);
pthread_t worker[5];//定义5个新线程
pthread_t master;//定义主线程
pthread_create(&master,nullptr,ctrl,(void*)"master");
for(int i=0;i<NUM;i++)
{
int* id=new int(i);
pthread_create(worker+i,nullptr,work,(void*)id);
}
for(int i=0;i<NUM;i++)
{
pthread_join(worker[i],nullptr);
}
pthread_join(master,nullptr);
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
此时五个线程都在条件变量cond处等待,当maste线程调用pthread_cond_signal信号,唤醒在cond处挂起等待的一个线程的时候,该线程开始执行,由于是while循环,因此再挂起等待。而master线程再唤醒另一个线程。
可以看到线程时按一定的顺序来执行的,因此条件变量的内部一定有一个等待队列。当信号到来时,唤醒队首的线程,当该线程再次执行在cond环境变量处等待时,将其放在队尾。此时就实现了按一定顺序唤醒线程。
pthread_cond_signal一次可以唤醒一个线程。
而pthread_cond_broadcast一次可以唤醒所有在该条件变量等待的线程。
三、条件变量与锁的配合使用
1.生产者消费者模型
我们使用日常生活中的例子来理解这个模型:
超市存在的目的是什么呢?是为了收集需求,以及减少交易的成本。
其中超市本身就是一个临界资源,我们将每个消费者和生产者看成一个个线程。
供货商和供货商之间的关系:竞争(互斥)
消费者和消费者之间的关系:竞争(互斥)
供货商和消费者之间的关系:竞争(互斥,竞争货架等资源),同步(按一定顺序)
其中消费者和生产者是两种执行流。
超市是一个临界资源,它可能是一段缓冲区(内存空间,STL容器)
因此我们可以根据这三种条件来判断是否是生产者和消费者模型,即三种关系,两个执行流和一段缓冲区,简称“321原则”。
2.基于阻塞队列的生产者消费者模型
设计一个程序使两个线程互相控制,从而实现生产者消费者模型:
(1)阻塞队列
#include<iostream>
#include<unistd.h>
#include<string.h>
#include<stdlib.h>
#include<queue>
using namespace std;
namespace ns_blockqueue
{
template<class T>
class BlockQueue
{
private:
queue<T> bq_;//阻塞队列
int cap_;//队列的元素上限
pthread_mutex_t mtx_;
pthread_cond_t full_;
pthread_cond_t empty_;
public:
BlockQueue(int cap=5):cap_(cap)
{
pthread_mutex_init(&mtx_,nullptr);
pthread_cond_init(&full_,nullptr);
pthread_cond_init(&empty_,nullptr);
}
void LockQueue()
{
pthread_mutex_lock(&mtx_);//对队列加锁
}
void unLockQueue()
{
pthread_mutex_unlock(&mtx_);//对队列解锁
}
void ProductorWait()
{
pthread_cond_wait(&empty_,&mtx_);//在空时等待
}
void ConsumerWait()
{
pthread_cond_wait(&full_,&mtx_);//在满时等待
}
void WakeupConsumer()
{
pthread_cond_signal(&full_);//对满发信号
}
void WakeupProductor()
{
pthread_cond_signal(&empty_);//对空发信号
}
bool isEmpty()
{
return bq_.size();
}
bool isFull()
{
return (bq_.size()==cap_);
}
void Push(const T& in)
{
LockQueue();
while(isFull())
{
ProductorWait();
}
bq_.push(in);
unLockQueue();
WakeupConsumer();
}
void Pop(T* out)
{
LockQueue();
while(!isEmpty())
{
ConsumerWait();
}
*out=bq_.front();
bq_.pop();
unLockQueue();
WakeupProductor();
}
~BlockQueue()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&full_);
pthread_cond_destroy(&empty_);
}
};
}
生产者和消费者两个线程并行执行,当消费者要消费数据的时候要访问临界资源,需要先加锁,在访问临界资源时,发现目前还没有数据,此时在full的条件下等待。并释放锁(pthread_cond_wait会主动释放锁),此时生产者进行生产,当生产者生产完毕之后唤醒在full条件下等待的消费者。
此时消费者与生产者继续竞争锁,当消费者竞争到锁时,它执行pthread_cond_wait之后的代码。此时消费者和生产者并行执行。
当阻塞队列已经满了的时候(生产者竞争锁能力太强),此时生产者挂起等待,消费者消费一个数据之后还会唤醒生产者。从而保证一直运行下去。
(2)Task
#include<iostream>
using namespace std;
namespace ns_task
{
class Task
{
private:
int x_;
int y_;
char op_;
public:
Task(int x,int y,char op):x_(x),y_(y),op_(op)
{};
Task()
{};
int Run()
{
int res=0;
switch(op_)
{
case '+':
res=x_+y_;
break;
case '-':
res=x_-y_;
break;
case '*':
res=x_*y_;
break;
case '/':
res=x_/y_;
break;
case '%':
res=x_%y_;
break;
default:
cout<<"bug"<<endl;
break;
}
cout<<"当前任务正在被"<<pthread_self()<<"处理"<<x_<<op_<<y_<<"="<<res<<endl;
}
};
}
为任务类,处理将任务封装成有x,y,op的算数运算。
(3)线程创建
#include"BlockQueue.hpp"
#include"Task.hpp"
using namespace ns_blockqueue;
using namespace ns_task;
void* consumer(void* args)
{
BlockQueue<Task>* bq=(BlockQueue<Task>*)args;
while(true)
{
Task t;
bq->Pop(&t);
t.Run();
// int data=0;
// bq->Pop(&data);
// cout<<"消费者消费了一个数据"<<data<<endl;
}
}
void* productor(void* args)
{
BlockQueue<Task>* bq=(BlockQueue<Task>*)args;
while(true)
{
char arr[]="+-*/%";
int x=rand()%20+1;
int y=rand()%10+1;
char op=arr[(rand()%5)];
cout<<"生产者生产了一个任务"<<x<<op<<y<<"=?"<<endl;
Task* task=new Task(x,y,op);
bq->Push(*task);
// int data=rand()%20+1;
// cout<<"生产者生产的数据是"<<data<<endl;
// bq->Push(data);
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<Task> *bq=new BlockQueue<Task>();
pthread_t c,c1,c2,c3,c4,p;
pthread_create(&c,nullptr,consumer,(void*)bq);
pthread_create(&c1,nullptr,consumer,(void*)bq);
pthread_create(&c2,nullptr,consumer,(void*)bq);
pthread_create(&c3,nullptr,consumer,(void*)bq);
pthread_create(&c4,nullptr,consumer,(void*)bq);
pthread_create(&p,nullptr,productor,(void*)bq);
pthread_join(c,nullptr);
pthread_join(c1,nullptr);
pthread_join(c2,nullptr);
pthread_join(c3,nullptr);
pthread_join(c4,nullptr);
pthread_join(p,nullptr);
}
这里我们创建了多个消费者,使用在生产者中使用随机数生产数据,在消费者中使用输出型参数来得到数据,并进行数据处理。
在出现多个消费者时,可能导致伪唤醒的问题,因此在阻塞队列中判断是否空还是满的时候使用while来保证唤醒条件。
- 点赞
- 收藏
- 关注作者
评论(0)