Утечка памяти параллельной очереди C

#c #multithreading #concurrency

#c #многопоточность #параллелизм

Вопрос:

Рассмотрим следующую программу, которая реализует одновременную очередь с ОДНИМ потребителем и НЕСКОЛЬКИМИ производителями.

Работает нормально в контексте 1 потребителя, 1 производителя.

Однако установка второго потребителя (без комментариев к строкам ниже) приводит к утечкам памяти, и я не могу понять, почему…

Использование T=std::shared_ptr очереди и изменение pop для возврата shared_ptr устраняет утечку памяти, так чего же мне не хватает в коде, как показано ниже?

 #include <functional>
#include <iostream>
#include <mutex>
#include <thread>

template<typename T>
class Queue {
private:
    static constexpr unsigned mSize = 256;  //power of two only
    static constexpr unsigned mRoundRobinMask = mSize - 1;

    static const T mEmpty;

    T mData[mSize];
    std::mutex mtx;
    unsigned mReadP = 0;
    unsigned mWriteP = 0;

public:
    const T pop() {    
        if (!peek()) {
            return mEmpty; // copy
        }

        std::lock_guard<std::mutex> lock(mtx);   

        Tamp; ret = mData[mReadP amp; mRoundRobinMask]; // get a ref

        mReadP  ;
        return ret; // copy of ref
    }

    void push(const Tamp; aItemRef) {
        start:
        if (!wait()) {
            throw std::runtime_error("!Queue FULL!");
        }

        std::lock_guard<std::mutex> lock(mtx);

        if(size() == mSize) {
            goto start;
        }

        mData[mWriteP amp; mRoundRobinMask] = aItemRef;
        mWriteP  ;
    }

    bool peek() const {
        return mWriteP != mReadP;
    }

    unsigned size() const {
        return mWriteP > mReadP ? mWriteP - mReadP : mReadP - mWriteP; // mod (Read-Write)
    }

    bool wait() {
        unsigned it = 0;
        while (size() == mSize) {
            if (it   > 1000000) { return false; }
        }

        return true;
    }
};

template<typename T>
const T Queue<T>::mEmpty = T{ };

int main(int, char**) {
    using Method = std::function<void()>;

    Queue<Method*> queue;

    std::thread consumer([ amp; ] {
        while (true) {
            if (queue.peek()) {
                auto task = queue.pop();
                (*task)();
                delete task;
            }
        }
    });

    std::thread producer1([ amp; ] {
        unsigned index = 0;
        while (true) {
            auto id = index  ;
            auto task = new Method([ = ] {
                std::cout << "Running task " << id << std::endl;
            });
            queue.push(task);
        }
    });

    // std::thread producer2 ([ amp; ] {
    //     unsigned index = 0;
    //     while (true) {
    //         auto id = index  ;
    //         auto task = new Method([ = ] {
    //             std::cout << "Running task " << id<< std::endl;
    //         });
    //         queue.push(task);
    //     }
    // });

    consumer.join();
    producer1.join();
    // producer2.join();
    return 0;
}
  

Предложенное редактирование метода push @1201ProgramAlarm

     void push(const Tamp; aItemRef) {
        start:
        if (!wait()) {
            throw std::runtime_error("!Queue FULL!");
        }

        std::lock_guard<std::mutex> lock(mtx);

        if(getCount() == mSize) {
            goto start;
        }

        mData[mWriteP amp; mRoundRobinMask] = aItemRef;
        mWriteP  ;
    }
  

