Потоки не выходят?

#c #multithreading #mutex #producer-consumer

Вопрос:

Я пытаюсь написать программу для решения проблемы производителя-потребителя с потоками на C , и, насколько я могу судить, программа отлично работает до самого конца, когда потоки должны завершиться с помощью функции join (). (Объект продукта представляет собой простой контейнер данных).

     #include <iostream>
    #include <random>
    #include <cstdlib>
    #include <ctime>
    #include <chrono>
    #include <sstream>
    #include <vector>
    #include <stack>
    #include <thread>
    #include <mutex>
    #include <atomic>
    #include <condition_variable>
    #include <Product.h>
    
    using namespace std;
    
    const int max_items = 100;
    atomic<int> itemNum(0);
    atomic<int> numProducersWorking(0);
    stack<Product> items;
    int maxBuffer;
    float storeSales[10];
    float monthSales[12];
    float totalSales;
    mutex xmutex;
    condition_variable isNotFull;
    condition_variable isNotEmpty;
    
    int intRand(const int amp; min, const int amp; max) {
        static thread_local mt19937 generator(time(0));
        uniform_int_distribution<int> distribution(min,max);
        return distribution(generator);
    }
    
    float floatRand(const float amp; min, const float amp; max) {
        static thread_local mt19937 generator(time(0));
        uniform_real_distribution<float> distribution(min,max);
        return distribution(generator);
    }
    
    void produce(int pId)
    {
        unique_lock<mutex> lock(xmutex);
        int day, month, year, id, regNum;
        float saleAmnt;
        Product item;
    
        id = pId;
        day = intRand(1, 30);
        month = intRand(1, 12);
        year = 20;
        regNum = intRand(1, 6);
        saleAmnt = floatRand(0.50, 999.99);
    
        item = Product(day, month, year, id, regNum, saleAmnt);
    
        isNotFull.wait(lock, [] { return items.size() != maxBuffer; });
        if(itemNum < max_items)
        {
            items.push(item);
            itemNum  ;
        }
    
        isNotEmpty.notify_all();
    }
    
    void consume(int cId)
    {
        unique_lock<mutex> lock(xmutex);
        Product item;
    
        isNotEmpty.wait(lock, [] { return items.size() > 0; });
        item = items.top();
        items.pop();
        storeSales[item.getStoreID()-1]  = item.getSaleAmnt();
        monthSales[item.getMonth()-1]  = item.getSaleAmnt();
        totalSales  = item.getSaleAmnt();
    
        isNotFull.notify_all();
    }
    
    void producer(int id)
    {
          numProducersWorking;
        while(itemNum < max_items)
        {
            produce(id);
            this_thread::sleep_for(chrono::milliseconds(intRand(5, 40)));
        }
        --numProducersWorking;
    }
    
    void consumer(int id)
    {
    
        while(numProducersWorking != 0 || items.size() > 0 )
            {
                consume(id);
            }
    
    }
    
    
    int main()
    {
        int p, c, b;
    
        p = 5;
        c = 5;
        b = 5;
    
        maxBuffer = b;
    
        vector<thread> prodsCons;
    
        auto start = chrono::high_resolution_clock::now();
    
        //create producers
        for(int i = 1; i <= p; i  )
        {
            prodsCons.push_back(thread(producer, i));
        }
    
         //create consumers
        for(int i = 0; i < c; i  )
        {
            prodsCons.push_back(thread(consumer, i));
        }
    
        int x = 0;
        //wait for consumers and producers to finish
        for(autoamp; th : prodsCons)
        {
            th.join();
            cout<<"thread "<<x<<" joined"<<endl;
            x  ;
        }
    
        auto stop = chrono::high_resolution_clock::now();
        auto duration = chrono::duration_cast<chrono::microseconds>(stop - start);
    
        cout<<"Store-wide total sales: "<<endl;
        for(int x = 1; x <= p; x  )
        {
            cout<<"  store "<<x<<" sales: $"<<storeSales[x-1]<<endl;
        }
        cout<<"Month-wise total sales: "<<endl;
        for(int x = 1; x <= 12; x  )
        {
            cout<<"  month "<<x<<" sales: $"<<monthSales[x-1]<<endl;
        }
        cout<<"Total sales: $"<<totalSales<<endl;
        cout<<"Simulation time: "<<duration.count()<<" microseconds"<<endl;
    }
 

Результат выглядит следующим образом:

 thread 0 joined
thread 1 joined
thread 2 joined
thread 3 joined
thread 4 joined
 

указывает на то, что 5 из 10 потоков не выходят (скорее всего, потребители), и поэтому программа никогда не доходит до конца. Есть ли условие, которое не выполняется, или я неправильно реализовал мьютексы?

Комментарии:

1. numProducersWorking в вашем цикле потребителей всегда будет 0; поэтому потребители никогда не закончат. Вам, вероятно, в любом случае не нужна эта переменная; потребители должны заботиться только о товарах, а не о том, сколько производителей, не так ли?

2. У вас есть ряд условий гонки, которые необходимо исправить. Например consumer() , чтение items.size() без какой-либо синхронизации, что недопустимо.

3. У вас есть условия гонки по всему этому коду. Как поток в wait (вызываемый из consume ) должен знать, что он должен завершиться? Из чего это получится wait ?

Ответ №1:

Как только поток потребления достигнет вызова condition_variable::wait внутри consume() , он не вернется без какого-либо сигнала.

Обычно у меня есть флаг завершения работы, который защищен тем же мьютексом, что и очередь, и мое условие ожидания будет зависеть от флага завершения работы и размера.

Когда пришло время для потребителей остановиться, я получаю мьютекс и устанавливаю флаг выключения. Затем, при выходе из ожидания, я либо выйду сразу после завершения работы, либо только в том случае, если очередь также пуста. Первое-это немедленное завершение работы, в то время как второе-завершение работы после завершения работы.

Кроме того, весь доступ к стеку элементов должен быть защищен мьютексом. Вы делали это в некоторых местах, но не в других.

Комментарии:

1. как бы я создал флаг завершения работы?

2. std::atomic<bool> shutdownPlease = false;