#multithreading #c 11
#многопоточность #c 11
Вопрос:
Я пытаюсь научить себя потоковой обработке C 11, и я хотел бы запустить фоновый поток производителя в начале приложения и запустить его до выхода из приложения. Я также хотел бы иметь поток-потребитель (который также выполняется в течение всего срока службы приложения).
Реальным примером может служить поток-производитель, прослушивающий Com-порт для входящих данных GPS. После накопления полного сообщения его можно проанализировать, чтобы определить, представляет ли оно интерес, затем преобразовать в строку (скажем) и «доставить обратно» для использования (например, обновить текущее местоположение).).
Моя проблема в том, что я не смог понять, как это сделать, не блокируя остальную часть приложения, когда я ‘join ()’ в потоке потребителя.
Вот мой очень упрощенный пример, который, надеюсь, показывает мои проблемы:
#include <QCoreApplication>
#include <QDebug>
#include <thread>
#include <atomic>
#include <iostream>
#include <queue>
#include <mutex>
#include <chrono>
#include "threadsafequeuetwo.h"
ThreadSafeQueueTwo<int> goods;
std::mutex mainMutex;
std::atomic<bool> isApplicationRunning = false;
void theProducer ()
{
std::atomic<int> itr = 0;
while(isApplicationRunning)
{
// Simulate this taking some time...
std::this_thread::sleep_for(std::chrono::milliseconds(60));
// Push the "produced" value onto the queue...
goods.push( itr);
// Diagnostic printout only...
if ((itr % 10) == 0)
{
std::unique_lock<std::mutex> lock(mainMutex);
std::cout << "PUSH " << itr << " on thread ID: "
<< std::this_thread::get_id() << std::endl;
}
// Thread ending logic.
if (itr > 100) isApplicationRunning = false;
}
}
void theConsumer ()
{
while(isApplicationRunning || !goods.empty())
{
int val;
// Wait on new values, and 'pop' when available...
goods.waitAndPop(val);
// Here, we would 'do something' with the new values...
// Simulate this taking some time...
std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Diagnostic printout only...
if ((val % 10) == 0)
{
std::unique_lock<std::mutex> lock(mainMutex);
std::cout << "POP " << val << " on thread ID: "
<< std::this_thread::get_id() << std::endl;
}
}
}
int main(int argc, char *argv[])
{
std::cout << "MAIN running on thread ID: "
<< std::this_thread::get_id() << std::endl;
// This varaiable gets set to true at startup, and,
// would only get set to false when the application
// wants to exit.
isApplicationRunning = true;
std::thread producerThread (theProducer);
std::thread consumerThread (theConsumer);
producerThread.detach();
consumerThread.join(); // BLOCKS!!! - how to get around this???
std::cout << "MAIN ending on thread ID: "
<< std::this_thread::get_id() << std::endl;
}
Класс ThreadSafeQueueTwo — это потокобезопасная реализация очереди, взятая почти в точности из книги «Параллелизм C в действии». Кажется, это работает просто отлично. Вот это, если кому-то интересно:
#ifndef THREADSAFEQUEUETWO_H
#define THREADSAFEQUEUETWO_H
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class ThreadSafeQueueTwo
{
public:
ThreadSafeQueueTwo()
{}
ThreadSafeQueueTwo(ThreadSafeQueueTwo constamp; rhs)
{
std::lock_guard<std::mutex> lock(myMutex);
myQueue = rhs.myQueue;
}
void push(T newValue)
{
std::lock_guard<std::mutex> lock(myMutex);
myQueue.push(newValue);
myCondVar.notify_one();
}
void waitAndPop(Tamp; value)
{
std::unique_lock<std::mutex> lock(myMutex);
myCondVar.wait(lock, [this]{return !myQueue.empty(); });
value = myQueue.front();
myQueue.pop();
}
std::shared_ptr<T> waitAndPop()
{
std::unique_lock<std::mutex> lock(myMutex);
myCondVar.wait(lock, [this]{return !myQueue.empty(); });
std::shared_ptr<T> sharedPtrToT (std::make_shared<T>(myQueue.front()));
myQueue.pop();
return sharedPtrToT;
}
bool tryPop(Tamp; value)
{
std::lock_guard<std::mutex> lock(myMutex);
if (myQueue.empty())
return false;
value = myQueue.front();
myQueue.pop();
return true;
}
std::shared_ptr<T> tryPop()
{
std::lock_guard<std::mutex> lock(myMutex);
if (myQueue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> sharedPtrToT (std::make_shared<T>(myQueue.front()));
myQueue.pop();
return sharedPtrToT;
}
bool empty()
{
std::lock_guard<std::mutex> lock(myMutex);
return myQueue.empty();
}
private:
mutable std::mutex myMutex;
std::queue<T> myQueue;
std::condition_variable myCondVar;
};
#endif // THREADSAFEQUEUETWO_H
Я знаю, что в моем примере есть очевидные проблемы, но мой главный вопрос заключается в том, как я могу запустить что-то подобное в фоновом режиме, не блокируя основной поток?
Возможно, еще лучший способ попытаться решить эту проблему: есть ли способ, чтобы каждый раз, когда производитель «создавал» какие-то новые данные, я мог просто вызвать метод в основном потоке, передавая новые данные? Это было бы похоже на очередь сигналов / слотов в Qt, которой я большой поклонник.
Комментарии:
1. Ну, что вы хотите, чтобы «остальная часть приложения» выполняла во время запуска производителя и потребителя? Просто сделайте это в
main
, перед вамиjoin
.2. @Igor Tandetnik — Ааааааа, НАЖМИТЕ — теперь я понял (я думаю). Итак, если бы я хотел, чтобы это выполнялось в течение всего времени работы приложения, я мог бы просто иметь некоторую логику, чтобы обнаружить, что приложение закрывается, а затем вызвать ‘join ()’ там… Правильно? Мои супер простые тесты с небольшим графическим интерфейсом, казалось, показали, что именно так это и работает!
3. @Igor Tandetnik — Забыл сказать спасибо! Я прошу прощения за вопрос новичка. Я был убежден, что вам нужно было вызвать join, чтобы запустить поток по какой-то причине. Вздох.