Сопрограммы, std::jthread, MSVC и сбой

#c #multithreading #c 20 #condition-variable #c -coroutine

Вопрос:

Мой код, похоже, работает без каких-либо проблем с GCC 10.2 в Archlinux. Я использовал valgrind на нескольких итерациях и не обнаружил никаких проблем с памятью.

Я создал несколько помощников по мьютексам и переменным условий, чтобы упростить свой код. Вот мой код :

 #include <atomic>
#include <condition_variable>
#include <coroutine>
#include <deque>
#include <iostream>
#include <optional>
#include <shared_mutex>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>

#define FWD(x) std::forward<decltype(x)>(x)

template <typename... Ts> void display(Ts... ts) {
  static std::mutex m;
  m.lock();
  std::cout << "this_thread: " << std::this_thread::get_id() << ": ";
  ((std::cout << ts << " "), ...);
  std::cout << std::endl;
  m.unlock();
}

template <typename T, typename Lock> struct LockedValue {
  LockedValue(T amp;value, Lock amp;amp;lock)
      : m_value{value}, m_lock{std::move(lock)} {}
  T amp;operator*() { return m_value; }
  T *operator->() { return amp;m_value; }

  void lock() { m_lock.lock(); }
  void unlock() { m_lock.unlock(); }

private:
  T amp;m_value;
  Lock m_lock;
};

template <typename T> class Mutex {
public:
  template <typename... Args> Mutex(Args amp;amp;...args) : m_value{FWD(args)...} {}

  auto lock() { return LockedValue{m_value, std::unique_lock{m_mutex}}; }
  auto lock() const { return LockedValue{m_value, std::shared_lock{m_mutex}}; }

private:
  friend class ConditionVariable;
  T m_value;
  mutable std::shared_mutex m_mutex;
};

class ConditionVariable {
public:
  void notifyOne() { m_cond.notify_one(); }
  void notifyAll() { m_cond.notify_all(); }

  template <typename F, typename Args>
  void wait(F f, const Mutex<Args> amp;mutexes) {
    auto lock = mutexes.lock();
    m_cond.wait(lock, [amp;] { return f(*lock); });
  }

  template <typename F, typename Args>
  void wait(F f, std::stop_token st, const Mutex<Args> amp;mutexes) {
    auto lock = mutexes.lock();
    m_cond.wait(lock, st, [amp;] { return f(*lock); });
  }

private:
  std::condition_variable_any m_cond;
};

struct Awaiter {
public:
  Awaiter() {}

  template <typename... Args>
  Awaiter(std::coroutine_handle<Args...> handle) : m_handle{handle} {}

  Awaiter(const Awaiter amp;) = delete;
  Awaiter(Awaiter amp;amp;a) : m_handle{a.m_handle} { a.m_handle = nullptr; }

  void resume() { m_handle(); }

private:
  std::coroutine_handle<> m_handle = nullptr;
};

class Thread {
  struct Awaitable {
    Thread amp;thread;
    bool await_ready() { return false; }

    void await_suspend(std::coroutine_handle<> handle) {
      thread.addAwaiter({handle});
    }

    void await_resume() {}
  };

public:
  Thread(std::string name) : m_name{std::move(name)} {
    m_thread = std::jthread([this](std::stop_token st) { run(st); });
  }

  void addAwaiter(Awaiter amp;amp;awaiter) {
    m_awaiters.lock()->push_back(std::move(awaiter));
    m_conditionVariable.notifyOne();
  }

  auto id() { return m_thread.get_id(); }

  Awaitable operator co_await() { return {*this}; }

private:
  void run(std::stop_token st) {
    while (!st.stop_requested()) {
      m_conditionVariable.wait(amp;Thread::hasAwaiters, st, m_awaiters);

      std::optional<Awaiter> awaiter;
      {
        auto awaiters = m_awaiters.lock();
        if (!awaiters->empty()) {
          awaiter.emplace(std::move(awaiters->front()));
          awaiters->pop_front();
        }
      }

      if (awaiter)
        awaiter->resume();
    }
  }

  static bool hasAwaiters(const std::deque<Awaiter> amp;awaiters) {
    return !awaiters.empty();
  }

private:
  std::string m_name;
  Mutex<std::deque<Awaiter>> m_awaiters;

  ConditionVariable m_conditionVariable;
  std::jthread m_thread;
};

struct task {
  struct promise_type {
    task get_return_object() { return {}; }
    std::suspend_never initial_suspend() noexcept { return {}; }
    std::suspend_never final_suspend() noexcept { return {}; }
    void return_void() {}
    void unhandled_exception() noexcept {}

    ~promise_type() {}
  };

  ~task() {}
};

std::atomic_int x = 0;

std::atomic_int done = 0;

task f(Thread amp;thread1, Thread amp;thread2) {
  co_await thread1;
    x;

  co_await thread2;
    x;
    done;
}

using namespace std::chrono_literals;

static auto N_ITER = 1;
static auto loop = 10'000'000;
int main() {
  std::ios_base::sync_with_stdio(false);
  display("Single task");
  for (int i = 0; i < loop;   i) {
    Thread thread1{"A"};
    Thread thread2{"B"};
    x = 0;
    done = 0;

    for (int j = 0; j < N_ITER;   j)
      f(thread1, thread2);

    int j = 0;
    while (done < N_ITER)
      ;

    if (x != N_ITER * 2) {
      std::cout << "error at ith" << i << std::endl;
    }
    if (i % 1'000'000 == 0)
      std::cout << i << std::endl;
  }

  N_ITER *= 500;
  display("Middle of task");
  for (int i = 0; i < loop;   i) {
    Thread thread1{"A"};
    Thread thread2{"B"};
    x = 0;
    done = 0;

    for (int j = 0; j < N_ITER;   j)
      f(thread1, thread2);

    int j = 0;
    while (done < N_ITER)
      ;

    if (x != N_ITER * 2) {
      std::cout << "error at ith" << i << std::endl;
    }
    if (i % 1'000'000 == 0)
      std::cout << i << std::endl;
  }

  // Lot of task
  std::cout << "Now lot of task" << std::endl;
  loop /= 1000;
  N_ITER *= 1000;
  for (int i = 0; i < loop;   i) {
    Thread thread1{"A"};
    Thread thread2{"B"};
    x = 0;
    done = 0;

    for (int j = 0; j < N_ITER;   j)
      f(thread1, thread2);

    int j = 0;
    while (done < N_ITER)
      ;

    if (x != N_ITER * 2) {
      std::cout << "error at ith" << i << std::endl;
    }
    if (i % 1000 == 0)
      std::cout << i << std::endl;
  }

  return 0;
}
 

Вот идея кода :
Я создаю 2 потока, запускаю сопрограмму, выполняю первую часть внутри первого потока, выполняю вторую часть во втором потоке.

В основном потоке я жду конца, удаляю потоки и повторяю это снова.

Как я уже сказал, в linux gcc у меня нет никаких проблем, однако в MSVC я сталкиваюсь с некоторыми странными проблемами. Когда я запускаю тест всю ночь, у меня нет никакого сбоя. Однако иногда, когда я что-то делаю на своем компьютере, например, читаю документацию по microsoft edge, происходит сбой «тестового кода», а иногда это связано с «уничтожением мьютекса во время работы».

Поэтому я не знаю, является ли это ошибкой в моем коде или это ошибка в другом месте (возможно, MSVC). Естественно, я думаю, что проблема в моем коде, но на самом деле я не вижу, где я допустил ошибку…