записи в очереди пула потоков в C

#c #multithreading #c 11 #threadpool

#c #многопоточность #c 11 #пул потоков

Вопрос:

Я пытаюсь реализовать threadpool, и я обнаружил, что существует две реализации. Все они используют очередь для управления задачами, но записи в очереди разные. Я хочу понять, какая из реализаций лучше и рациональнее. Зачем нужна вторая версия function_wrapper .

В первой версии используется std::queue<std::packaged_task<void()> > tasks;

 class ThreadPool {
 public:
  explicit ThreadPool(size_t);
  template <class F, class... Args>
  decltype(auto) enqueue(Famp;amp; f, Argsamp;amp;... args);
  ~ThreadPool();

 private:
  // need to keep track of threads so we can join them
  std::vector<std::thread> workers;
  // the task queue
  std::queue<std::packaged_task<void()> > tasks;

  // synchronization
  std::mutex queue_mutex;
  std::condition_variable condition;
  bool stop;
};

// add new work item to the pool
template <class F, class... Args>
decltype(auto) ThreadPool::enqueue(Famp;amp; f, Argsamp;amp;... args) {
  using return_type = decltype(f(args...));

  std::packaged_task<return_type()> task(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...));

  std::future<return_type> res = task.get_future();
  {
    std::unique_lock<std::mutex> lock(queue_mutex);

    // don't allow enqueueing after stopping the pool
    if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");

    tasks.emplace(std::move(task));
  }
  condition.notify_one();
  return res;
}
 

Вторая версия использует thread_safe_queue<function_wrapper> work_queue;

 class function_wrapper {
  struct impl_base {
    virtual void call() = 0;
    virtual ~impl_base() {}
  };
  std::unique_ptr<impl_base> impl;
  template <typename F>
  struct impl_type : impl_base {
    F f;
    impl_type(Famp;amp; f_) : f(std::move(f_)) {}
    void call() { f(); }
  };

 public:
  template <typename F>
  function_wrapper(Famp;amp; f) : impl(new impl_type<F>(std::move(f))) {}
  void operator()() { impl->call(); }
  function_wrapper() = default;
  function_wrapper(function_wrapperamp;amp; other) : impl(std::move(other.impl)) {}
  function_wrapperamp; operator=(function_wrapperamp;amp; other) {
    impl = std::move(other.impl);
    return *this;
  }
  function_wrapper(const function_wrapperamp;) = delete;
  function_wrapper(function_wrapperamp;) = delete;
  function_wrapperamp; operator=(const function_wrapperamp;) = delete;
};

class thread_pool {
  thread_safe_queue<function_wrapper> work_queue;
  void worker_thread() {
    while (!done) {
      function_wrapper task;
      if (work_queue.try_pop(task)) {
        task();
      } else {
        std::this_thread::yield();
      }
    }
  }

 public:
  template <typename FunctionType>
  std::future<typename std::result_of<FunctionType()>::type> submit(
      FunctionType f) {
    typedef typename std::result_of<FunctionType()>::type result_type;
    std::packaged_task<result_type()> task(std::move(f));
    std::future<result_type> res(task.get_future());
    work_queue.push(std::move(task));
    return res;
  }
}
 

Комментарии:

1. Для обсуждения плюсов и минусов различных (допустимых) подходов вам может повезти больше в CodeReview stackexchange. Кроме того, ваши собственные варианты использования будут очень важны для того, какое «лучшее» решение для вас. Например, чем меньше синхронизации вам сойдет с рук, тем лучше. Или, возможно, вам нужно выполнить стирание типов, а может быть, и нет.

2. class function_wrapper в основном реализует std::function<void()> . Для этого нет причин, поэтому первая версия лучше (меньше кода = меньше ошибок).

3. @rustyx но почему std::queue<std::packaged_task<void()> > tasks; можно использовать функции любого типа

4. @ywen232625 потому std::packaged_task что уже завершает задачу, как std::function это делает. » Шаблон класса std::packaged_task переносит любую Callable цель (функцию, лямбда-выражение, выражение привязки или другой функциональный объект), чтобы его можно было вызывать асинхронно. «.