Linux的生产者消费者模型
本文介绍了Linux多线程编程中的生产者消费者模型,详细分析了基于阻塞队列和环形阻塞队列的两种实现方案。阻塞队列方案使用锁+条件变量实现线程同步,通过互斥锁保证线程安全,条件变量控制阻塞唤醒。环形队列方案则采用信号量机制,通过两个信号量分别控制生产者和消费者的同步操作。文章通过代码示例展示了两种实现的具体应用,并比较了它们的特性差异,指出环形队列在多块临界资源管理中的优势。最后通过一道面试题演示了
·
基于前文Linux的多线程-CSDN博客。
目录
1、生产者消费者模型
- 1个交易场所(以特定结构构成的一种“内存”空间)。
- 2种角色(生产者,消费者,由线程承担)。
- 3种关系(生产者之间,互斥,消费者之间,互斥,生产者与消费者之间,互斥+同步)。
- 生产者与消费者模型的优点:
- 生产过程与消费过程解耦。
- 支持忙闲不均。
- 提高效率。不是体现在交易场所的出/入,而是体现在生产任务和处理任务是并发的(交易场所非空非满时)。
2、基于阻塞队列的生产者消费者模型
- 阻塞队列,是一种常用于实现生产者和消费者模型的数据结构,当队列为空时,向队列里获取元素的操作会阻塞,当队列为满时,向队列里存放元素的操作会阻塞。
画外音:管道的本质也是阻塞队列,为空时,读端阻塞,为满时,写端阻塞。
2.1 思路
- 生产者与消费者之间采用锁+条件变量,实现互斥+同步。
- 生产者之间和消费者之间,采用锁,实现互斥。
- 这一个锁,互斥,用于生产者与消费者竞争(互斥),生产者之间竞争(互斥),消费者之间竞争(互斥)。
- 两个条件变量,同步,分别控制生产者的阻塞和唤醒,消费者的阻塞和唤醒。
- 两个sleep_num,分别用于记录因空阻塞的消费者和因满阻塞的生产者。
- 要点:
- 阻塞队列的什么时候唤醒,是满或空,还是生成一个唤醒或消费一个唤醒?是生成一个唤醒或消费一个唤醒,为了更好的并发(生产者在生产,同时消费者在消费)。
- 唤醒一个还是全部?通常是唤醒一个(signal()),以避免惊群效应,提高效率。
- 为什么while阻塞?因为当线程被唤醒时,线程切换,其他线程可能已经将状态改变,或者被误唤醒,需重新判断条件。
2.2 代码
2.2.1 Main.cc
#include <iostream>
#include <unistd.h>
#include <functional>
#include "BlockingQueue.hpp"
void DownLoad()
{
std::cout << "下载一个任务" << std::endl;
sleep(3); // 假设任务的耗时。
}
using task_t = std::function<void()>;
void* Producer(void* arg)
{
BlockingQueue<task_t>* bq = static_cast<BlockingQueue<task_t>*>(arg);
while(true)
{
bq->Push(DownLoad);
std::cout << "生产了一个任务" << std::endl;
sleep(3);
}
}
void* Consumer(void* arg)
{
BlockingQueue<task_t>* bq = static_cast<BlockingQueue<task_t>*>(arg);
while(true)
{
task_t task = bq->Pop();
task();
sleep(3);
}
}
int main()
{
BlockingQueue<task_t> bq;
pthread_t p[2],c[2];
pthread_create(p,nullptr,Producer,&bq);
pthread_create(p+1,nullptr,Producer,&bq);
pthread_create(c,nullptr,Consumer,&bq);
pthread_create(c+1,nullptr,Consumer,&bq);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
pthread_join(c[0],nullptr);
pthread_join(c[2],nullptr);
return 0;
}
2.2.2 CondBlockingQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include "Cond.hpp"
#include "Mutex.hpp"
using namespace CondModule;
using namespace MutexModule;
const int default_cap = 5;
template <typename T>
class BlockingQueue
{
bool isEmpty()
{
return bq.empty();
}
bool isFull()
{
return bq.size() >= _cap;
}
public:
BlockingQueue(int cap = default_cap)
: _cap(cap)
,_p_sleep_num(0)
,_c_sleep_num(0)
{
}
void Push(const T& in)
{
// 生产者
LockGuard lockguard(_mutex);
while(isFull())
{
++_p_sleep_num;
std::cout << "生产者因为满而阻塞: " << _p_sleep_num << std::endl;
_full_cond.Wait(_mutex); // 生产者因为满而阻塞
--_p_sleep_num;
}
bq.push(in);
if(_c_sleep_num > 0)
{
_empty_cond.Signal();// 唤醒一个因为空而阻塞的消费者
std::cout << "唤醒一个因为空而阻塞的消费者" << std::endl;
}
}
T Pop()
{
// 消费者
LockGuard lockguard(_mutex);
while(isEmpty())
{
++_c_sleep_num;
std::cout << "消费者者因为空而阻塞: " << _c_sleep_num << std::endl;
_empty_cond.Wait(_mutex); // 消费者者因为空而阻塞
--_c_sleep_num;
}
T data = bq.front();
bq.pop();
if(_p_sleep_num > 0)
{
_full_cond.Signal(); // 唤醒一个因为满而阻塞的生产者
std::cout << "唤醒一个因为满而阻塞的生产者" << std::endl;
}
return data;
}
private:
std::queue<T> bq;
int _cap;
Mutex _mutex;
Cond _empty_cond;
Cond _full_cond;
int _p_sleep_num;
int _c_sleep_num;
};
2.2.3 Cond.hpp
#pragma once
#include <pthread.h>
#include "Mutex.hpp"
namespace CondModule
{
class Cond
{
public:
// RAII,资源的初始化与释放与对象的生命周期绑定
Cond()
{
pthread_cond_init(&_cond,nullptr);
}
void Wait(MutexModule::Mutex& mutex)
{
pthread_cond_wait(&_cond,mutex.Get());
}
void Signal()
{
pthread_cond_signal(&_cond);
}
void Broadcast()
{
pthread_cond_broadcast(&_cond);
}
~Cond()
{
pthread_cond_destroy(&_cond);
}
private:
pthread_cond_t _cond;
};
}
2.2.4 Mutex.hpp
#pragma once
#include <pthread.h>
namespace MutexModule
{
class Mutex
{
// RAII,资源的初始化与释放与对象的生命周期绑定
public:
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
void Lock()
{
pthread_mutex_lock(&_mutex);
}
void Unlock()
{
pthread_mutex_unlock(&_mutex);
}
pthread_mutex_t *Get()
{
return &_mutex;
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
private:
pthread_mutex_t _mutex;
};
class LockGuard
{
public:
LockGuard(Mutex &mutex)
: _mutex(mutex)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex &_mutex;
};
}
3、基于环形阻塞队列的生产者消费者模型
- 环形队列,使用取模进行循环。和阻塞队列一样,空了,消费者阻塞,满了,生产者阻塞。
3.1 思路
- 生产者与消费者之间采用信号量,实现互斥+同步。
- 生产者之间采用锁,实现互斥;消费者之间采用锁,实现互斥。
- 两个信号量,互斥,信号量的原子操作保证了计数的一致性;生产者和消费者不能操作同一个位置(满了或空了,处于同一个位置,不能同时操作)。同步,记录可用空位数量,控制生产者的阻塞和生产;记录已有数据数量,控制消费者的阻塞和消费。
- 两个锁,互斥,分别用于生产者之间竞争(互斥),消费者之间竞争(互斥)。
- 两个step,用于下标索引,分别用于生产者push,消费者pop。
- 要点:
- 前面阻塞队列的queue是push,会自动扩容,但是环形队列使用下标索引,所以在初始化的时候,要先开辟空间。
3.2 代码
3.2.1 Main.cc
#include <iostream>
#include <unistd.h>
#include <functional>
#include "RingQueue.hpp"
void DownLoad()
{
std::cout << "下载一个任务" << std::endl;
sleep(3); // 假设任务的耗时。
}
using task_t = std::function<void()>;
void* Producer(void* arg)
{
RingQueue<task_t>* rq = static_cast<RingQueue<task_t>*>(arg);
while(true)
{
rq->Push(DownLoad);
std::cout << "生产了一个任务" << std::endl;
sleep(3);
}
}
void* Consumer(void* arg)
{
RingQueue<task_t>* rq = static_cast<RingQueue<task_t>*>(arg);
while(true)
{
task_t task = rq->Pop();
task();
sleep(3);
}
}
int main()
{
RingQueue<task_t> rq;
pthread_t p[2],c[2];
pthread_create(p,nullptr,Producer,&rq);
pthread_create(p+1,nullptr,Producer,&rq);
pthread_create(c,nullptr,Consumer,&rq);
pthread_create(c+1,nullptr,Consumer,&rq);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
pthread_join(c[0],nullptr);
pthread_join(c[1],nullptr);
return 0;
}
3.2.2 RingQueue.hpp
#pragma once
#include <vector>
#include "Mutex.hpp"
#include "Sem.hpp"
using namespace MutexModule;
using namespace SemModule;
const int default_cap = 5;
template<typename T>
class RingQueue
{
public:
RingQueue(int cap = default_cap)
:_rq(cap)
,_cap(cap)
,_blank_sem(cap)
,_p_step(0)
,_data_sem(0)
,_c_step(0)
{
}
void Push(const T& in)
{
_blank_sem.P();
LockGuard lockguard(_p_mutex);
_rq[_p_step] = in;
++_p_step;
_p_step %= _cap;
_data_sem.V();
}
T& Pop()
{
_data_sem.P();
LockGuard lockguard(_c_mutex);
T& data = _rq[_c_step];
++_c_step;
_c_step %= _cap;
_blank_sem.V();
return data;
}
private:
std::vector<T> _rq;
int _cap;
// 生产者
Sem _blank_sem; // 空位数
int _p_step;
Mutex _p_mutex; // 生产者之间的互斥
// 消费者
Sem _data_sem; // 资源数
int _c_step;
Mutex _c_mutex; // 消费者之间的互斥
};
3.2.3 Sem.hpp
#pragma once
#include <semaphore.h>
namespace SemModule
{
const unsigned int default_value = 1;
class Sem
{
public:
// RAII,资源的初始化与释放与对象的生命周期绑定
Sem(unsigned int sem_value = default_value)
: _sem_value(sem_value)
{
sem_init(&_sem, 0, _sem_value);
}
void P()
{
sem_wait(&_sem);
}
void V()
{
sem_post(&_sem);
}
~Sem()
{
sem_destroy(&_sem);
}
private:
unsigned int _sem_value;
sem_t _sem;
};
}
3.2.4 Mutex.hpp
#pragma once
#include <pthread.h>
namespace MutexModule
{
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
void Lock()
{
pthread_mutex_lock(&_mutex);
}
void Unlock()
{
pthread_mutex_unlock(&_mutex);
}
pthread_mutex_t *Get()
{
return &_mutex;
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
private:
pthread_mutex_t _mutex;
};
// RAII,资源的初始化与释放与对象的生命周期绑定
class LockGuard
{
public:
LockGuard(Mutex &mutex)
: _mutex(mutex)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex &_mutex;
};
}
4、总结一下
特性 | 普通阻塞队列 | 环形阻塞队列 |
---|---|---|
内存分配 | 可能频繁分配 / 释放内存(链表实现) | 一次性分配固定内存,无内存碎片 |
扩容机制 | 支持动态扩容 | 容量固定,无法动态扩容 |
访问效率 | 队列头尾操作 O (1),但可能有缓存不友好问题 | 数组连续存储,缓存局部性更好,访问效率更高 |
空间利用率 | 动态大小,空间利用率灵活 | 固定空间,利用率稳定但可能有浪费 |
实现复杂度 | 相对简单 | 稍复杂(需处理环形边界条件) |
- 一种整块临界资源 -> 锁+条件变量。
- 一种多块临界资源 -> 锁+信号量。显然,在生产者消费者模型中,临界资源是一种多块临界资源,锁+信号量的方式更好实现,如:逻辑上更清晰,信号量直接表达资源数量,不需要在唤醒时重新检查复杂条件等。
5、一道有意思的面试题
题目:char arr1[5] = "1234"; char arr2[5] = "abcd";,请打印出"1a2b3c4d";
5.1 思路
- 基于前面的练习,如果是锁+条件变量,竞争锁,顺序是不确定的,而两个信号量刚好能控制顺序,一个为arr1_sem = 1,arr1_sem .P(),arr2_sem.V();一个arr2_sem = 0,arr2_sem .P(),arr1_sem.V()。
- 如果非要用锁(同一时间只能打印一个),那需要一个条件变量,控制两个线程的打印阻塞与唤醒(同时只有一个会阻塞),还要一个标志,true为arr1打印,false为arr2打印。
5.2 代码
5.2.1 信号量
#include <iostream>
#include <pthread.h>
#include "Sem.hpp"
using namespace SemModule;
Sem sem1(1),sem2(0);
char arr1[5] = "1234"; // 包含'/0'
char arr2[5] = "abcd";
void *Func1(void *arg)
{
for(const auto& e : arr1)
{
sem1.P();
std::cout << e;
sem2.V();
}
return nullptr;
}
void *Func2(void *arg)
{
for(const auto& e : arr2)
{
sem2.P();
std::cout << e;
sem1.V();
}
return nullptr;
}
int main()
{
pthread_t tid1, tid2;
pthread_create(&tid1, nullptr, Func1, nullptr);
pthread_create(&tid2, nullptr, Func2, nullptr);
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
std::cout << std::endl;
return 0;
}
5.2.2 锁+条件变量
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"
using namespace MutexModule;
using namespace CondModule;
char arr1[5] = "1234"; // 包含'/0'
char arr2[5] = "abcd";
Mutex mutex;
Cond cond;
bool flag = true;
void *Func1(void *arg)
{
for(const auto& e : arr1)
{
LockGuard lockguard(mutex);
while(!flag)
cond.Wait(mutex);
std::cout << e;
flag = !flag;
cond.Signal();
}
return nullptr;
}
void *Func2(void *arg)
{
for(const auto& e : arr2)
{
LockGuard lockguard(mutex);
while(flag)
cond.Wait(mutex);
std::cout << e;
flag = !flag;
cond.Signal();
}
return nullptr;
}
int main()
{
pthread_t tid1, tid2;
pthread_create(&tid1, nullptr, Func1, nullptr);
pthread_create(&tid2, nullptr, Func2, nullptr);
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
std::cout << std::endl;
return 0;
}
更多推荐
所有评论(0)