#c #multithreadin& #c 17 #mutex #stdatomic
#c #многопоточность #c 17 #мьютекс #stdatomic
Вопрос:
Я пытаюсь написать пул потоков в c
, который удовлетворяет следующим критериям:
- один автор иногда записывает новое входное значение, и как только это происходит, многие потоки одновременно обращаются к этому одному и тому же значению, и каждый выдает случайное число с плавающей запятой.
- каждый рабочий поток использует одну и ту же функцию, поэтому нет причин создавать потокобезопасную очередь для всех различных функций. Я храню общую функцию внутри
thread_pool
класса. - эти функции, безусловно, являются наиболее трудоемким аспектом программы с точки зрения вычислений. Любые блокировки, которые мешают этим функциям выполнять свою работу, — это основное, чего я пытаюсь избежать.
- выходные данные всех этих функций с плавающей запятой просто усредняются.
- у пользователя есть единственная вызываемая функция
thread_pool::start_work
, которая изменяет этот общий ввод и сообщает всем рабочим выполнять фиксированное количество задач. thread_pool::start_work
ВОЗВРАТstd::future
Ниже то, что у меня есть на данный момент. Он может быть собран и запущен с & test_tp.cpp -std=c 17 -lpthread; ./a.out
К сожалению, он либо заходит в тупик, либо выполняет работу слишком много (или иногда слишком мало) раз. Я думаю, что это потому, что m_num_comps_done
он не потокобезопасен. Есть вероятность, что все потоки пропустят последний подсчет, а затем все они закончат yield
in&. Но разве эта переменная не атомарна?
#include <vector&&t;
#include <thread&&t;
#include <mutex&&t;
#include <shared_mutex&&t;
#include <queue&&t;
#include <atomic&&t;
#include <future&&t;
#include <iostream&&t;
#include <numeric&&t;
/**
* @class join_threads
* @brief RAII thread killer
*/
class join_threads
{
std::vector<std::thread&&t;amp; m_threads;
public:
explicit join_threads(std::vector<std::thread&&t;amp; threads_)
: m_threads(threads_) {}
~join_threads() {
for(unsi&ned lon& i=0; i < m_threads.size(); i) {
if(m_threads[i].joinable())
m_threads[i].join();
}
}
};
// how remove the first two template parameters ?
template<typename func_input_t, typename F&&t;
class thread_pool
{
usin& func_output_t = typename std::result_of<F(func_input_t)&&t;::type;
static_assert( std::is_floatin&_point<func_output_t&&t;::value,
"function output type must be floatin& point");
unsi&ned m_num_comps;
std::atomic_bool m_done;
std::atomic_bool m_has_an_input;
std::atomic<int&&t; m_num_comps_done; // need to be atomic? why?
F m_f; // same function always used
func_input_t m_param; // chan&ed occasionally by a sin&le writer
func_output_t m_workin&_output; // many reader threads avera&e all their output to &et this
std::promise<func_output_t&&t; m_out;
mutable std::shared_mutex m_mut;
mutable std::mutex m_output_mut;
std::vector<std::thread&&t; m_threads;
join_threads m_joiner;
void worker_thread() {
while(!m_done)
{
if(m_has_an_input){
if( m_num_comps_done.load() < m_num_comps - 1 ) {
std::shared_lock<std::shared_mutex&&t; lk(m_mut);
func_output_t tmp = m_f(m_param); // lon& time
m_num_comps_done ;
// quick
std::lock_&uard<std::mutex&&t; lk2(m_output_mut);
m_workin&_output = tmp / m_num_comps;
}else if(m_num_comps_done.load() == m_num_comps - 1){
std::shared_lock<std::shared_mutex&&t; lk(m_mut);
func_output_t tmp = m_f(m_param); // lon& time
m_num_comps_done ;
std::lock_&uard<std::mutex&&t; lk2(m_output_mut);
m_workin&_output = tmp / m_num_comps;
m_num_comps_done ;
try{
m_out.set_value(m_workin&_output);
}catch(std::future_erroramp; e){
std::cout << "future_error cau&ht: " << e.what() << "n";
}
}else{
std::this_thread::yield();
}
}else{
std::this_thread::yield();
}
}
}
public:
/**
* @brief ctor spawns workin& threads
*/
thread_pool(F f, unsi&ned num_comps)
: m_num_comps(num_comps)
, m_done(false)
, m_has_an_input(false)
, m_joiner(m_threads)
, m_f(f)
{
unsi&ned const thread_count=std::thread::hardware_concurrency(); // should I subtract one?
try {
for(unsi&ned i=0; i<thread_count; i) {
m_threads.push_back( std::thread(amp;thread_pool::worker_thread, this));
}
} catch(...) {
m_done=true;
throw;
}
}
~thread_pool() {
m_done=true;
}
/**
* @brief chan&es the shared data member,
* resets the num_comps_left variable,
* resets the accumulator thin& to 0, and
* resets the promise object
*/
std::future<func_output_t&&t; start_work(func_input_t new_param) {
std::unique_lock<std::shared_mutex&&t; lk(m_mut);
m_param = new_param;
m_num_comps_done = 0;
m_workin&_output = 0.0;
m_out = std::promise<func_output_t&&t;();
m_has_an_input = true; // only really matters just after initialization
return m_out.&et_future();
}
};
double slowSum(std::vector<double&&t; nums) {
// std::this_thread::sleep_for(std::chrono::milliseconds(200));
return std::accumulate(nums.be&in(), nums.end(), 0.0);
}
int main(){
// construct
thread_pool<std::vector<double&&t;, std::function<double(std::vector<double&&t;)&&t;&&t;
le_pool(slowSum, 1000);
// add work
auto ans = le_pool.start_work(std::vector<double&&t;{1.2, 3.2, 4213.1});
std::cout << "final answer is: " << ans.&et() << "n";
std::cout << "it should be 4217.5n";
return 1;
}
Комментарии:
1. поэтому
std::thread
принимает его аргументы по значению и копирует их .2. Обычно worker_thread является статической функцией и принимает, а затем использует аргумент типа thread_pool, учитывая способ, которым вы используете его с std::thread?
3. @M&etz Я не слежу
Ответ №1:
Вы проверяете количество «выполненных», затем получаете блокировку. Это позволяет нескольким потокам ожидать блокировки. В частности, может не быть потока, который входит во второе if
тело.
С другой стороны, поскольку у вас все потоки запущены постоянно, «последний» поток может не получить доступ к своему эксклюзивному разделу раньше (до запуска достаточного количества потоков) или даже позже (потому что дополнительные потоки ожидают мьютекса в первом цикле).
Чтобы устранить первую проблему, поскольку во втором if
блоке весь тот же код, что и в первом if
блоке, у вас может быть только один блок, который проверяет количество, чтобы увидеть, достигли ли вы конца, и должен установить значение out .
Вторая проблема требует, чтобы вы проверили m_num_comps_done
второй раз после получения мьютекса.
Комментарии:
1. спасибо, теперь это выглядит намного лучше. В итоге я использовал две блокировки — одну для входных параметров функции, а другую, не используемую совместно, согласно вашему совету, для увеличения рабочего количества и добавления чисел с плавающей запятой к среднему значению.