#c #queue #consumer #producer
#c #очередь #потребитель #производитель
Вопрос:
У меня возникли проблемы с пониманием проблемы с несколькими производителями и одним потребителем. Я работаю над заданием и не уверен, как работает создание двух производителей.. Я понимаю, как работает проблема с одним производителем / потребителем, но я не могу понять, как работать с несколькими производителями, нужно ли мне создавать два отдельных потока для каждого производителя, если это так, как заполняет очередь своими «созданными данными», должен ли один из производителей находиться в спящем режиме, покадругой производитель заполняет, скажем, один элемент данных, а затем переключается туда и обратно, пока буфер очереди не заполнится?
Просто ищу объяснение, поскольку я не совсем понимаю, как это будет работать (прежде чем кто-то сделает предложение, я ищу кого-то, кто сделает мою домашнюю работу, это не так, просто ищу полезную информацию, чтобы прояснить мои мысли по этому поводу, чтобы я мог реализовать это сам)
Я рассмотрел много других вопросов / тем по этому поводу на этом и других других веб-сайтах и до сих пор не смог прийти к выводу о моем ответе.
Спасибо!
Комментарии:
1. Краткий пример.. Это как туалет на заправочной станции. Вы (потребитель) должны получить ключ от ванной (мьютекс / блокировку) у обслуживающего персонала (производителя), чтобы использовать ресурс (уборную). На заправочной станции может быть много обслуживающего персонала, которые проверяют клиентов, моют пол и т. Д., И все они занимаются своими делами, ожидая возврата ключа. На заправочной станции может быть много потребителей, которые перекачивают бензин, покупают еду и т. Д., И все они занимаются своими делами, ожидая освобождения туалета. Когда это так, другой потребитель может получить ключ от туалета.
2. Хорошо, это определенно полезно, так что, насколько я понимаю, я просто могу определить два потока производителей, и пусть они оба будут «принимать заказы», и какой из них доберется до него первым, доберется до него первым? Мне не нужно было бы, чтобы один из них мог «работать» одновременно, а просто соревноваться за способность выполнять задачу?
3. Единственный раз, когда потоки должны «конкурировать», — это когда они должны совместно использовать ресурс. Например, буфер. Потоки-производители вставляют информацию в буфер, потребители извлекают ее. Доступ к буферу должен быть защищен, чтобы 2 потока производителей не записывали свои данные в одно и то же место; в противном случае вы получите поврежденные данные. То же самое для потоков потребителей… без синхронизации потоков с общим ресурсом у вас никогда не будет четко определенного состояния для ресурса, и тогда наступает хаос. Когда потоки не обращаются к общему ресурсу, вы хотите, чтобы они выполняли
4. их собственная вещь параллельно, вот где вы получаете улучшения скорости от потоков. Вы хотите, чтобы критическая секция (часть, в которой потоки обращаются к общему ресурсу) была как можно меньше. Если у вас есть 100 потоков, но они тратят все свое время на попытки получить доступ к общему ресурсу, вам будет не лучше, чем если бы у вас был только 1 поток, и на самом деле вам, вероятно, будет хуже, потому что раскручивание потоков требует времени.
Ответ №1:
Вот мое решение, использующее только pipe
select
системные вызовы and для реализации MPSCQ. На следующей диаграмме показано, как это работает:
<producer-thread-1> {msg produced in heap}
/* only address of msg objects were sent to pipe[1] */
pipe[1] >>>(kernel)>>> pipe[0] <consumer-thread>:
/ 1. polling from pipe[0]
/ 2. restore msg object via address ptr
/ 3. process then delete the msg object
<producer-thread-2> {msg produced in heap}
Демонстрационный код написан на C для инкапсуляции очереди в класс, и функции C 14.11.17 не использовались. Первый — это класс очереди в форме шаблона:
// mpscq.hpp
#include <sys/select.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define PTR_SIZE (sizeof(void*))
template<class T> class MPSCQ { // Multi Producer Single Consumer Queue
public:
MPSCQ() {
int pipe_fd_set[2];
pipe(pipe_fd_set); // err-handler omitted for this demo
_fdProducer = pipe_fd_set[1];
_fdConsumer = pipe_fd_set[0];
}
~MPSCQ() { /* pipe close omitted for this demo */ }
int producerPush(const T* t) {
// will be blocked when pipe is full, should always return PTR_SIZE
return t == NULL ? 0 : write(_fdProducer, amp;t, PTR_SIZE);
}
T* consumerPoll(int timeout = 1);
private:
int _selectFdConsumer(int timeout);
private:
int _fdProducer; // pipe_fd_set[1]
int _fdConsumer; // pipe_fd_set[0]
};
template<class T> T* MPSCQ<T>::consumerPoll(int timeout) {
if (_selectFdConsumer(timeout) <= 0) { // timeout or error
return NULL;
}
char ptr_buff[PTR_SIZE];
ssize_t r = read(_fdConsumer, ptr_buff, PTR_SIZE);
if (r <= 0) {
fprintf(stderr, "consumer read EOF or error, r=%d, errno=%dn", r, errno);
return NULL;
}
T* t;
memcpy(amp;t, ptr_buff, PTR_SIZE); // cast received bytes to T*
return t;
}
template<class T> int MPSCQ<T>::_selectFdConsumer(int timeout) {
int nfds = _fdConsumer 1;
fd_set readfds;
struct timeval tv;
while (true) {
tv.tv_sec = timeout;
tv.tv_usec = 0;
FD_ZERO(amp;readfds);
FD_SET(_fdConsumer, amp;readfds);
int r = select(nfds, amp;readfds, NULL, NULL, amp;tv);
if (r < 0 amp;amp; errno == EINTR) {
continue;
}
return r;
}
}
Затем идет тестовый пример: 4 потока производителей, выдающих 1 ..100000, и один поток потребителей суммирует его.
// g -o mpscq mpscq.cpp -lpthread
#include "mpscq.hpp"
#include <sys/types.h>
#include <pthread.h>
#define PER_THREAD_LOOPS 25000
#define SAMPLE_INTERVAL 10000
#define PRODUCER_THREAD_NUM 4
struct TestMsg {
int _msgId; // a dummy demo member
int64_t _val; // _val < 0 is an end flag
TestMsg(int msg_id, int64_t val) :
_msgId(msg_id),
_val(val) { };
};
static MPSCQ<TestMsg> TEST_QUEUE;
void* functor_producer(void* arg) {
int* task_seg = (int*) arg;
TestMsg* msg;
for (int i = 0; i <= PER_THREAD_LOOPS; i) {
int64_t id = PER_THREAD_LOOPS * (*task_seg) i;
msg = new TestMsg(id, i >= PER_THREAD_LOOPS ? -1 : id 1);
TEST_QUEUE.producerPush(msg);
}
delete task_seg;
return NULL;
}
void* functor_consumer(void* arg) {
int64_t* sum = (int64_t*)arg;
int msg_cnt = 0;
int stop_cnt = 0; // for shutdown gracefully
TestMsg* msg;
while (true) {
if ((msg = TEST_QUEUE.consumerPoll()) == NULL) {
continue;
}
int64_t val = msg->_val;
delete msg; // this delete is essential to prevent memory leak
if (val <= 0) {
if (( stop_cnt) >= PRODUCER_THREAD_NUM) {
printf("all done, sum=%ldn", *sum);
break;
}
} else {
*sum = val;
if (( msg_cnt) % SAMPLE_INTERVAL == 0) {
printf("msg_cnt=%d, sum=%ldn", msg_cnt, *sum);
}
}
}
return NULL;
}
int main(int argc, char* const* argv) {
int64_t sum = 0;
printf("PTR_SIZE: %d, target: sum(1..%d)n", PTR_SIZE, PRODUCER_THREAD_NUM * PER_THREAD_LOOPS);
pthread_t consumer;
pthread_create(amp;consumer, NULL, functor_consumer, amp;sum);
pthread_t producers[PRODUCER_THREAD_NUM];
for (int i = 0; i < PRODUCER_THREAD_NUM; i) {
pthread_create(amp;producers[i], NULL, functor_producer, new int(i));
}
for (int i = 0; i < PRODUCER_THREAD_NUM; i) {
pthread_join(producers[i], NULL);
}
pthread_join(consumer, NULL);
return 0;
}
Образец результата теста:
$ ./mpscq
PTR_SIZE: 8, target: sum(1..100000)
msg_cnt=10000, sum=490096931
msg_cnt=20000, sum=888646187
msg_cnt=30000, sum=1282852073
msg_cnt=40000, sum=1606611602
msg_cnt=50000, sum=2088863858
msg_cnt=60000, sum=2573791058
msg_cnt=70000, sum=3180398370
msg_cnt=80000, sum=3768718659
msg_cnt=90000, sum=4336431164
msg_cnt=100000, sum=5000050000
all done, sum=5000050000
Реализованный здесь MPSCQ представляет собой шаблон передачи сообщений и позволяет ядру справляться со сложностью операций с внутренними очередями. Побочным эффектом трюка является то, что при большой рабочей нагрузке на стороне потребителя будет слишком много select
вызовов, что значительно повлияет на производительность. (В этой демонстрации каждый раз, когда потребитель просто извлекает 8 байт. Чтобы облегчить это, потребитель должен поддерживать дополнительный буфер приема.)