Выполняет ли задание, больше никаких утечек, но это GOTO : ( 🙁 …. есть идеи о том, как избежать использования goto?

Комментарии:

1. Кто-то проголосовал против и проголосовал за закрытие, потому что ему нужны детали отладки. Вы отлаживали это? P.s. shared_ptr не является потокобезопасным, но это другая проблема

2. Запуск его с 1 потребителем, 1 производителем не имеет утечек памяти и работает нормально в течение нескольких часов, не ломается. Как мне отлаживать? Он прерывается только с 2 производителями, и когда у меня заканчивается оперативная память

Ответ №1:

В вашем примере есть ряд проблем.

Главное, что это не потокобезопасно: оба push() и pop() изменяют неатомные переменные-члены mReadP и mWriteP не защищены a mutex .

Вторая, менее важная проблема заключается в том, что ожидание прибытия элементов для отправки или освобождения места для отправки обычно выполняется с помощью condition_variables , которые приостанавливают поток до тех пор, пока не будет достигнуто условие.

Пожалуйста, попробуйте приведенную ниже версию, поскольку я обновил ее с этими изменениями.

Я также добавил условие завершения, чтобы показать, как безопасно выводить все потоки, и замедлил все это, чтобы показать, что происходит.

 #include <array>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <optional>

template<typename T>
class Queue {
private:
    static constexpr unsigned mSize = 256;  //power of two only
    static constexpr unsigned mRoundRobinMask = mSize - 1;

    std::array<T, mSize> mData;
    std::mutex mtx;
    unsigned mReadP = 0;
    unsigned mWriteP = 0;

    std::condition_variable notFull;
    std::condition_variable notEmpty;

    bool stopped = false;

public:
    const std::optional<T> pop() {
        // Always grab the mutex before accessing any shared members
        std::unique_lock<std::mutex> lock(mtx);

        // Wait until there is an item in the queue.
        notEmpty.wait(lock, [amp;] {return stopped || mWriteP != mReadP; });
        if(stopped)
            return std::nullopt;

        Tamp; ret = mData[mReadP amp; mRoundRobinMask]; // get a ref
        mReadP  ;

        // Wake any threads waiting on full buffer
        notFull.notify_one();
        return ret; // copy of ref
    }

    void push(const Tamp; pItem) {
        std::unique_lock<std::mutex> lock(mtx);

        // Wait until there is space to put at least one item
        notFull.wait(lock, [amp;] { return stopped || getCount() < mSize; });
        if(stopped)
            return;

        mData[mWriteP amp; mRoundRobinMask] = pItem;
        mWriteP  ;

        // Wake any threads waiting on empty buffer
        notEmpty.notify_one();
    }

    unsigned getCount() const {
        return mWriteP > mReadP ?
            mWriteP - mReadP : mReadP - mWriteP; // mod (Read-Write)
    }

    void stop() {
        // Signal the stop condition
        stopped = true;

        // Grabbing the lock before notifying is essential to make sure
        // any worker threads waiting on the condition_variables.
        std::unique_lock<std::mutex> lock(mtx);

        // Wake all waiting threads
        notFull.notify_all();
        notEmpty.notify_all();
    }
};

int main(int, char**) {
    using Method = std::function<void()>;

    Queue<Method> queue;
    bool running = true;

    std::thread consumer([ amp; ] {
        while (running) {
            auto task = queue.pop();
            if(task) {
                // If there was a task, execute it.
                (*task)();
            } else {
                // No task means we are done.
                return;
            }
        }
    });

    std::thread producer1([ amp; ] {
        unsigned index = 0;
        while (running) {
            auto id = index  ;
            queue.push([ = ] {
                std::cout << "Running task " << id << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            });
        }
    });

    std::this_thread::sleep_for(std::chrono::seconds(2));

    // Use pre-c  -20 mechanisms to signal the worker threads to stop their loops
    running = false;
    // If they're in the queue stop that too.
    queue.stop();

    consumer.join();
    producer1.join();
    return EXIT_SUCCESS;
}
  

Обратите внимание, что если вы можете использовать C 20 , вы должны это сделать, поскольку у него есть std::jthread, который имеет более элегантные механизмы, такие как автоматическое присоединение и condition_variable::wait() завершение потоков через std::jthread::request_stop() .

Ответ №2:

Ваш push не является потокобезопасным.

При вызове двумя потоками, когда в очереди доступен только один слот, оба потока могут проходить wait , что приводит к возможности того, что один из потоков перезапишет существующий элемент в очереди. Этот перезаписанный элемент не будет освобожден, что приведет к утечке памяти.

Решением является проверка, заполнена ли очередь снова после получения блокировки. Если это так, вам нужно освободить блокировку и снова подождать, пока не станет доступен слот.

Кроме того, wait функцию можно сделать более удобной, включив sleep(0) вызов в цикл while. Это позволит снизить энергопотребление и использование ресурсов процессора во время ожидания.