Безопасная очередь сообщений с несколькими потоками

#c #multithreading

#c #многопоточность

Вопрос:

Вот что у меня, по сути, есть:

У меня есть поток A, который периодически проверяет наличие сообщений и обрабатывает их.

Потоки B и C должны отправлять сообщения в A.

Проблема возникает, когда B и C или B или C пытаются отправить сообщение в A, в то время как A обрабатывает сообщение и, таким образом, обращается к очереди.

Как обычно решается эта проблема?

Спасибо

Комментарии:

1. вы работаете с win32 или posix? Это помогло бы мне очистить мой маленький пример псевдокода.

Ответ №1:

Обычно это решается с помощью мьютексов или других многопоточных механизмов защиты.

Если вы работаете в Windows, MFC предоставляет класс CMutex для решения этой проблемы.

Если вы работаете в системе posix, API posix предоставляет функции pthread_mutex_lock , pthread_mutex_unlock , и pthread_mutex_trylock .

Некоторый базовый псевдокод был бы удобен для демонстрации их использования в вашем случае:

 pthread_mutex_t mutex; *or* CMutex mutex;
Q queue;  // <-- both mutex and queue are global state, whether they are
          //     global variables, or passed in as parameters, they must
          //     be the shared by all threads.

int threadA(/* params */){
    while( threadAStillRunning ){
        // perform some non-critical actions ...
        pthread_mutex_lock(mutex) *or* mutex.Lock()
        // perform critical actions ...
        msg = queue.receiveMessage()
        pthread_mutex_unlock(mutex) *or* mutex.Unlock()
        // perform more non-critical actions
    }
}

int threadBorC(/* params */){
    while( theadBorCStillRunning ){
        // perform some non-critical actions ...
        pthread_mutex_lock(mutex) *or* mutex.Lock()
        // perform critical actions ...
        queue.sendMessage(a_msg)
        pthread_mutex_unlock(mutex) *or* mutex.Unlock()
    }
}
  

Для всех трех потоков их способность работать с очередью зависит от их способности получать мьютекс — они просто заблокируют и будут ждать, пока мьютекс не будет получен. Это предотвращает конфликты, возникающие при использовании этого ресурса.

Комментарии:

1. Ответ Нейта правильный, но обратите внимание, что, хотя мьютексы просты в использовании, они могут не подходить для сценариев с очень высокой пропускной способностью, поскольку они имеют относительно высокие накладные расходы.

2. Ах, но так делают потоки в целом; ^) Серьезно, у разных платформ разные недостатки, когда дело доходит до защиты потоков; есть некоторые, чьи мьютексы зарегистрированы в ядре (я смотрю на тебя, windows), и другие, где мьютексы — это просто «обертки» вокруг атомарных проверок (что почти ничего не стоит).

3. @Nate: в Windows вы, вероятно, ищете критические разделы, которые также очень дешевы.

4. @BenVoigt — да, критические разделы имеют это преимущество (и они мне очень нравятся), но «идиоматически» они не должны использоваться для защиты данных, а просто использоваться внутри функций, которые не являются реентерабельными, которые вызываются из нескольких потоков. Полагаю, именно поэтому я их не упомянул. Однако, если вы не возражаете отказаться от этой идиомы, это недорогое решение.

5. @Nate: Я никогда не слышал этого философского аргумента относительно критических разделов. Они часто используются для синхронизации данных.

Ответ №2:

Если вы не используете Windows или если вы реализуете что-то кроссплатформенное на C , попробуйте использовать очередь из библиотек ACE.

 ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue;
  

В качестве примера из примеров библиотеки ACE вы можете использовать
Для помещения сообщения в очередь:

   ACE_NEW_RETURN (mb,
              ACE_Message_Block (rb.size (),
              ACE_Message_Block::MB_DATA,
              0,
              buffer),
              0);
  mb->msg_priority (ACE_Utils::truncate_cast<unsigned long> (rb.size ()));
  mb->wr_ptr (rb.size ());

  ACE_DEBUG ((LM_DEBUG,
          "enqueueing message of size %dn",
          mb->msg_priority ()));

 // Enqueue in priority order.
 if (msg_queue->enqueue_prio (mb) == -1)
ACE_ERROR ((LM_ERROR, "(%t) %pn", "put_next"));
  

для получения из очереди:

  ACE_Message_Block *mb = 0;

 msg_queue->dequeue_head (mb) == -1;
 int length = ACE_Utils::truncate_cast<int> (mb->length ());

 if (length > 0)
   ACE_OS::puts (mb->rd_ptr ());

  // Free up the buffer memory and the Message_Block.
  ACE_Allocator::instance ()->free (mb->rd_ptr ());
  mb->release ();
  

Преимущество в том, что вы можете очень легко изменить примитив синхронизации без необходимости писать слишком много кода.