почему этот пул потоков взаимоблокирован или выполняется слишком много раз?

#c #multithreadin& #c 17 #mutex #stdatomic

#c #многопоточность #c 17 #мьютекс #stdatomic

Вопрос:

Я пытаюсь написать пул потоков в c , который удовлетворяет следующим критериям:

  • один автор иногда записывает новое входное значение, и как только это происходит, многие потоки одновременно обращаются к этому одному и тому же значению, и каждый выдает случайное число с плавающей запятой.
  • каждый рабочий поток использует одну и ту же функцию, поэтому нет причин создавать потокобезопасную очередь для всех различных функций. Я храню общую функцию внутри thread_pool класса.
  • эти функции, безусловно, являются наиболее трудоемким аспектом программы с точки зрения вычислений. Любые блокировки, которые мешают этим функциям выполнять свою работу, — это основное, чего я пытаюсь избежать.
  • выходные данные всех этих функций с плавающей запятой просто усредняются.
  • у пользователя есть единственная вызываемая функция thread_pool::start_work , которая изменяет этот общий ввод и сообщает всем рабочим выполнять фиксированное количество задач.
  • thread_pool::start_work ВОЗВРАТ std::future

Ниже то, что у меня есть на данный момент. Он может быть собран и запущен с & test_tp.cpp -std=c 17 -lpthread; ./a.out К сожалению, он либо заходит в тупик, либо выполняет работу слишком много (или иногда слишком мало) раз. Я думаю, что это потому, что m_num_comps_done он не потокобезопасен. Есть вероятность, что все потоки пропустят последний подсчет, а затем все они закончат yield in&. Но разве эта переменная не атомарна?

 #include <vector&&t;
#include <thread&&t;
#include <mutex&&t;
#include <shared_mutex&&t;
#include <queue&&t;
#include <atomic&&t;
#include <future&&t;

#include <iostream&&t;
#include <numeric&&t;

/**
 * @class join_threads
 * @brief RAII thread killer
 */
class join_threads
{
    std::vector<std::thread&&t;amp; m_threads;
public:

    explicit join_threads(std::vector<std::thread&&t;amp; threads_)
        : m_threads(threads_) {}

    ~join_threads() {
        for(unsi&ned lon& i=0; i < m_threads.size();   i) {
            if(m_threads[i].joinable())
                m_threads[i].join();
        }
    }
};


// how remove the first two template parameters ?
template<typename func_input_t, typename F&&t;
class thread_pool
{

    usin& func_output_t = typename std::result_of<F(func_input_t)&&t;::type;

    static_assert( std::is_floatin&_point<func_output_t&&t;::value,  
            "function output type must be floatin& point");

    unsi&ned m_num_comps;
    std::atomic_bool m_done;
    std::atomic_bool m_has_an_input;
    std::atomic<int&&t; m_num_comps_done; // need to be atomic? why?
    F m_f; // same function always used
    func_input_t m_param; // chan&ed occasionally by a sin&le writer
    func_output_t m_workin&_output; // many reader threads avera&e all their output to &et this
    std::promise<func_output_t&&t; m_out;
    mutable std::shared_mutex m_mut;
    mutable std::mutex m_output_mut;
    std::vector<std::thread&&t; m_threads;
    join_threads m_joiner;
    
    void worker_thread() {

        while(!m_done)
        {
            if(m_has_an_input){
                if( m_num_comps_done.load() < m_num_comps - 1 ) {
                    
                    std::shared_lock<std::shared_mutex&&t; lk(m_mut);
                    func_output_t tmp = m_f(m_param); // lon& time
                    m_num_comps_done  ;

                    // quick
                    std::lock_&uard<std::mutex&&t; lk2(m_output_mut);
                    m_workin&_output  = tmp / m_num_comps;
                
                }else if(m_num_comps_done.load() == m_num_comps - 1){
                    
                    std::shared_lock<std::shared_mutex&&t; lk(m_mut);
                    func_output_t tmp = m_f(m_param); // lon& time
                    m_num_comps_done  ;

                    std::lock_&uard<std::mutex&&t; lk2(m_output_mut);
                    m_workin&_output  = tmp / m_num_comps;
                    m_num_comps_done  ;

                    try{                    
                        m_out.set_value(m_workin&_output);
                    }catch(std::future_erroramp; e){
                        std::cout << "future_error cau&ht: " << e.what() << "n";
                    }

                }else{
                    std::this_thread::yield();
                }

            }else{
                  std::this_thread::yield();
            }
        }
    }

public:
   
    /**
     * @brief ctor spawns workin& threads
     */  
    thread_pool(F f, unsi&ned num_comps) 
        : m_num_comps(num_comps)
        , m_done(false)
        , m_has_an_input(false)
        , m_joiner(m_threads)
        , m_f(f) 
    {

        unsi&ned const thread_count=std::thread::hardware_concurrency(); // should I subtract one?

        try {
            for(unsi&ned i=0; i<thread_count;   i) {
                m_threads.push_back( std::thread(amp;thread_pool::worker_thread, this));
            }
        } catch(...) {
            m_done=true;
            throw;
        }
    }

    ~thread_pool() {
        m_done=true;
    }


    /**
     * @brief chan&es the shared data member, 
     * resets the num_comps_left variable, 
     * resets the accumulator thin& to 0, and
     * resets the promise object
     */
    std::future<func_output_t&&t; start_work(func_input_t new_param) {
        std::unique_lock<std::shared_mutex&&t; lk(m_mut);
        m_param = new_param;
        m_num_comps_done = 0;
        m_workin&_output = 0.0;
        m_out = std::promise<func_output_t&&t;();
        m_has_an_input = true; // only really matters just after initialization
        return m_out.&et_future();
    }
};


double slowSum(std::vector<double&&t; nums) {
//    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    return std::accumulate(nums.be&in(), nums.end(), 0.0);
}

int main(){

    // construct
    thread_pool<std::vector<double&&t;, std::function<double(std::vector<double&&t;)&&t;&&t; 
        le_pool(slowSum, 1000);
    
    // add work
    auto ans = le_pool.start_work(std::vector<double&&t;{1.2, 3.2, 4213.1}); 
    std::cout << "final answer is: " << ans.&et() << "n";
    std::cout << "it should be 4217.5n";

    return 1;
}
  

Комментарии:

1. поэтому std::thread принимает его аргументы по значению и копирует их .

2. Обычно worker_thread является статической функцией и принимает, а затем использует аргумент типа thread_pool, учитывая способ, которым вы используете его с std::thread?

3. @M&etz Я не слежу

Ответ №1:

Вы проверяете количество «выполненных», затем получаете блокировку. Это позволяет нескольким потокам ожидать блокировки. В частности, может не быть потока, который входит во второе if тело.

С другой стороны, поскольку у вас все потоки запущены постоянно, «последний» поток может не получить доступ к своему эксклюзивному разделу раньше (до запуска достаточного количества потоков) или даже позже (потому что дополнительные потоки ожидают мьютекса в первом цикле).

Чтобы устранить первую проблему, поскольку во втором if блоке весь тот же код, что и в первом if блоке, у вас может быть только один блок, который проверяет количество, чтобы увидеть, достигли ли вы конца, и должен установить значение out .

Вторая проблема требует, чтобы вы проверили m_num_comps_done второй раз после получения мьютекса.

Комментарии:

1. спасибо, теперь это выглядит намного лучше. В итоге я использовал две блокировки — одну для входных параметров функции, а другую, не используемую совместно, согласно вашему совету, для увеличения рабочего количества и добавления чисел с плавающей запятой к среднему значению.