#c
Вопрос:
Ниже приведен код Python, создающий пул процессов, выделяющий по одному процессу на файл с максимальным количеством процессоров.
Я бы хотел перевести это на C (но мы используем потоки, а не процессы).
Однако, из того, что я вижу std::async
, не позволяет ограничить количество процессоров.
Как std::async
узнать, чтобы не перегружать систему? Нет смысла создавать 20 потоков на 2-ядерном процессоре.
Я бы действительно предпочел не писать свой собственный пул потоков для чего-то, что должно быть очень распространенным.
Каков самый простой способ достичь этого?
pool = multiprocessing.Pool(processes=max_cpus)
for file in file_list:
pool.apply_async(my_func, args=(file,),)
pool.close()
pool.join()
Комментарии:
1. std::async не знает, что для этого вам понадобится пул потоков. Однако создание такого количества потоков, как у процессора, не означает, что у вас будет весь процессор для себя. Через минуту я опубликую демонстрационный пул потоков.
2.
std::async
не обязательно (обязательно) слепо создавать столько потоков, сколько задач вы ставите в очередь. На практике разработчики стандартной библиотеки будут иметь пул потоков, но размер этого пула потоков, очевидно, является детализацией реализации, вероятно, основанной на спецификациях оборудования и/или эвристике.3. @CoryKramer Я понял, что это верно для Windows/MSVC (он использует пул потоков), но не обязательно для других систем/компиляторов, и что они будут просто использовать новый поток std::для каждого вызова
4. Вы можете использовать
std::thread::hardware_concurrency()
его для определения количества процессоров.5. boost.org/doc/libs/1_77_0/doc/html/boost_asio/reference/…
Ответ №1:
Пример пула потоков в c , вы также можете использовать, скажем, один из boost (который, вероятно, лучше протестирован).
#include <condition_variable>
#include <exception>
#include <mutex>
#include <future>
#include <thread>
#include <vector>
#include <queue>
namespace details
{
class task_itf
{
public:
virtual void execute() = 0;
};
template<typename retval_t>
class task final :
public task_itf
{
public:
template<typename lambda_t>
explicit task(lambda_tamp;amp; lambda) :
m_task(lambda)
{
}
std::future<retval_t> get_future()
{
return m_task.get_future();
}
virtual void execute() override
{
m_task();
}
private:
std::packaged_task<retval_t()> m_task;
};
class stop_exception :
public std::exception
{
};
}
class thread_pool
{
public:
explicit thread_pool(const size_t size) :
m_is_running{ true }
{
std::condition_variable signal_started;
std::atomic<size_t> number_of_threads_started{ 0u };
for (auto n = 0; n < size; n)
{
m_threads.push_back(std::move(std::thread([amp;]()
{
{
number_of_threads_started ;
signal_started.notify_all();
}
thread_loop();
})));
}
// wait for all threads to have started.
std::mutex mtx;
std::unique_lock<std::mutex> lock{ mtx };
signal_started.wait(lock, [amp;] { return number_of_threads_started == size; });
}
~thread_pool()
{
m_is_running = false;
m_wakeup.notify_all();
for (autoamp; thread : m_threads)
{
thread.join();
}
}
template<typename lambda_t>
auto async(lambda_tamp;amp; lambda)
{
using retval_t = decltype(lambda());
auto task = std::make_shared<details::task<retval_t>>(lambda);
queue_task(task);
return task->get_future();
}
template<typename lambda_t>
auto sync(lambda_tamp;amp; lambda)
{
auto ft = async(lambda);
return ft.get();
}
private:
void queue_task(const std::shared_ptr<details::task_itf>amp; task_ptr)
{
std::unique_lock<std::mutex> lock(m_queue_mutex);
m_queue.push(task_ptr);
m_wakeup.notify_one();
}
std::shared_ptr<details::task_itf> get_next_task()
{
std::unique_lock<std::mutex> lock(m_queue_mutex);
m_wakeup.wait(lock);
if (!m_is_running)
{
throw details::stop_exception();
}
auto task = m_queue.front();
m_queue.pop();
return task;
}
void thread_loop()
{
try
{
while (auto task = get_next_task())
{
task->execute();
}
}
catch (const details::stop_exceptionamp;)
{
}
}
std::vector<std::thread> m_threads;
std::mutex m_queue_mutex;
std::queue<std::shared_ptr<details::task_itf>> m_queue;
std::condition_variable m_wakeup;
bool m_is_running;
};
int main()
{
thread_pool pool(4);
auto ft1 = pool.async([] {return 1; });
auto ft2 = pool.async([] {return 2; });
auto ft3 = pool.async([] {return 3; });
auto ft4 = pool.async([] {return 4; });
// pool is will still be waiting
//synchronize with results
auto r1 = ft1.get();
auto r2 = ft2.get();
auto r3 = ft3.get();
auto r4 = ft4.get();
return 0;
}
Ответ №2:
std::thread::hardware_concurrency()
сообщит вам, сколько потоков процессора должно быть активировано одновременно или сколько потоков вы должны сохранить в пуле потоков (будь то boost или если вы запускаете свой собственный).
Вот как может выглядеть этот подход:
- позвоните
std::thread::hardware_concurrency()
, чтобы узнать количество ядер - пул потоков инициализации (существующий impl или ваш собственный) с результатом
- оберните необходимые вычисления в a
std::packaged_task
и отправьте задачи в очередь заданий пула потоков