C 11 неблокирующих, длительно работающих потоков-потребителей-производителей

#multithreading #c 11

#многопоточность #c 11

Вопрос:

Я пытаюсь научить себя потоковой обработке C 11, и я хотел бы запустить фоновый поток производителя в начале приложения и запустить его до выхода из приложения. Я также хотел бы иметь поток-потребитель (который также выполняется в течение всего срока службы приложения).

Реальным примером может служить поток-производитель, прослушивающий Com-порт для входящих данных GPS. После накопления полного сообщения его можно проанализировать, чтобы определить, представляет ли оно интерес, затем преобразовать в строку (скажем) и «доставить обратно» для использования (например, обновить текущее местоположение).).

Моя проблема в том, что я не смог понять, как это сделать, не блокируя остальную часть приложения, когда я ‘join ()’ в потоке потребителя.

Вот мой очень упрощенный пример, который, надеюсь, показывает мои проблемы:

 #include <QCoreApplication>
#include <QDebug>
#include <thread>
#include <atomic>
#include <iostream>
#include <queue>
#include <mutex>
#include <chrono>
#include "threadsafequeuetwo.h"

ThreadSafeQueueTwo<int> goods;
std::mutex mainMutex;

std::atomic<bool> isApplicationRunning = false;

void theProducer ()
{
    std::atomic<int> itr = 0;
    while(isApplicationRunning)
    {
        // Simulate this taking some time...
        std::this_thread::sleep_for(std::chrono::milliseconds(60));

        // Push the "produced" value onto the queue...
        goods.push(  itr);

        // Diagnostic printout only...
        if ((itr % 10) == 0)
        {
            std::unique_lock<std::mutex> lock(mainMutex);
            std::cout << "PUSH " << itr << " on thread ID: "
                  << std::this_thread::get_id() << std::endl;
        }

        // Thread ending logic.
        if (itr > 100) isApplicationRunning = false;
    }
}

void theConsumer ()
{
    while(isApplicationRunning || !goods.empty())
    {
        int val;

        // Wait on new values, and 'pop' when available...
        goods.waitAndPop(val);

        // Here, we would 'do something' with the new values...
        // Simulate this taking some time...
        std::this_thread::sleep_for(std::chrono::milliseconds(10));

        // Diagnostic printout only...
        if ((val % 10) == 0)
        {
            std::unique_lock<std::mutex> lock(mainMutex);
            std::cout << "POP " << val << " on thread ID: "
                  << std::this_thread::get_id() << std::endl;
        }
    }
}

int main(int argc, char *argv[])
{
    std::cout << "MAIN running on thread ID: "
              << std::this_thread::get_id() << std::endl;

    // This varaiable gets set to true at startup, and,
    // would only get set to false when the application
    // wants to exit.
    isApplicationRunning = true;

    std::thread producerThread (theProducer);
    std::thread consumerThread (theConsumer);

    producerThread.detach();
    consumerThread.join();      // BLOCKS!!! - how to get around this???

    std::cout << "MAIN ending on thread ID: "
              << std::this_thread::get_id() << std::endl;
}
  

Класс ThreadSafeQueueTwo — это потокобезопасная реализация очереди, взятая почти в точности из книги «Параллелизм C в действии». Кажется, это работает просто отлично. Вот это, если кому-то интересно:

 #ifndef THREADSAFEQUEUETWO_H
#define THREADSAFEQUEUETWO_H

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class ThreadSafeQueueTwo
{
public:
    ThreadSafeQueueTwo()
    {}

    ThreadSafeQueueTwo(ThreadSafeQueueTwo constamp; rhs)
    {
        std::lock_guard<std::mutex> lock(myMutex);
        myQueue = rhs.myQueue;
    }

    void push(T newValue)
    {
        std::lock_guard<std::mutex> lock(myMutex);
        myQueue.push(newValue);
        myCondVar.notify_one();
    }

    void waitAndPop(Tamp; value)
    {
        std::unique_lock<std::mutex> lock(myMutex);
        myCondVar.wait(lock, [this]{return !myQueue.empty(); });
        value = myQueue.front();
        myQueue.pop();
    }

    std::shared_ptr<T> waitAndPop()
    {
        std::unique_lock<std::mutex> lock(myMutex);
        myCondVar.wait(lock, [this]{return !myQueue.empty(); });
        std::shared_ptr<T> sharedPtrToT (std::make_shared<T>(myQueue.front()));
        myQueue.pop();
        return sharedPtrToT;
    }

    bool tryPop(Tamp; value)
    {
        std::lock_guard<std::mutex> lock(myMutex);
        if (myQueue.empty())
            return false;
        value = myQueue.front();
        myQueue.pop();
        return true;
    }

    std::shared_ptr<T> tryPop()
    {
        std::lock_guard<std::mutex> lock(myMutex);
        if (myQueue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> sharedPtrToT (std::make_shared<T>(myQueue.front()));
        myQueue.pop();
        return sharedPtrToT;
    }

    bool empty()
    {
        std::lock_guard<std::mutex> lock(myMutex);
        return myQueue.empty();
    }

private:
    mutable std::mutex      myMutex;
    std::queue<T>           myQueue;
    std::condition_variable myCondVar;
};

#endif // THREADSAFEQUEUETWO_H
  

Вот результат:

Я знаю, что в моем примере есть очевидные проблемы, но мой главный вопрос заключается в том, как я могу запустить что-то подобное в фоновом режиме, не блокируя основной поток?

Возможно, еще лучший способ попытаться решить эту проблему: есть ли способ, чтобы каждый раз, когда производитель «создавал» какие-то новые данные, я мог просто вызвать метод в основном потоке, передавая новые данные? Это было бы похоже на очередь сигналов / слотов в Qt, которой я большой поклонник.

Комментарии:

1. Ну, что вы хотите, чтобы «остальная часть приложения» выполняла во время запуска производителя и потребителя? Просто сделайте это в main , перед вами join .

2. @Igor Tandetnik — Ааааааа, НАЖМИТЕ — теперь я понял (я думаю). Итак, если бы я хотел, чтобы это выполнялось в течение всего времени работы приложения, я мог бы просто иметь некоторую логику, чтобы обнаружить, что приложение закрывается, а затем вызвать ‘join ()’ там… Правильно? Мои супер простые тесты с небольшим графическим интерфейсом, казалось, показали, что именно так это и работает!

3. @Igor Tandetnik — Забыл сказать спасибо! Я прошу прощения за вопрос новичка. Я был убежден, что вам нужно было вызвать join, чтобы запустить поток по какой-то причине. Вздох.