Реализуйте барьер с потоками на C

#c #parallel-processing #pthreads #pthread-barriers

Вопрос:

Я пытаюсь распараллелить алгоритм сортировки слиянием. То, что я делаю, — это разделяю входной массив для каждого потока, а затем объединяю результаты потоков. То, как я пытаюсь объединить результаты, выглядит примерно так:

 thread 0                     |   thread 1        |   thread 2         |   thread 3

sort(A0)                     |   sort(A1)        |   sort(A2)         | sort(A3)
merge(A0,A1)                 |                   |   merge(A2,A3)     | 
merge(A0A1, A2A3)            |                   |                    |
 

Итак, в конце моей функции sortManager я вызываю функцию mergeThreadResults , которая должна реализовать вышеуказанную логику. В нем я перебираю пары, чтобы объединить соответствующие потоки. Затем, при необходимости, я объединяю последние элементы в поток 0. Это выглядит так :

 void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {

    int nextThread;
    int iter = 2;
    while (iter <= threads) {
        int nextThread = (myRank 1*iter) < threads ? (myRank 1*iter) : threads;
        int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;

        printf("Merging threads %ld to %dn", myRank, nextThread);
        
        if (myRank % iter != 0) {
            break;
        }

        merge(sortingArray, myLeft, myRight, nextThreadRight);
        sleep(3); // <- sleep

        myRight = nextThreadRight;
        iter = iter * 2;
    }

     if (myRank == 0 amp;amp; nextThread < threads-1) {
        int nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
     }

}
 

Похоже, все работает так, как задумано. Проблема в том, что я использую sleep функцию для синхронизации потоков, что далеко не лучший подход. Поэтому я пытаюсь реализовать барьер с помощью pthread.
В нем я пытаюсь рассчитать, сколько итераций потребуется для этого цикла, и передать его как breakpoint . Когда все потоки находятся в одной точке, я отпускаю функцию слияния и снова жду в новом цикле. Это то, что я пробовал:

         pthread_mutex_lock(amp;mutex);
        counter  ;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter 1;
        if(counter >= breakpoint ) {
            counter = 0;
            pthread_cond_broadcast(amp;cond_var);
        } else {
            while (pthread_cond_wait(amp;cond_var, amp;mutex) != 0);
        }
        pthread_mutex_unlock(amp;mutex);
 

Но это работает не так, как задумывалось. Некоторые merge триггеры до того, как последний цикл полностью закончился, оставив меня с частично отсортированным массивом.

Это незначительный пример моего кода для тестирования:

 #define _GNU_SOURCE

#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <time.h>

#include <pthread.h>
#include <unistd.h>

// Initialize global variables
int sortingArray[20] = {5,-4,3,-1,-2,3,1,2,-2,-1,-2,-1,-2,-3,4,1234,534,123,87,123};
int counter = 0;
pthread_mutex_t mutex;
pthread_cond_t cond_var;

struct ThreadTask {
    long rank;
    int size;
    int threads;
};

void merge(int arr[], int left, int mid, int right) {
    /* Merge arrays */

    int i, j, k;
    int n1 = mid - left   1;
    int n2 = right - mid;

    // Alocate temp arrays
    int *L = malloc((n1   2) * sizeof(int));
    int *R = malloc((n2   2) * sizeof(int));
    if (L == NULL || R == NULL) {
        fprintf(stderr, "Fatal: failed to allocate memory fo temp arrays.");
        exit(EXIT_FAILURE);
    }

    // Populate temp arrays
    for (i = 1; i <= n1; i  ) {
        L[i] = arr[left   i - 1];
    }
    for (j = 1; j <= n2; j  ) {
        R[j] = arr[mid   j];
    }

    L[n1   1] = INT_MAX;
    R[n2   1] = INT_MAX;
    i = 1;
    j = 1;

    // Merge arrays
    for (k = left; k <= right; k  ) {
        if (L[i] <= R[j]) {
            arr[k] = L[i];
            i  ;
        } else {
            arr[k] = R[j];
            j  ;
        }
    }

    free(L);
    free(R);
}


