Многопоточная программная архитектура C pthread

#c #linux #multithreading

#c #linux #многопоточность

Вопрос:

Я хотел бы создать многопоточную программу на C (Linux) с:

  1. Бесконечный цикл с бесконечным числом задач
  2. Один поток на одну задачу
  3. Ограничьте общее количество потоков, поэтому, если, например, общее количество потоков больше MAX_THREADS_NUMBER, выполните sleep(), пока общее количество потоков не станет меньше MAX_THREADS_NUMBER, продолжайте после.

Резюме: мне нужно выполнить бесконечное количество задач (по одной задаче на один поток), и я хотел бы знать, как это реализовать, используя pthreads в C.

Вот мой код:

 #include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>


#define MAX_THREADS 50


pthread_t thread[MAX_THREADS];
int counter;
pthread_mutex_t lock;

void* doSomeThing(void *arg)
{
    pthread_mutex_lock(amp;lock);
    counter  = 1;
    printf("Job %d startedn", counter);
    pthread_mutex_unlock(amp;lock);

    return NULL;
}

int main(void)
{
    int i = 0;
    int ret;

    if (pthread_mutex_init(amp;lock, NULL) != 0)
    {
        printf("n mutex init failedn");
        return 1;
    }

    for (i = 0; i < MAX_THREADS; i  ) {
        ret = pthread_create(amp;(thread[i]), NULL, amp;doSomeThing, NULL);
        if (ret != 0)
            printf("ncan't create thread :[%s]", strerror(ret));
    }

    // Wait all threads to finish
    for (i = 0; i < MAX_THREADS; i  ) {
        pthread_join(thread[i], NULL);
    }

    pthread_mutex_destroy(amp;lock);

    return 0;
}
  

Как сделать этот цикл бесконечным?

 for (i = 0; i < MAX_THREADS; i  ) {
    ret = pthread_create(amp;(thread[i]), NULL, amp;doSomeThing, NULL);
    if (ret != 0)
        printf("ncan't create thread :[%s]", strerror(ret));
}
  

Мне нужно что-то вроде этого:

 while (1) {
    if (thread_number > MAX_THREADS_NUMBER)
        sleep(1);

    ret = pthread_create(...);
    if (ret != 0)
        printf("ncan't create thread :[%s]", strerror(ret));
}
  

Комментарии:

1. Используйте пул потоков .

2. То, что вы хотите, недостаточно ясно. Не могли бы вы уточнить?

3. @shrike Мне нужно выполнить бесконечное количество задач (по одной задаче на один поток), и я хотел бы знать, как это реализовать, используя pthreads в C. Это то, что вы хотели знать?

Ответ №1:

Ваша текущая программа основана на простой схеме отправки: начальный поток создает рабочие потоки, назначая каждому из них задачу для выполнения. Ваш вопрос в том, как вы заставляете это работать для любого количества задач, любого количества рабочих потоков. Ответ в том, что вы этого не делаете: выбранный вами дизайн делает такую модификацию практически невозможной.

Даже если бы я ответил на ваши заданные вопросы, это не заставило бы программу вести себя так, как вам хотелось бы. Это могло бы работать в некотором роде, но это было бы похоже на велосипед с квадратными колесами: не очень практично и надежно — даже не весело, когда вы перестаете смеяться над тем, как глупо это выглядит.

Решение, как я писал в комментарии к исходному вопросу, заключается в изменении базового дизайна: от простой отправки до подхода пула потоков.

Реализация пула потоков требует двух вещей: во-первых, изменить свою точку зрения с запуска потока и выполнения им задачи, чтобы каждый поток в «пуле» захватывал задачу для выполнения и возвращался в «пул» после ее выполнения. Понимание этого является сложной частью. Вторая часть, реализующая способ для каждого потока захватить новую задачу, проста: обычно это сосредоточено вокруг структуры данных, защищенной какими-либо блокировками. Однако точная структура данных зависит от того, какую фактическую работу нужно выполнить.

Давайте предположим, что вы хотели распараллелить вычисление множества Мандельброта (или, скорее, время выхода или количество итераций, необходимых для того, чтобы можно было исключить точку из множества; страница Википедии содержит псевдокод именно для этого). Это одна из «смущающе параллельных» проблем; те, где подзадачи (здесь, каждая точка) могут быть решены без каких-либо зависимостей.

Вот как я бы сделал ядро пула потоков в этом случае. Сначала для каждой точки необходимо записать время выхода или количество итераций. Допустим, мы используем unsigned int для этого. Нам также нужно количество точек (это 2D-массив), способ вычисления комплексного числа, соответствующего каждой точке, плюс какой-то способ узнать, какие точки либо были вычислены, либо вычисляются. Плюс взаимоисключающая блокировка, так что только один поток будет изменять структуру данных одновременно. Итак:

 typedef struct {
    int               x_size, y_size;
    size_t            stride;
    double            r_0,  i_0;
    double            r_dx, i_dx;
    double            r_dy, i_dy;
    unsigned int     *iterations;
    sem_t             done;
    pthread_mutex_t   mutex;
    int               x, y;
} fractal_work;
  

Когда создается экземпляр fractal_work , x_size и y_size количество столбцов и строк в iterations карте. Количество итераций (или время выхода) для точки x y хранится в iterations[x y*stride] . Действительная часть комплексной координаты для этой точки равна r_0 x*r_dx y*r_dy , а мнимая часть равна i_0 x*i_dx y*i_dy (что позволяет вам свободно масштабировать и поворачивать фрактал).

