Порождающие потоки C в зависимости от доступных ядер процессора

#c

Вопрос:

Ниже приведен код Python, создающий пул процессов, выделяющий по одному процессу на файл с максимальным количеством процессоров.

Я бы хотел перевести это на C (но мы используем потоки, а не процессы).

Однако, из того, что я вижу std::async , не позволяет ограничить количество процессоров.

Как std::async узнать, чтобы не перегружать систему? Нет смысла создавать 20 потоков на 2-ядерном процессоре.

Я бы действительно предпочел не писать свой собственный пул потоков для чего-то, что должно быть очень распространенным.

Каков самый простой способ достичь этого?

 pool = multiprocessing.Pool(processes=max_cpus)

for file in file_list:
    pool.apply_async(my_func, args=(file,),)

pool.close()
pool.join()
 

Комментарии:

1. std::async не знает, что для этого вам понадобится пул потоков. Однако создание такого количества потоков, как у процессора, не означает, что у вас будет весь процессор для себя. Через минуту я опубликую демонстрационный пул потоков.

2. std::async не обязательно (обязательно) слепо создавать столько потоков, сколько задач вы ставите в очередь. На практике разработчики стандартной библиотеки будут иметь пул потоков, но размер этого пула потоков, очевидно, является детализацией реализации, вероятно, основанной на спецификациях оборудования и/или эвристике.

3. @CoryKramer Я понял, что это верно для Windows/MSVC (он использует пул потоков), но не обязательно для других систем/компиляторов, и что они будут просто использовать новый поток std::для каждого вызова

4. Вы можете использовать std::thread::hardware_concurrency() его для определения количества процессоров.

5. boost.org/doc/libs/1_77_0/doc/html/boost_asio/reference/…

Ответ №1:

Пример пула потоков в c , вы также можете использовать, скажем, один из boost (который, вероятно, лучше протестирован).

 #include <condition_variable>
#include <exception>
#include <mutex>
#include <future>
#include <thread>
#include <vector>
#include <queue>

namespace details
{
class task_itf
{
public:
    virtual void execute() = 0;
};

template<typename retval_t>
class task final :
    public task_itf
{
public:
    template<typename lambda_t>
    explicit task(lambda_tamp;amp; lambda) :
        m_task(lambda)
    {
    }

    std::future<retval_t> get_future()
    {
        return m_task.get_future();
    }

    virtual void execute() override
    {
        m_task();
    }

private:
    std::packaged_task<retval_t()> m_task;
};

class stop_exception :
    public std::exception
{
};

}

class thread_pool
{
public:
    explicit thread_pool(const size_t size) :
        m_is_running{ true }
    {
        std::condition_variable signal_started;
        std::atomic<size_t> number_of_threads_started{ 0u };

        for (auto n = 0; n < size;   n)
        {
            m_threads.push_back(std::move(std::thread([amp;]()
            {
                {
                    number_of_threads_started  ;
                    signal_started.notify_all();
                }

                thread_loop();
            })));
        }

        // wait for all threads to have started.
        std::mutex mtx;
        std::unique_lock<std::mutex> lock{ mtx };
        signal_started.wait(lock, [amp;] { return number_of_threads_started == size; });
    }

    ~thread_pool()
    {
        m_is_running = false;
        m_wakeup.notify_all();

        for (autoamp; thread : m_threads)
        {
            thread.join();
        }
    }

    template<typename lambda_t>
    auto async(lambda_tamp;amp; lambda) 
    {
        using retval_t = decltype(lambda());
        auto task = std::make_shared<details::task<retval_t>>(lambda);
        queue_task(task);
        return task->get_future();
    }

    template<typename lambda_t>
    auto sync(lambda_tamp;amp; lambda) 
    {
        auto ft = async(lambda);
        return ft.get();
    }

private:
    void queue_task(const std::shared_ptr<details::task_itf>amp; task_ptr)
    {
        std::unique_lock<std::mutex> lock(m_queue_mutex);
        m_queue.push(task_ptr);
        m_wakeup.notify_one();
    }

    std::shared_ptr<details::task_itf> get_next_task()
    {
        std::unique_lock<std::mutex> lock(m_queue_mutex);
        m_wakeup.wait(lock);

        if (!m_is_running)
        {
            throw details::stop_exception();
        }

        auto task = m_queue.front();
        m_queue.pop();

        return task;
    }

    void thread_loop()
    {
        try
        {
            while (auto task = get_next_task())
            {
                task->execute();
            }
        }
        catch (const details::stop_exceptionamp;)
        {
        }
    }

    std::vector<std::thread> m_threads;
    std::mutex m_queue_mutex;
    std::queue<std::shared_ptr<details::task_itf>> m_queue;

    std::condition_variable m_wakeup;
    bool m_is_running;
};

int main()
{
    thread_pool pool(4);
    auto ft1 = pool.async([] {return 1; });
    auto ft2 = pool.async([] {return 2; });
    auto ft3 = pool.async([] {return 3; });
    auto ft4 = pool.async([] {return 4; });

    // pool is will still be waiting

    //synchronize with results
    auto r1 = ft1.get();
    auto r2 = ft2.get();
    auto r3 = ft3.get();
    auto r4 = ft4.get();

    return 0;
}
 

Ответ №2:

std::thread::hardware_concurrency() сообщит вам, сколько потоков процессора должно быть активировано одновременно или сколько потоков вы должны сохранить в пуле потоков (будь то boost или если вы запускаете свой собственный).

Вот как может выглядеть этот подход:

  • позвоните std::thread::hardware_concurrency() , чтобы узнать количество ядер
  • пул потоков инициализации (существующий impl или ваш собственный) с результатом
  • оберните необходимые вычисления в a std::packaged_task и отправьте задачи в очередь заданий пула потоков