void mergeSort(int arr[], int left, int right) {
    /* Sort and then merge arrays */

    if (left < right) {
        int mid = left   (right - left) / 2;

        mergeSort(arr, left, mid);
        mergeSort(arr, mid   1, right);

        merge(arr, left, mid, right);
    }
}


void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {

    int nextThread;
    int iter = 2;
    while (iter <= threads) {
        int nextThread = (myRank 1*iter) < threads ? (myRank 1*iter) : threads;
        int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;

        printf("Merging threads %ld to %dn", myRank, nextThread);
        
        if (myRank % iter != 0) {
            break;
        }

        // barrier
        pthread_mutex_lock(amp;mutex);
        counter  ;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter 1;
        if(counter >= breakpoint ) {
            counter = 0;
            pthread_cond_broadcast(amp;cond_var);
        } else {
            while (pthread_cond_wait(amp;cond_var, amp;mutex) != 0);
        }
        pthread_mutex_unlock(amp;mutex);

        merge(sortingArray, myLeft, myRight, nextThreadRight);
        sleep(2); // <- sleep

        myRight = nextThreadRight;
        iter = iter * 2;
    }

     if (myRank == 0 amp;amp; nextThread < threads-1) {
        int nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
     }

}

void *sortManager(void *threadInfo) {
    /* Manage mergeSort between threads */

    struct ThreadTask *currentTask = threadInfo;

    // Get task arguments
    long rank = currentTask->rank;
    int left= rank * ((float)currentTask->size / (float)currentTask->threads);
    int right = (rank   1) * ((float)currentTask->size / (float)currentTask->threads) - 1;
    int mid = left   (right - left) / 2;

    // Execute merge for task division
    if (left < right) {
        mergeSort(sortingArray, left, mid);
        mergeSort(sortingArray, mid   1, right);
        merge(sortingArray, left, mid, right);
    }

    // Merge thread results
    if (rank % 2 == 0)  {
        mergeThreadResults(rank, left, right, currentTask->size, currentTask->threads);
    }

    return 0;
}


struct ThreadTask *threadCreator(int size, int threads, pthread_t *thread_handles, struct ThreadTask *tasksHolder) {
    /* Create threads with each task info */

    struct ThreadTask *threadTask;

    for (long thread = 0; thread < threads; thread  ){
        threadTask = amp;tasksHolder[thread];
        threadTask->rank = thread;
        threadTask->size = size;
        threadTask->threads = threads;

        pthread_create(amp;thread_handles[thread], NULL, sortManager, (void*) threadTask);
    }

    return tasksHolder;
}


void printArray(int arr[], int size) {
    /* Print array */

    for (int arrayIndex = 0; arrayIndex < size; arrayIndex  )
        printf("%d ", arr[arrayIndex]);
    printf("n");
}


int main(int argc, char *argv[]) {

    // Initialize arguments
    int arraySize = 20;
    int totalThreads = 16;

    
    // Display input
    printf("nInput array:n");
    printArray(sortingArray, arraySize);
    

    // Initialize threads
    pthread_t *thread_handles;
    thread_handles = malloc(totalThreads * sizeof(pthread_t));

    // Create threads
    struct ThreadTask threadTasksHolder[totalThreads];
    *threadTasksHolder = *threadCreator(arraySize, totalThreads, thread_handles, threadTasksHolder);
    
    // Execute merge sort in each thread
    for (long thread = 0; thread < totalThreads; thread  ) {
        pthread_join(thread_handles[thread], NULL);
    }
    free(thread_handles);
    

    // Display output
    printf("nSorted array:n");
    printArray(sortingArray, arraySize);
    
    return 0;
}
 

Комментарии:

1. Вы не можете использовать стандартный барьер pthread?

2. @Шон, я могу, но мне тоже не удалось заставить это работать.

Ответ №1:

