#c #linux #sockets #pthreads #mutex
#c #linux #сокеты #pthreads #мьютекс
Вопрос:
Я внедряю простой механизм пула потоков для моего сервера Ubuntu (для моей программы анонимного чата с несколькими клиентами), и мне нужно перевести мои рабочие потоки в режим ожидания до тех пор, пока не потребуется выполнить задание (в виде указателя на функцию и параметра).
Моя текущая система выходит из строя. Я (рабочий поток) спрашиваю менеджера, доступно ли задание, и нет ли режима ожидания в течение 5 мс. Если есть, добавьте задание в рабочую очередь и запустите с помощью функции. Жалкая трата циклов.
Что я хотел бы сделать, так это создать простую событийную систему. Я подумываю о том, чтобы создать вектор мьютексов (по одному для каждого рабочего) и передать дескриптор мьютекса в качестве параметра при создании. Затем в моем классе manager (который хранит и раздает задания) всякий раз, когда создается поток, блокируйте мьютекс. Когда необходимо выполнить задание, разблокируйте следующий мьютекс в строке, дождитесь, пока он будет заблокирован и разблокирован, и повторно заблокируйте его. Однако мне интересно, есть ли гораздо лучшее средство для достижения этой цели.
tldr; Итак, мой вопрос заключается в следующем. Каков наиболее эффективный и безопасный способ заставить поток ожидать задания от управляющего класса? Является ли опрос методом, который я должен даже рассмотреть (более 1000 клиентов одновременно), является ли блокировка мьютекса приемлемой? Или есть другие методы?
Ответ №1:
Что вам нужно, так это переменная условия.
Все рабочие потоки вызывают wait(), который приостанавливает их.
Затем родительский поток помещает рабочий элемент в очередь и вызывает signal для переменной условия. Это разбудит один поток, который находится в спящем режиме. Он может удалить задание из очереди, выполнить задание, а затем вызвать wait для переменной условия, чтобы вернуться в спящий режим.
Попробуйте:
#include <pthread.h>
#include <memory>
#include <list>
// Use RAII to do the lock/unlock
struct MutexLock
{
MutexLock(pthread_mutex_tamp; m) : mutex(m) { pthread_mutex_lock(amp;mutex); }
~MutexLock() { pthread_mutex_unlock(amp;mutex); }
private:
pthread_mutex_tamp; mutex;
};
// The base class of all work we want to do.
struct Job
{
virtual void doWork() = 0;
};
// pthreads is a C library the call back must be a C function.
extern "C" void* threadPoolThreadStart(void*);
// The very basre minimal part of a thread pool
// It does not create the workers. You need to create the work threads
// then make them call workerStart(). I leave that as an exercise for you.
class ThreadPool
{
public:
ThreadPool(unsigned int threadCount=1);
~ThreadPool();
void addWork(std::auto_ptr<Job> job);
private:
friend void* threadPoolThreadStart(void*);
void workerStart();
std::auto_ptr<Job> getJob();
bool finished; // Threads will re-wait while this is true.
pthread_mutex_t mutex; // A lock so that we can sequence accesses.
pthread_cond_t cond; // The condition variable that is used to hold worker threads.
std::list<Job*> workQueue; // A queue of jobs.
std::vector<pthread_t>threads;
};
// Create the thread pool
ThreadPool::ThreadPool(int unsigned threadCount)
: finished(false)
, threads(threadCount)
{
// If we fail creating either pthread object than throw a fit.
if (pthread_mutex_init(amp;mutex, NULL) != 0)
{ throw int(1);
}
if (pthread_cond_init(amp;cond, NULL) != 0)
{
pthread_mutex_destroy(amp;mutex);
throw int(2);
}
for(unsigned int loop=0; loop < threadCount; loop)
{
if (pthread_create(threads[loop], NULL, threadPoolThreadStart, this) != 0)
{
// One thread failed: clean up
for(unsigned int kill = loop -1; kill < loop /*unsigned will wrap*/;--kill)
{
pthread_kill(threads[kill], 9);
}
throw int(3);
}
}
}
// Cleanup any left overs.
// Note. This does not deal with worker threads.
// You need to add a method to flush all worker threads
// out of this pobject before you let the destructor destroy it.
ThreadPool::~ThreadPool()
{
finished = true;
for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); loop)
{
// Send enough signals to free all threads.
pthread_cond_signal(amp;cond);
}
for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); loop)
{
// Wait for all threads to exit (they will as finished is true and
// we sent enough signals to make sure
// they are running).
void* resu<
pthread_join(*loop, amp;result);
}
// Destroy the pthread objects.
pthread_cond_destroy(amp;cond);
pthread_mutex_destroy(amp;mutex);
// Delete all re-maining jobs.
// Notice how we took ownership of the jobs.
for(std::list<Job*>::const_iterator loop = workQueue.begin(); loop != workQueue.end(); loop)
{
delete *loop;
}
}
// Add a new job to the queue
// Signal the condition variable. This will flush a waiting worker
// otherwise the job will wait for a worker to finish processing its current job.
void ThreadPool::addWork(std::auto_ptr<Job> job)
{
MutexLock lock(mutex);
workQueue.push_back(job.release());
pthread_cond_signal(amp;cond);
}
// Start a thread.
// Make sure no exceptions escape as that is bad.
void* threadPoolThreadStart(void* data)
{
ThreadPool* pool = reinterpret_cast<ThreadPool*>(workerStart);
try
{
pool->workerStart();
}
catch(...){}
return NULL;
}
// This is the main worker loop.
void ThreadPool::workerStart()
{
while(!finished)
{
std::auto_ptr<Job> job = getJob();
if (job.get() != NULL)
{
job->doWork();
}
}
}
// The workers come here to get a job.
// If there are non in the queue they are suspended waiting on cond
// until a new job is added above.
std::auto_ptr<Job> ThreadPool::getJob()
{
MutexLock lock(mutex);
while((workQueue.empty()) amp;amp; (!finished))
{
pthread_cond_wait(amp;cond, amp;mutex);
// The wait releases the mutex lock and suspends the thread (until a signal).
// When a thread wakes up it is help until it can acquire the mutex so when we
// get here the mutex is again locked.
//
// Note: You must use while() here. This is because of the situation.
// Two workers: Worker A processing job A.
// Worker B suspended on condition variable.
// Parent adds a new job and calls signal.
// This wakes up thread B. But it is possible for Worker A to finish its
// work and lock the mutex before the Worker B is released from the above call.
//
// If that happens then Worker A will see that the queue is not empty
// and grab the work item in the queue and start processing. Worker B will
// then lock the mutext and proceed here. If the above is not a while then
// it would try and remove an item from an empty queue. With a while it sees
// that the queue is empty and re-suspends on the condition variable above.
}
std::auto_ptr<Job> resu<
if (!finished)
{ result.reset(workQueue.front());
workQueue.pop_front();
}
return resu<
}
Комментарии:
1. Переменная условия является мьютексом / семафором?
2. @ultifinitus: Нет. Переменная условия является примитивом самого низкого уровня (наряду с мьютексом) в потоковом коде. Вы создаете семафоры из переменных мьютекса / условия.
3. Хаха, спасибо! Я ценю помощь! Мне понравилась
I leave that as an exercise for you
часть. Я дам вам знать, как в конечном итоге работает конечный продукт.
Ответ №2:
Обычный способ реализации этого — иметь очередь queue
невыполненной работы, мьютекс, mutex
защищающий очередь, и условие ожидания queue_not_empty
. Затем каждый рабочий поток выполняет следующее (используя псевдо-api):
while (true) {
Work * work = 0;
mutex.lock();
while ( queue.empty() )
if ( !queue_not_empty.wait( amp;mutex, timeout ) )
return; // timeout - exit the worker thread
work = queue.front();
queue.pop_front();
mutex.unlock();
work->perform();
}
wait( amp;mutex, timeout )
Вызов блокируется до тех пор, пока не будет сигнализировано условие ожидания или время ожидания вызова не истечет. mutex
Переданное элементарно разблокируется внутри wait()
и снова блокируется перед возвратом из вызова, чтобы обеспечить согласованное представление очереди для всех участников. timeout
было бы выбрано довольно большое значение (секунды) и привело бы к завершению потока (пул потоков запустил бы новые, если бы было больше работы).
Между тем, рабочая функция вставки пула потоков выполняет это:
Work * work = ...;
mutex.lock();
queue.push_back( work );
if ( worker.empty() )
start_a_new_worker();
queue_not_empty.wake_one();
mutex.unlock();
Комментарии:
1. Это как раз то, о чем я думал, спасибо за тайм-аут, я думал, мне нужно будет реализовать какое-то задание die.. Я ценю это!
Ответ №3:
Классическая синхронизация производителя и потребителя с несколькими потребителями (рабочие потоки обрабатывают рабочие запросы). Хорошо известный метод заключается в том, чтобы иметь семафор, который выполняет каждый рабочий поток down()
, и каждый раз, когда у вас есть рабочий запрос, выполняйте up()
. Затем выбирает запрос из рабочей очереди с блокировкой мьютекса. Поскольку один up()
вызовет только один down()
, на самом деле конфликт в мьютексе будет минимальным.
В качестве альтернативы вы можете сделать то же самое с условной переменной, выполняя ожидание в каждом потоке и пробуждая один, когда у вас есть задание. Сама очередь по-прежнему заблокирована мьютексом (в любом случае для condvar требуется один).
В последнем я не совсем уверен, но я действительно думаю, что вы действительно можете использовать канал в качестве очереди, включая всю синхронизацию (рабочие потоки просто пытаются «прочитать (sizeof (запрос))»). Немного халтурно, но приводит к меньшему количеству переключений контекста.
Комментарии:
1. Семафор — это простое решение. Обычно это реализуется как мьютекс с переменной условия и целочисленным числом. Но, кроме того, вы должны убедиться, что вы контролируете доступ потока к любому другому общему ресурсу (например, к списку ожидающих выполнения запросов).
2. @Martin: Я описал оба способа и явно сказал, что очередь должна быть заблокирована. Из-за эквивалентности примитивов синхронизации выбор зависит от того, какой из них более эффективен в данной среде. Фактически это просто реализует очередь сообщений, поэтому, когда она доступна, просто используйте ее (три эквивалентных примитива синхронизации — семафор, условная переменная и очередь сообщений).
Ответ №4:
Поскольку программа сетевого чата предположительно связана с вводом-выводом, а не с процессором, вам на самом деле не нужны потоки. Вы можете обрабатывать все ваши операции ввода-вывода в одном потоке, используя такое средство, как Boost.Asio или основной цикл GLib. Это переносимые абстракции поверх функций, зависящих от платформы, которые позволяют программе блокировать ожидание активности для любого (потенциально большого) набора открытых файлов или сокетов, а затем просыпаться и быстро реагировать при возникновении активности.
Комментарии:
1. Я делал все в одном потоке select () и poll (), я просто беспокоился о скорости… В этом вообще нет необходимости?
2. @ultifinitus, если ваша программа не выполняет много работы с процессором (например, криптографирование для SSL на каждом подключенном сокете), она, вероятно, проводит большую часть своего времени в режиме ожидания в ожидании ввода от клиентов и использует процессор только в течение коротких интервалов, когда поступает ввод. Нет никакой пользы в распределении такой рабочей нагрузки между несколькими ядрами.
3. @Wyzard: Это фантастика! Должен ли я иметь систему резервного копирования (многопоточную), как вы думаете? Если то, что вы говорите, правда, я закончил !!!
4. @ultifinitus, если у вас это уже работает с использованием
select()
или подобного, я бы посоветовал провести стресс-тест с кучей клиентов и взглянуть на фактическую загрузку процессора. Если существуют значительные промежутки времени, когда процессор является узким местом, тогда рассмотрите возможность перезаписи его для использования потоков. Если нет, оставьте все как есть и избегайте всей сложности кода, связанной с потоками и синхронизацией.5. Обнаружил, что без пулов потоков я могу обрабатывать около 1600 клиентов (хотя и активных) одновременно. Я дам вам знать о пулах потоков, когда реализация будет завершена.
Ответ №5:
Самый простой способ сделать это semaphores
. Вот как работает семафор:
Семафор — это, по сути, переменная, которая принимает нулевые / положительные значения. Процессы могут взаимодействовать с ним двумя способами: увеличивать или уменьшать семафор.
Увеличение семафора добавляет 1 к этой волшебной переменной, и это все. Самое интересное заключается в уменьшении количества,если количество достигает нуля и процесс пытается снова его уменьшить, поскольку он не может принимать отрицательные значения, он будет блокироваться до тех пор, пока переменная не увеличится.
Если несколько блоков процессов ожидают уменьшения значения семафора, только один из них активируется для каждой единицы, количество которых увеличивается.
Это очень упрощает создание системы рабочих / задач: ваш процесс-менеджер ставит задачи в очередь и увеличивает значение семафора, чтобы соответствовать оставшимся элементам, а ваши рабочие процессы пытаются уменьшить количество и постоянно получают задачу. Когда никакие задачи не будут доступны, они будут блокироваться и не потреблять процессорного времени. Когда он появится, только один из бездействующих процессов будет активирован. Волшебство мгновенной синхронизации.
К сожалению, по крайней мере в мире Unix, semaphore API не очень дружелюбен, поскольку по какой-то причине он имеет дело с массивами семафоров, а не с отдельными. Но вы всего лишь оболочка, лишенная приятного интерфейса!
Приветствия!
Комментарии:
1. Это почти идеально! Я обязательно проведу тщательное исследование, спасибо за ответ!