#c #multithreading
#c #многопоточность
Вопрос:
Вот что у меня, по сути, есть:
У меня есть поток A, который периодически проверяет наличие сообщений и обрабатывает их.
Потоки B и C должны отправлять сообщения в A.
Проблема возникает, когда B и C или B или C пытаются отправить сообщение в A, в то время как A обрабатывает сообщение и, таким образом, обращается к очереди.
Как обычно решается эта проблема?
Спасибо
Комментарии:
1. вы работаете с win32 или posix? Это помогло бы мне очистить мой маленький пример псевдокода.
Ответ №1:
Обычно это решается с помощью мьютексов или других многопоточных механизмов защиты.
Если вы работаете в Windows, MFC предоставляет класс CMutex для решения этой проблемы.
Если вы работаете в системе posix, API posix предоставляет функции pthread_mutex_lock
, pthread_mutex_unlock
, и pthread_mutex_trylock
.
Некоторый базовый псевдокод был бы удобен для демонстрации их использования в вашем случае:
pthread_mutex_t mutex; *or* CMutex mutex;
Q queue; // <-- both mutex and queue are global state, whether they are
// global variables, or passed in as parameters, they must
// be the shared by all threads.
int threadA(/* params */){
while( threadAStillRunning ){
// perform some non-critical actions ...
pthread_mutex_lock(mutex) *or* mutex.Lock()
// perform critical actions ...
msg = queue.receiveMessage()
pthread_mutex_unlock(mutex) *or* mutex.Unlock()
// perform more non-critical actions
}
}
int threadBorC(/* params */){
while( theadBorCStillRunning ){
// perform some non-critical actions ...
pthread_mutex_lock(mutex) *or* mutex.Lock()
// perform critical actions ...
queue.sendMessage(a_msg)
pthread_mutex_unlock(mutex) *or* mutex.Unlock()
}
}
Для всех трех потоков их способность работать с очередью зависит от их способности получать мьютекс — они просто заблокируют и будут ждать, пока мьютекс не будет получен. Это предотвращает конфликты, возникающие при использовании этого ресурса.
Комментарии:
1. Ответ Нейта правильный, но обратите внимание, что, хотя мьютексы просты в использовании, они могут не подходить для сценариев с очень высокой пропускной способностью, поскольку они имеют относительно высокие накладные расходы.
2. Ах, но так делают потоки в целом; ^) Серьезно, у разных платформ разные недостатки, когда дело доходит до защиты потоков; есть некоторые, чьи мьютексы зарегистрированы в ядре (я смотрю на тебя, windows), и другие, где мьютексы — это просто «обертки» вокруг атомарных проверок (что почти ничего не стоит).
3. @Nate: в Windows вы, вероятно, ищете критические разделы, которые также очень дешевы.
4. @BenVoigt — да, критические разделы имеют это преимущество (и они мне очень нравятся), но «идиоматически» они не должны использоваться для защиты данных, а просто использоваться внутри функций, которые не являются реентерабельными, которые вызываются из нескольких потоков. Полагаю, именно поэтому я их не упомянул. Однако, если вы не возражаете отказаться от этой идиомы, это недорогое решение.
5. @Nate: Я никогда не слышал этого философского аргумента относительно критических разделов. Они часто используются для синхронизации данных.
Ответ №2:
Если вы не используете Windows или если вы реализуете что-то кроссплатформенное на C , попробуйте использовать очередь из библиотек ACE.
ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue;
В качестве примера из примеров библиотеки ACE вы можете использовать
Для помещения сообщения в очередь:
ACE_NEW_RETURN (mb,
ACE_Message_Block (rb.size (),
ACE_Message_Block::MB_DATA,
0,
buffer),
0);
mb->msg_priority (ACE_Utils::truncate_cast<unsigned long> (rb.size ()));
mb->wr_ptr (rb.size ());
ACE_DEBUG ((LM_DEBUG,
"enqueueing message of size %dn",
mb->msg_priority ()));
// Enqueue in priority order.
if (msg_queue->enqueue_prio (mb) == -1)
ACE_ERROR ((LM_ERROR, "(%t) %pn", "put_next"));
для получения из очереди:
ACE_Message_Block *mb = 0;
msg_queue->dequeue_head (mb) == -1;
int length = ACE_Utils::truncate_cast<int> (mb->length ());
if (length > 0)
ACE_OS::puts (mb->rd_ptr ());
// Free up the buffer memory and the Message_Block.
ACE_Allocator::instance ()->free (mb->rd_ptr ());
mb->release ();
Преимущество в том, что вы можете очень легко изменить примитив синхронизации без необходимости писать слишком много кода.