Как сказал @Джон Боллинджер, ваш подход выглядит излишне сложным, и решение было бы столь же сложным. Но если вы хотите установить барьер, я бы посоветовал вам поставить его после merge входа mergeThreadResults . Таким образом, вы можете дождаться, пока все потоки, выполняющие работу в этом цикле, завершатся, прежде чем перейти к следующему.

Чтобы создать это, вам нужно будет проходить новый барьер каждую итерацию. Потому что при каждом цикле количество потоков, выполняющих слияние, будет уменьшаться. Так что начните объявлять о некоторых глобальных барьерах:

 int mergeCycleFlag = 0;
pthread_mutex_t mutex;
pthread_barrier_t *mergeBarrier;
 

Флаг используется для создания барьера для каждой итерации, и нам понадобится несколько mergeBarrier для каждого цикла. Не забудьте инициализировать его в своей main функции, указав количество итераций, которые вы будете выполнять: mergeBarrier = realloc(mergeBarrier, howManyIterations);

Тогда мы сможем создать такой барьер, как этот:

         pthread_mutex_lock(amp;mutex);
        if (mergeCycleFlag != iter) { 
            mergeCycleFlag = iter;
            int mergesInLoop = threads % iter== 0 ? threads/iter: threads/iter 1;
            pthread_barrier_init(amp;mergeBarrier[iter], NULL, mergesInLoop);
        }
        pthread_mutex_unlock(amp;mutex);

        ... MERGE ...

        // Wait everyone finnish merging
        pthread_barrier_wait (amp;mergeBarrier[iter]);
 

Обратите внимание, что я использую a lock для создания барьера, потому что мы не хотим, чтобы два потока возились здесь одновременно. Если для этого не установлен барьер iter , мы создадим его с количеством потоков, которые должны работать сейчас. Кроме того, я изменил ваше breakpoint заявление, чтобы оно соответствовало расчету того, сколько потоков мы ожидаем выполнить merge .

После некоторой корректировки, это то, как вы mergeThreadResults должны выглядеть:

 void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
    
    int nextThread, nextThreadRight;
    int groupSize = 2;

    while (groupSize <= threads) {
        if (myRank % groupSize != 0) { // Release threads that no long perform merges
            break;
        }

        nextThread = (myRank 1*groupSize) < threads ? (myRank 1*groupSize) : threads;
        nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
 
        printf("Merging threads %ld to %dn", myRank, nextThread-1);

        // Init barrier with number of threads you will wait merging 
        pthread_mutex_lock(amp;mutex);  // Just one thread can set the barrier
        if (mergeCycleFlag != groupSize) { 
            mergeCycleFlag = groupSize;
            int mergesInLoop = threads % groupSize == 0 ? threads/groupSize : threads/groupSize 1; // Calculate threads working in this step
            pthread_barrier_init(amp;mergeBarrier[groupSize], NULL, mergesInLoop);  // set barrier
        }
        pthread_mutex_unlock(amp;mutex);

        // Merge thread group with neighbour group
        merge(sortingArray, myLeft, myRight, nextThreadRight);

        // Wait everyone finnish merging
        pthread_barrier_wait (amp;mergeBarrier[groupSize]);

        myRight = nextThreadRight;
        groupSize = groupSize * 2;
    }

    // Merge thread 0
    if (myRank == 0 amp;amp; nextThread < threads-1) {
        nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
    }
}
 

Наконец, для того, чтобы это решение работало, вам нужно, чтобы каждый поток завершил свою работу перед объединением результатов. Поэтому вам нужно либо вызвать его после вашего join входа main , либо реализовать другой барьер со всеми потоками перед вызовом mergeThreadResults on sortManager .

Кроме того, еще лучшим подходом было бы, чтобы потоки ждали только тех других потоков, которые они объединят. Например, поток 0 ожидает только 1. Затем на 2… и т. Д.

Комментарии:

1. Это сработало, спасибо! Я только что переоделся mergeBarrier[groupSize] для mergeBarrier[counter] . И реализовал счетчик для каждой итерации, чтобы выделить достаточно памяти для барьеров.

Ответ №2:

Я пытаюсь распараллелить алгоритм сортировки слиянием. То, что я делаю, — это разделяю входной массив для каждого потока, а затем объединяю результаты потоков.

