#c #multithreading #c 20 #condition-variable #c -coroutine
Вопрос:
Мой код, похоже, работает без каких-либо проблем с GCC 10.2 в Archlinux. Я использовал valgrind на нескольких итерациях и не обнаружил никаких проблем с памятью.
Я создал несколько помощников по мьютексам и переменным условий, чтобы упростить свой код. Вот мой код :
#include <atomic>
#include <condition_variable>
#include <coroutine>
#include <deque>
#include <iostream>
#include <optional>
#include <shared_mutex>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>
#define FWD(x) std::forward<decltype(x)>(x)
template <typename... Ts> void display(Ts... ts) {
static std::mutex m;
m.lock();
std::cout << "this_thread: " << std::this_thread::get_id() << ": ";
((std::cout << ts << " "), ...);
std::cout << std::endl;
m.unlock();
}
template <typename T, typename Lock> struct LockedValue {
LockedValue(T amp;value, Lock amp;amp;lock)
: m_value{value}, m_lock{std::move(lock)} {}
T amp;operator*() { return m_value; }
T *operator->() { return amp;m_value; }
void lock() { m_lock.lock(); }
void unlock() { m_lock.unlock(); }
private:
T amp;m_value;
Lock m_lock;
};
template <typename T> class Mutex {
public:
template <typename... Args> Mutex(Args amp;amp;...args) : m_value{FWD(args)...} {}
auto lock() { return LockedValue{m_value, std::unique_lock{m_mutex}}; }
auto lock() const { return LockedValue{m_value, std::shared_lock{m_mutex}}; }
private:
friend class ConditionVariable;
T m_value;
mutable std::shared_mutex m_mutex;
};
class ConditionVariable {
public:
void notifyOne() { m_cond.notify_one(); }
void notifyAll() { m_cond.notify_all(); }
template <typename F, typename Args>
void wait(F f, const Mutex<Args> amp;mutexes) {
auto lock = mutexes.lock();
m_cond.wait(lock, [amp;] { return f(*lock); });
}
template <typename F, typename Args>
void wait(F f, std::stop_token st, const Mutex<Args> amp;mutexes) {
auto lock = mutexes.lock();
m_cond.wait(lock, st, [amp;] { return f(*lock); });
}
private:
std::condition_variable_any m_cond;
};
struct Awaiter {
public:
Awaiter() {}
template <typename... Args>
Awaiter(std::coroutine_handle<Args...> handle) : m_handle{handle} {}
Awaiter(const Awaiter amp;) = delete;
Awaiter(Awaiter amp;amp;a) : m_handle{a.m_handle} { a.m_handle = nullptr; }
void resume() { m_handle(); }
private:
std::coroutine_handle<> m_handle = nullptr;
};
class Thread {
struct Awaitable {
Thread amp;thread;
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> handle) {
thread.addAwaiter({handle});
}
void await_resume() {}
};
public:
Thread(std::string name) : m_name{std::move(name)} {
m_thread = std::jthread([this](std::stop_token st) { run(st); });
}
void addAwaiter(Awaiter amp;amp;awaiter) {
m_awaiters.lock()->push_back(std::move(awaiter));
m_conditionVariable.notifyOne();
}
auto id() { return m_thread.get_id(); }
Awaitable operator co_await() { return {*this}; }
private:
void run(std::stop_token st) {
while (!st.stop_requested()) {
m_conditionVariable.wait(amp;Thread::hasAwaiters, st, m_awaiters);
std::optional<Awaiter> awaiter;
{
auto awaiters = m_awaiters.lock();
if (!awaiters->empty()) {
awaiter.emplace(std::move(awaiters->front()));
awaiters->pop_front();
}
}
if (awaiter)
awaiter->resume();
}
}
static bool hasAwaiters(const std::deque<Awaiter> amp;awaiters) {
return !awaiters.empty();
}
private:
std::string m_name;
Mutex<std::deque<Awaiter>> m_awaiters;
ConditionVariable m_conditionVariable;
std::jthread m_thread;
};
struct task {
struct promise_type {
task get_return_object() { return {}; }
std::suspend_never initial_suspend() noexcept { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() noexcept {}
~promise_type() {}
};
~task() {}
};
std::atomic_int x = 0;
std::atomic_int done = 0;
task f(Thread amp;thread1, Thread amp;thread2) {
co_await thread1;
x;
co_await thread2;
x;
done;
}
using namespace std::chrono_literals;
static auto N_ITER = 1;
static auto loop = 10'000'000;
int main() {
std::ios_base::sync_with_stdio(false);
display("Single task");
for (int i = 0; i < loop; i) {
Thread thread1{"A"};
Thread thread2{"B"};
x = 0;
done = 0;
for (int j = 0; j < N_ITER; j)
f(thread1, thread2);
int j = 0;
while (done < N_ITER)
;
if (x != N_ITER * 2) {
std::cout << "error at ith" << i << std::endl;
}
if (i % 1'000'000 == 0)
std::cout << i << std::endl;
}
N_ITER *= 500;
display("Middle of task");
for (int i = 0; i < loop; i) {
Thread thread1{"A"};
Thread thread2{"B"};
x = 0;
done = 0;
for (int j = 0; j < N_ITER; j)
f(thread1, thread2);
int j = 0;
while (done < N_ITER)
;
if (x != N_ITER * 2) {
std::cout << "error at ith" << i << std::endl;
}
if (i % 1'000'000 == 0)
std::cout << i << std::endl;
}
// Lot of task
std::cout << "Now lot of task" << std::endl;
loop /= 1000;
N_ITER *= 1000;
for (int i = 0; i < loop; i) {
Thread thread1{"A"};
Thread thread2{"B"};
x = 0;
done = 0;
for (int j = 0; j < N_ITER; j)
f(thread1, thread2);
int j = 0;
while (done < N_ITER)
;
if (x != N_ITER * 2) {
std::cout << "error at ith" << i << std::endl;
}
if (i % 1000 == 0)
std::cout << i << std::endl;
}
return 0;
}
Вот идея кода :
Я создаю 2 потока, запускаю сопрограмму, выполняю первую часть внутри первого потока, выполняю вторую часть во втором потоке.
В основном потоке я жду конца, удаляю потоки и повторяю это снова.
Как я уже сказал, в linux gcc у меня нет никаких проблем, однако в MSVC я сталкиваюсь с некоторыми странными проблемами. Когда я запускаю тест всю ночь, у меня нет никакого сбоя. Однако иногда, когда я что-то делаю на своем компьютере, например, читаю документацию по microsoft edge, происходит сбой «тестового кода», а иногда это связано с «уничтожением мьютекса во время работы».
Поэтому я не знаю, является ли это ошибкой в моем коде или это ошибка в другом месте (возможно, MSVC). Естественно, я думаю, что проблема в моем коде, но на самом деле я не вижу, где я допустил ошибку…