#c #multithreading #concurrency
#c #многопоточность #параллелизм
Вопрос:
Рассмотрим следующую программу, которая реализует одновременную очередь с ОДНИМ потребителем и НЕСКОЛЬКИМИ производителями.
Работает нормально в контексте 1 потребителя, 1 производителя.
Однако установка второго потребителя (без комментариев к строкам ниже) приводит к утечкам памяти, и я не могу понять, почему…
Использование T=std::shared_ptr очереди и изменение pop для возврата shared_ptr устраняет утечку памяти, так чего же мне не хватает в коде, как показано ниже?
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
template<typename T>
class Queue {
private:
static constexpr unsigned mSize = 256; //power of two only
static constexpr unsigned mRoundRobinMask = mSize - 1;
static const T mEmpty;
T mData[mSize];
std::mutex mtx;
unsigned mReadP = 0;
unsigned mWriteP = 0;
public:
const T pop() {
if (!peek()) {
return mEmpty; // copy
}
std::lock_guard<std::mutex> lock(mtx);
Tamp; ret = mData[mReadP amp; mRoundRobinMask]; // get a ref
mReadP ;
return ret; // copy of ref
}
void push(const Tamp; aItemRef) {
start:
if (!wait()) {
throw std::runtime_error("!Queue FULL!");
}
std::lock_guard<std::mutex> lock(mtx);
if(size() == mSize) {
goto start;
}
mData[mWriteP amp; mRoundRobinMask] = aItemRef;
mWriteP ;
}
bool peek() const {
return mWriteP != mReadP;
}
unsigned size() const {
return mWriteP > mReadP ? mWriteP - mReadP : mReadP - mWriteP; // mod (Read-Write)
}
bool wait() {
unsigned it = 0;
while (size() == mSize) {
if (it > 1000000) { return false; }
}
return true;
}
};
template<typename T>
const T Queue<T>::mEmpty = T{ };
int main(int, char**) {
using Method = std::function<void()>;
Queue<Method*> queue;
std::thread consumer([ amp; ] {
while (true) {
if (queue.peek()) {
auto task = queue.pop();
(*task)();
delete task;
}
}
});
std::thread producer1([ amp; ] {
unsigned index = 0;
while (true) {
auto id = index ;
auto task = new Method([ = ] {
std::cout << "Running task " << id << std::endl;
});
queue.push(task);
}
});
// std::thread producer2 ([ amp; ] {
// unsigned index = 0;
// while (true) {
// auto id = index ;
// auto task = new Method([ = ] {
// std::cout << "Running task " << id<< std::endl;
// });
// queue.push(task);
// }
// });
consumer.join();
producer1.join();
// producer2.join();
return 0;
}
Предложенное редактирование метода push @1201ProgramAlarm
void push(const Tamp; aItemRef) {
start:
if (!wait()) {
throw std::runtime_error("!Queue FULL!");
}
std::lock_guard<std::mutex> lock(mtx);
if(getCount() == mSize) {
goto start;
}
mData[mWriteP amp; mRoundRobinMask] = aItemRef;
mWriteP ;
}
Выполняет ли задание, больше никаких утечек, но это GOTO : ( 🙁 …. есть идеи о том, как избежать использования goto?
Комментарии:
1. Кто-то проголосовал против и проголосовал за закрытие, потому что ему нужны детали отладки. Вы отлаживали это? P.s. shared_ptr не является потокобезопасным, но это другая проблема
2. Запуск его с 1 потребителем, 1 производителем не имеет утечек памяти и работает нормально в течение нескольких часов, не ломается. Как мне отлаживать? Он прерывается только с 2 производителями, и когда у меня заканчивается оперативная память
Ответ №1:
В вашем примере есть ряд проблем.
Главное, что это не потокобезопасно: оба push()
и pop()
изменяют неатомные переменные-члены mReadP
и mWriteP
не защищены a mutex
.
Вторая, менее важная проблема заключается в том, что ожидание прибытия элементов для отправки или освобождения места для отправки обычно выполняется с помощью condition_variables , которые приостанавливают поток до тех пор, пока не будет достигнуто условие.
Пожалуйста, попробуйте приведенную ниже версию, поскольку я обновил ее с этими изменениями.
Я также добавил условие завершения, чтобы показать, как безопасно выводить все потоки, и замедлил все это, чтобы показать, что происходит.
#include <array>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <optional>
template<typename T>
class Queue {
private:
static constexpr unsigned mSize = 256; //power of two only
static constexpr unsigned mRoundRobinMask = mSize - 1;
std::array<T, mSize> mData;
std::mutex mtx;
unsigned mReadP = 0;
unsigned mWriteP = 0;
std::condition_variable notFull;
std::condition_variable notEmpty;
bool stopped = false;
public:
const std::optional<T> pop() {
// Always grab the mutex before accessing any shared members
std::unique_lock<std::mutex> lock(mtx);
// Wait until there is an item in the queue.
notEmpty.wait(lock, [amp;] {return stopped || mWriteP != mReadP; });
if(stopped)
return std::nullopt;
Tamp; ret = mData[mReadP amp; mRoundRobinMask]; // get a ref
mReadP ;
// Wake any threads waiting on full buffer
notFull.notify_one();
return ret; // copy of ref
}
void push(const Tamp; pItem) {
std::unique_lock<std::mutex> lock(mtx);
// Wait until there is space to put at least one item
notFull.wait(lock, [amp;] { return stopped || getCount() < mSize; });
if(stopped)
return;
mData[mWriteP amp; mRoundRobinMask] = pItem;
mWriteP ;
// Wake any threads waiting on empty buffer
notEmpty.notify_one();
}
unsigned getCount() const {
return mWriteP > mReadP ?
mWriteP - mReadP : mReadP - mWriteP; // mod (Read-Write)
}
void stop() {
// Signal the stop condition
stopped = true;
// Grabbing the lock before notifying is essential to make sure
// any worker threads waiting on the condition_variables.
std::unique_lock<std::mutex> lock(mtx);
// Wake all waiting threads
notFull.notify_all();
notEmpty.notify_all();
}
};
int main(int, char**) {
using Method = std::function<void()>;
Queue<Method> queue;
bool running = true;
std::thread consumer([ amp; ] {
while (running) {
auto task = queue.pop();
if(task) {
// If there was a task, execute it.
(*task)();
} else {
// No task means we are done.
return;
}
}
});
std::thread producer1([ amp; ] {
unsigned index = 0;
while (running) {
auto id = index ;
queue.push([ = ] {
std::cout << "Running task " << id << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});
}
});
std::this_thread::sleep_for(std::chrono::seconds(2));
// Use pre-c -20 mechanisms to signal the worker threads to stop their loops
running = false;
// If they're in the queue stop that too.
queue.stop();
consumer.join();
producer1.join();
return EXIT_SUCCESS;
}
Обратите внимание, что если вы можете использовать C 20
, вы должны это сделать, поскольку у него есть std::jthread, который имеет более элегантные механизмы, такие как автоматическое присоединение и condition_variable::wait()
завершение потоков через std::jthread::request_stop() .
Ответ №2:
Ваш push
не является потокобезопасным.
При вызове двумя потоками, когда в очереди доступен только один слот, оба потока могут проходить wait
, что приводит к возможности того, что один из потоков перезапишет существующий элемент в очереди. Этот перезаписанный элемент не будет освобожден, что приведет к утечке памяти.
Решением является проверка, заполнена ли очередь снова после получения блокировки. Если это так, вам нужно освободить блокировку и снова подождать, пока не станет доступен слот.
Кроме того, wait
функцию можно сделать более удобной, включив sleep(0)
вызов в цикл while. Это позволит снизить энергопотребление и использование ресурсов процессора во время ожидания.