Хорошо, но ваш подход излишне сложен. На каждом этапе процесса слияния вы хотите, чтобы половина ваших потоков ждала завершения другой половины, и наиболее естественный способ для одного потока дождаться завершения другого-использовать pthread_join() . Если бы вы хотели, чтобы все ваши потоки продолжали выполнять больше работы после синхронизации, это было бы по-другому, но в этом случае тем, кто больше не отвечает за слияния, вообще нечего делать.

Это то, что я пробовал:

         pthread_mutex_lock(amp;mutex);
        counter  ;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter 1;
        if(counter >= breakpoint ) {
            counter = 0;
            pthread_cond_broadcast(amp;cond_var);
        } else {
            while (pthread_cond_wait(amp;cond_var, amp;mutex) != 0);
        }
        pthread_mutex_unlock(amp;mutex);
 

С этим связано несколько проблем, но самая большая заключается в том, что барьер-это неправильный инструмент для работы. После того, как барьер преодолен, все потоки, которые были заблокированы на нем, продолжаются. Вы хотите, чтобы половина потоков продолжалась, выполняя слияния, но у остальных (должно быть) больше не было работы. Ваши вычисления breakpoint предполагают, что эта вторая половина не вернется к барьеру, чего им действительно не следует делать. Если вы настаиваете на использовании барьера, то потоки, которые не должны выполнять слияние, должны завершиться после прохождения барьера.

Более того, неправильно начинать iter с 2. Если вы используете барьерный подход, то все потоки, активные на каждой итерации, должны достичь барьера до любого продолжения, но если iter начинается с 2, то на первой итерации только половина всех потоков должна достичь барьера до его прохождения.

Кроме того, использование вашего резюме неидиоматично и подвержено проблемам. Ни одна из задокументированных причин сбоя pthread_cond_wait() не может быть устранена, если вы попытаетесь снова подождать, как вы это делаете, поэтому вам, вероятно, вместо этого потребуется завершить программу по ошибке. Обратите также внимание, что pthread_mutex_lock() , pthread_mutex_unlock() , и pthread_cond_broadcast() все тоже может потерпеть неудачу.

С другой стороны, РЕЗЮМЕ подвержены (очень редким) ложным пробуждениям, поэтому при успешном возвращении после ожидания вам необходимо еще раз проверить состояние, прежде чем продолжить, и, возможно, снова подождать. Что-то более похожее на это:

         if (pthread_mutex_lock(amp;mutex) != 0) {
            perror("pthread_mutex_lock");
            abort();
        }
        counter  ;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter 1;
        if(counter >= breakpoint ) {
            counter = 0;
            if (pthread_cond_broadcast(amp;cond_var) != 0) {
                perror("pthread_cond_broadcast");
                abort();
            }
        } else {
            do {
                if (pthread_cond_wait(amp;cond_var, amp;mutex) != 0) {
                    perror("pthread_cond_wait");
                    abort();
                }
            } while (counter < breakpoint);
        }
        if (pthread_mutex_unlock(amp;mutex) != 0) {
            perror("pthread_mutex_unlock");
            abort();
        }

        // some threads must terminate at this point
 

Комментарии:

1. Спасибо за ваш ответ. Я согласен, что это оказалось излишне сложным, поэтому я открыт для более чистых предложений, которые полностью отвергают то, что я делал до сих пор. Кроме того, могу я спросить, что означает «РЕЗЮМЕ»?

2. @Artotim, CV = переменная условия. И я уже упоминал о более чистом подходе: на каждом этапе межпоточного слияния каждый поток, ответственный за слияние, присоединяется к потоку, ответственному за сортировку другой половины его входных данных слияния, а те, которые больше не отвечают за слияния, завершаются. Затем основной поток присоединяется только к последнему оставшемуся потоку или даже к нему, если он сам участвует в сортировке и объединении. Никакой другой синхронизации не требуется. Тем не менее, да, начать все с нуля может быть лучшей стратегией для достижения этой цели.