#c #multithreading #mutex #producer-consumer
Вопрос:
Я пытаюсь написать программу для решения проблемы производителя-потребителя с потоками на C , и, насколько я могу судить, программа отлично работает до самого конца, когда потоки должны завершиться с помощью функции join (). (Объект продукта представляет собой простой контейнер данных).
#include <iostream>
#include <random>
#include <cstdlib>
#include <ctime>
#include <chrono>
#include <sstream>
#include <vector>
#include <stack>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <Product.h>
using namespace std;
const int max_items = 100;
atomic<int> itemNum(0);
atomic<int> numProducersWorking(0);
stack<Product> items;
int maxBuffer;
float storeSales[10];
float monthSales[12];
float totalSales;
mutex xmutex;
condition_variable isNotFull;
condition_variable isNotEmpty;
int intRand(const int amp; min, const int amp; max) {
static thread_local mt19937 generator(time(0));
uniform_int_distribution<int> distribution(min,max);
return distribution(generator);
}
float floatRand(const float amp; min, const float amp; max) {
static thread_local mt19937 generator(time(0));
uniform_real_distribution<float> distribution(min,max);
return distribution(generator);
}
void produce(int pId)
{
unique_lock<mutex> lock(xmutex);
int day, month, year, id, regNum;
float saleAmnt;
Product item;
id = pId;
day = intRand(1, 30);
month = intRand(1, 12);
year = 20;
regNum = intRand(1, 6);
saleAmnt = floatRand(0.50, 999.99);
item = Product(day, month, year, id, regNum, saleAmnt);
isNotFull.wait(lock, [] { return items.size() != maxBuffer; });
if(itemNum < max_items)
{
items.push(item);
itemNum ;
}
isNotEmpty.notify_all();
}
void consume(int cId)
{
unique_lock<mutex> lock(xmutex);
Product item;
isNotEmpty.wait(lock, [] { return items.size() > 0; });
item = items.top();
items.pop();
storeSales[item.getStoreID()-1] = item.getSaleAmnt();
monthSales[item.getMonth()-1] = item.getSaleAmnt();
totalSales = item.getSaleAmnt();
isNotFull.notify_all();
}
void producer(int id)
{
numProducersWorking;
while(itemNum < max_items)
{
produce(id);
this_thread::sleep_for(chrono::milliseconds(intRand(5, 40)));
}
--numProducersWorking;
}
void consumer(int id)
{
while(numProducersWorking != 0 || items.size() > 0 )
{
consume(id);
}
}
int main()
{
int p, c, b;
p = 5;
c = 5;
b = 5;
maxBuffer = b;
vector<thread> prodsCons;
auto start = chrono::high_resolution_clock::now();
//create producers
for(int i = 1; i <= p; i )
{
prodsCons.push_back(thread(producer, i));
}
//create consumers
for(int i = 0; i < c; i )
{
prodsCons.push_back(thread(consumer, i));
}
int x = 0;
//wait for consumers and producers to finish
for(autoamp; th : prodsCons)
{
th.join();
cout<<"thread "<<x<<" joined"<<endl;
x ;
}
auto stop = chrono::high_resolution_clock::now();
auto duration = chrono::duration_cast<chrono::microseconds>(stop - start);
cout<<"Store-wide total sales: "<<endl;
for(int x = 1; x <= p; x )
{
cout<<" store "<<x<<" sales: $"<<storeSales[x-1]<<endl;
}
cout<<"Month-wise total sales: "<<endl;
for(int x = 1; x <= 12; x )
{
cout<<" month "<<x<<" sales: $"<<monthSales[x-1]<<endl;
}
cout<<"Total sales: $"<<totalSales<<endl;
cout<<"Simulation time: "<<duration.count()<<" microseconds"<<endl;
}
Результат выглядит следующим образом:
thread 0 joined
thread 1 joined
thread 2 joined
thread 3 joined
thread 4 joined
указывает на то, что 5 из 10 потоков не выходят (скорее всего, потребители), и поэтому программа никогда не доходит до конца. Есть ли условие, которое не выполняется, или я неправильно реализовал мьютексы?
Комментарии:
1.
numProducersWorking
в вашем цикле потребителей всегда будет 0; поэтому потребители никогда не закончат. Вам, вероятно, в любом случае не нужна эта переменная; потребители должны заботиться только о товарах, а не о том, сколько производителей, не так ли?2. У вас есть ряд условий гонки, которые необходимо исправить. Например
consumer()
, чтениеitems.size()
без какой-либо синхронизации, что недопустимо.3. У вас есть условия гонки по всему этому коду. Как поток в
wait
(вызываемый изconsume
) должен знать, что он должен завершиться? Из чего это получитсяwait
?
Ответ №1:
Как только поток потребления достигнет вызова condition_variable::wait внутри consume()
, он не вернется без какого-либо сигнала.
Обычно у меня есть флаг завершения работы, который защищен тем же мьютексом, что и очередь, и мое условие ожидания будет зависеть от флага завершения работы и размера.
Когда пришло время для потребителей остановиться, я получаю мьютекс и устанавливаю флаг выключения. Затем, при выходе из ожидания, я либо выйду сразу после завершения работы, либо только в том случае, если очередь также пуста. Первое-это немедленное завершение работы, в то время как второе-завершение работы после завершения работы.
Кроме того, весь доступ к стеку элементов должен быть защищен мьютексом. Вы делали это в некоторых местах, но не в других.
Комментарии:
1. как бы я создал флаг завершения работы?
2.
std::atomic<bool> shutdownPlease = false;