Когда поток захватывает следующую доступную точку, он сначала блокирует mutex и копирует значения x and y (для себя для работы). Затем оно увеличивается x . Если x >= x_size он сбрасывается x до нуля и увеличивается y . Наконец, он разблокирует mutex и вычисляет время выхода для этой точки.

Однако, если x == 0 amp;amp; y >= y_size поток отправляет сообщение в done семафор и завершает работу, давая начальному потоку знать, что фрактал завершен. (Начальный поток просто должен вызываться sem_wait() один раз для каждого созданного им потока.)

Тогда функция thread worker выглядит примерно следующим образом:

 void *fractal_worker(void *data)
{
    fractal_work *const work = (fractal_work *)data;
    int           x, y;

    while (1) {

        pthread_mutex_lock(amp;(work->mutex));

        /* No more work to do? */
        if (work->x == 0 amp;amp; work->y >= work->y_size) {
            sem_post(amp;(work->done));
            pthread_mutex_unlock(amp;(work->mutex));
            return NULL;
        }

        /* Grab this task (point), advance to next. */
        x = work->x;
        y = work->y;
        if (  (work->x) >= work->x_size) {
            work->x = 0;
              (work->y);
        }

        pthread_mutex_unlock(amp;(work->mutex));

        /* z.r = work->r_0   (double)x * work->r_dx   (double)y * work->r_dy;
           z.i = work->i_0   (double)x * work->i_dx   (double)y * work->i_dy;

           TODO: implement the fractal iteration,
                 and count the iterations (say, n)

                 save the escape time (number of iterations)
                 in the work->iterations array; e.g.
            work->iterations[(size_t)x   work->stride*(size_t)y] = n;
        */
    }
}
  

Программа сначала создает структуру fractal_work данных для рабочих потоков для работы, инициализирует ее, затем создает некоторое количество потоков, присваивая каждому потоку адрес этой fractal_work структуры. Затем он также может вызвать fractal_worker() себя, чтобы «присоединиться к пулу потоков». (Этот пул автоматически «сливается», т. Е. Потоки возвращаются / выходят, когда все точки во фрактале выполнены.)

Наконец, основной поток вызывает sem_wait() done семафор столько раз, сколько он создал рабочих потоков, чтобы убедиться, что вся работа выполнена.

Точные поля в fractal_work приведенной выше структуре не имеют значения. Тем не менее, он находится в самом ядре пула потоков. Обычно существует по крайней мере один мьютекс или rwlock, защищающий рабочие данные, так что каждый рабочий поток получает уникальные рабочие данные, а также какой-то флаг или переменную условия или семафор, чтобы сообщить исходному потоку, что задача теперь завершена.

На многопоточном сервере обычно имеется только один экземпляр структуры (или переменных), описывающих рабочую очередь. Он может даже содержать такие вещи, как минимальное и максимальное количество потоков, позволяя рабочим потокам управлять своим собственным числом, чтобы динамически реагировать на объем доступной работы. Это звучит волшебно, но на самом деле просто реализовать: когда поток завершил свою работу или проснулся в пуле без работы и удерживает мьютекс, он сначала проверяет, сколько существует заданий в очереди и каково текущее количество рабочих потоков. Если количество потоков превышает минимальное, а работы нет, поток уменьшает количество потоков и завершает работу. Если количество потоков меньше максимального и предстоит много работы, поток сначала создает новый поток, а затем берет следующую задачу для работы. (Да, любой поток может создавать новые потоки в процессе. Все они также находятся в равных условиях.)

Большая часть кода в практическом многопоточном приложении, использующем один или несколько пулов потоков для выполнения работы, является своего рода бухгалтерией. Подходы к пулу потоков в значительной степени концентрируются на данных и вычислениях, которые необходимо выполнить с данными. Я уверен, что где-то есть гораздо лучшие примеры пулов потоков; самое сложное — придумать хорошую задачу для выполнения приложения, поскольку структуры данных настолько зависят от задач, а многие вычисления настолько просты, что их распараллеливание не имеет смысла (поскольку создание новых потоков требуетнебольшие вычислительные затраты, было бы глупо тратить время на создание потоков, когда один поток выполняет ту же работу за то же или меньшее время).

С другой стороны, многие задачи, которые выигрывают от распараллеливания, требуют обмена информацией между рабочими, и для правильной реализации этого требуется много размышлений. (Например, хотя существуют решения для эффективного распараллеливания моделирования молекулярной динамики, большинство симуляторов по-прежнему вычисляют и обмениваются данными на отдельных этапах, а не одновременно. Видите ли, это так сложно сделать правильно.)

Все это означает, что вы не можете ожидать, что сможете написать код, если не понимаете концепцию. Действительно, по-настоящему понять концепции — это сложная часть: написать код сравнительно легко.

Даже в приведенном выше примере есть определенные точки отключения: имеет ли значение порядок размещения семафора и освобождения мьютекса? (Ну, это зависит от того, что делает поток, ожидающий завершения фрактала, и действительно, ждет ли он еще.) Если бы это была переменная условия вместо семафора, было бы важно, чтобы поток, который заинтересован в завершении фрактала, ожидал от переменной условия, иначе он пропустил бы сигнал / трансляцию. (Именно поэтому я использовал семафор.)

Комментарии:

1. Большое спасибо за ваш ответ. Не могли бы вы опубликовать полный пример, пожалуйста.

2. @SebastianRockefeller: Зачем вам нужен полный пример? Как я уже сказал, общая концепция пула потоков — это сложная часть, и это самое главное. Найдите это в своих справочных материалах, ссылках на странице Википедии или в одном из поисковых запросов в Интернете при поиске "c thread pool" .