Модель потоков C для создания нескольких экземпляров потоков, порождающих классы

#c #multithreading #architecture #pthreads

Вопрос:

Абстрактный:

Я разрабатываю класс ( Inner ), который порождает 2 потока — производителя и потребителя. В одном использовании есть один экземпляр, а в другом контексте-несколько экземпляров.

В автономном режиме мне нужны два потока, чтобы продолжать писать/читать сообщения. Однако, если существует несколько экземпляров, мне нужен код для создания двух потоков, но продолжайте создавать следующий класс (чтобы создать больше потоков).

Моя проблема заключается в попытке объединить эти два сценария с правильным использованием std::thread::join() и std::thread::detach() .

Детали кода:

Класс Inner порождает поток для приема и очереди сообщений и второй поток для чтения очереди и отправки сообщений владельцу класса Inner .

 template<Owner>
class Inner
{
    Inner(Owneramp; owner) : _owner(owner)
    {
        // Spawn thread to receive packets and put on queue
        // Spawn thread to read from queue
    }

    void receiveMessage(const Messageamp; msg)
    {
        // Ommitted locks etc for simplicty
        _queue.push(msg);
    }

    void readFromQueue()
    {
        // Ommitted loop, locks etc for simplicty
        _owner.receiveMessage(msg);
    }

    Owneramp; _owner;
    std::queue<Message> _queue;
};
 

Существует два возможных класса владельцев, SingleInner :

 class OneInner
{
    OneInner()
    {
        _inner = std::make_unique<Inner>();
    }

    void receiveMessage(const Messageamp; msg){//Code ommitted}

    std::unique_ptr<Inner> _inner;
};
 

и второй контекст имеет несколько экземпляров:

 class MultipleInners
{
    MultipleInners()
    {
        // Need to create multiple instances of Inner, each with Inner's two threads running
    }

    void receiveMessage(const Messageamp; msg){//Code ommitted}

    std::vector<std::unique_ptr<Inner>> _inners;
};
 

Я не уверен, как я могу разрешить Inner создавать 2 потока, чтобы они продолжали работать, но в OneInner коде ждет, а в MultipleInners коде продолжает создавать следующий Inner .

Или если есть совершенно лучший способ достичь этого?

Ответ №1:

Ресурс, который вам нужен для каждого экземпляра класса, и время жизни которого совпадает с временем жизни соответствующего экземпляра, лучше всего представлять в виде переменной-члена.

Поэтому просто сделайте переменные-члены потоков Inner , которые запускаются при создании и объединяются при уничтожении:

 template<Owner>
class Inner
{
    Inner(Owneramp; owner) 
      : _owner(owner)
      , _recv_thread([this](){readFromQueue();}),
      , _read_thread([this](){receiveLoop();}),
    {
    }

    ~Inner() {
      _recv_thread.join();
      _read_thread.join();
    }

    void receiveLoop() {
      while(...) {
        //etc...
        receiveMessage(msg);
      }
    }

    void receiveMessage(const Messageamp; msg)
    {
        // Ommitted locks etc for simplicty
        _queue.push(msg);
    }

    void readFromQueue()
    {
        // Ommitted loop, locks etc for simplicty
        _owner.receiveMessage(msg);
    }


    Owneramp; _owner;
    std::queue<Message> _queue;
    
    // Make sure these are the last members, so that _owner and _queue 
    // are constructed already when the threads start
    std::thread _recv_thread;
    std::thread _read_thread;

};
 

Комментарии:

1. Это работает. К сожалению, я должен передать аргумент ReceiveMessage (), который является Google Protobuf, и я думаю, что они сделали его недоступным для копирования и т. Д., Поэтому у меня есть несколько дополнительных головных болей, которые нужно вылечить.

2. @user997112 Да, извините, я думаю receiveMessage() , что это не то, к чему нужно переходить _read_thread , вы, вероятно, хотите, чтобы какая-либо функция имела основной цикл для этого потока. Я исправлю ответ,

3. Выходит за рамки вопроса, но: Вы даже уверены, что вам нужен выделенный принимающий поток для каждого Inner ? Это звучит немного необычно. Какой бы поток ни создавал сообщения, он может просто вызывать receiveMessage(msg) непосредственно себя.

4. Также: Протобуфы не поддаются копированию, но они должны быть перемещаемы без особых проблем.

5. Я поместил цикл приема в другой класс. Итак, мой лямбда-инициализатор списка выглядит следующим образом: ): _receiveThread([this](const GoogleProtobufamp; gpb){_udp.listen(getPortsAddresses(std::move(gpb)))}) Тем не менее, я получаю static assertion failed. std::thread arguments must be invocable after conversion to rvalues