#c #multithreading
#c #многопоточность
Вопрос:
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <iostream>
#include <deque>
#include <functional>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <atomic>
#include <vector>
//thread pool
class ThreadPool
{
public:
ThreadPool(unsigned int n = std::thread::hardware_concurrency())
: busy()
, processed()
, stop()
{
for (unsigned int i=0; i<n; i)
workers.emplace_back(std::bind(amp;ThreadPool::thread_proc, this));
}
template<class F> void enqueue(Famp;amp; f)
{
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace_back(std::forward<F>(f));
cv_task.notify_one();
}
void waitFinished()
{
std::unique_lock<std::mutex> lock(queue_mutex);
cv_finished.wait(lock, [this](){ return tasks.empty() amp;amp; (busy == 0); });
}
~ThreadPool()
{
// set stop-condition
std::unique_lock<std::mutex> latch(queue_mutex);
stop = true;
cv_task.notify_all();
latch.unlock();
// all threads terminate, then we're done.
for (autoamp; t : workers)
t.join();
}
unsigned int getProcessed() const { return processed; }
private:
std::vector< std::thread > workers;
std::deque< std::function<void()> > tasks;
std::mutex queue_mutex;
std::condition_variable cv_task;
std::condition_variable cv_finished;
unsigned int busy;
std::atomic_uint processed;
bool stop;
void thread_proc()
{
while (true)
{
std::unique_lock<std::mutex> latch(queue_mutex);
cv_task.wait(latch, [this](){ return stop || !tasks.empty(); });
if (!tasks.empty())
{
// got work. set busy.
busy;
// pull from queue
auto fn = tasks.front();
tasks.pop_front();
// release lock. run async
latch.unlock();
// run function outside context
fn();
processed;
latch.lock();
--busy;
cv_finished.notify_one();
}
else if (stop)
{
break;
}
}
}
};
#endif // THREADPOOL_H
У меня есть вышеупомянутая реализация пула потоков с использованием защелки. Однако каждый раз, когда я добавляю задачу с помощью вызова enqueue, накладные расходы довольно велики, это занимает около 100 микросекунд.
Как я могу улучшить производительность пула потоков?
Комментарии:
1. 1) сборка с включенной оптимизацией.
2. Извините за беспокойство, но не могли бы вы любезно указать, каковы возможные флаги оптимизации?
3. Для gcc или clang:
-O2
или-O3
. Для cl.exe:/O2
. Смотрите также gcc.gnu.org/onlinedocs/gcc/Optimize-Options.html
Ответ №1:
Ваш код выглядит нормально. Комментарии выше в вашем вопросе о компиляции с включенной оптимизацией выпуска, вероятно, верны, и это все, что вам нужно сделать.
Отказ от ответственности: Всегда сначала измеряйте код с помощью соответствующих инструментов, чтобы определить, где находятся узкие места, прежде чем пытаться улучшить его производительность. В противном случае вы можете не получить желаемых улучшений.
Но я вижу пару потенциальных микрооптимизаций, которые заключаются в следующем.
Измените это в своей thread_proc
функции
while (true)
{
std::unique_lock<std::mutex> latch(queue_mutex);
cv_task.wait(latch, [this](){ return stop || !tasks.empty(); });
if (!tasks.empty())
К этому:
std::unique_lock<std::mutex> latch(queue_mutex);
while (!stop)
{
cv_task.wait(latch, [this](){ return stop || !tasks.empty(); });
while (!tasks.empty() amp;amp; !stop)
А затем удалите else if (stop)
блок и конец функции.
Основное влияние этого заключается в том, что это позволяет избежать дополнительных «разблокировок» и «блокировок» queue_mutex
в результате latch
выхода за пределы области видимости на каждой итерации while
цикла. Изменение if (!tasks.empty())
на while (!tasks.empty())
также может сэкономить один или два цикла, позволяя текущему выполняемому потоку, у которого есть квант, сохранить блокировку и попытаться отменить следующий рабочий элемент.
<мнение> И последнее. Я всегда придерживаюсь мнения, что notify
должно быть вне блокировки. Таким образом, не возникает конфликта блокировок, когда другой поток запускается потоком, который только что обновил очередь. Но я никогда на самом деле не измерял это предположение, так что отнеситесь к нему с долей скептицизма:
template<class F> void enqueue(Famp;amp; f)
{
queue_mutex.lock();
tasks.emplace_back(std::forward<F>(f));
queue_mutex.unlock();
cv_task.notify_one();
}