#c #parallel-processing #pthreads #pthread-barriers
Вопрос:
Я пытаюсь распараллелить алгоритм сортировки слиянием. То, что я делаю, — это разделяю входной массив для каждого потока, а затем объединяю результаты потоков. То, как я пытаюсь объединить результаты, выглядит примерно так:
thread 0 | thread 1 | thread 2 | thread 3
sort(A0) | sort(A1) | sort(A2) | sort(A3)
merge(A0,A1) | | merge(A2,A3) |
merge(A0A1, A2A3) | | |
Итак, в конце моей функции sortManager
я вызываю функцию mergeThreadResults
, которая должна реализовать вышеуказанную логику. В нем я перебираю пары, чтобы объединить соответствующие потоки. Затем, при необходимости, я объединяю последние элементы в поток 0. Это выглядит так :
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread;
int iter = 2;
while (iter <= threads) {
int nextThread = (myRank 1*iter) < threads ? (myRank 1*iter) : threads;
int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %dn", myRank, nextThread);
if (myRank % iter != 0) {
break;
}
merge(sortingArray, myLeft, myRight, nextThreadRight);
sleep(3); // <- sleep
myRight = nextThreadRight;
iter = iter * 2;
}
if (myRank == 0 amp;amp; nextThread < threads-1) {
int nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
Похоже, все работает так, как задумано. Проблема в том, что я использую sleep
функцию для синхронизации потоков, что далеко не лучший подход. Поэтому я пытаюсь реализовать барьер с помощью pthread.
В нем я пытаюсь рассчитать, сколько итераций потребуется для этого цикла, и передать его как breakpoint
. Когда все потоки находятся в одной точке, я отпускаю функцию слияния и снова жду в новом цикле. Это то, что я пробовал:
pthread_mutex_lock(amp;mutex);
counter ;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter 1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(amp;cond_var);
} else {
while (pthread_cond_wait(amp;cond_var, amp;mutex) != 0);
}
pthread_mutex_unlock(amp;mutex);
Но это работает не так, как задумывалось. Некоторые merge
триггеры до того, как последний цикл полностью закончился, оставив меня с частично отсортированным массивом.
Это незначительный пример моего кода для тестирования:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <unistd.h>
// Initialize global variables
int sortingArray[20] = {5,-4,3,-1,-2,3,1,2,-2,-1,-2,-1,-2,-3,4,1234,534,123,87,123};
int counter = 0;
pthread_mutex_t mutex;
pthread_cond_t cond_var;
struct ThreadTask {
long rank;
int size;
int threads;
};
void merge(int arr[], int left, int mid, int right) {
/* Merge arrays */
int i, j, k;
int n1 = mid - left 1;
int n2 = right - mid;
// Alocate temp arrays
int *L = malloc((n1 2) * sizeof(int));
int *R = malloc((n2 2) * sizeof(int));
if (L == NULL || R == NULL) {
fprintf(stderr, "Fatal: failed to allocate memory fo temp arrays.");
exit(EXIT_FAILURE);
}
// Populate temp arrays
for (i = 1; i <= n1; i ) {
L[i] = arr[left i - 1];
}
for (j = 1; j <= n2; j ) {
R[j] = arr[mid j];
}
L[n1 1] = INT_MAX;
R[n2 1] = INT_MAX;
i = 1;
j = 1;
// Merge arrays
for (k = left; k <= right; k ) {
if (L[i] <= R[j]) {
arr[k] = L[i];
i ;
} else {
arr[k] = R[j];
j ;
}
}
free(L);
free(R);
}
void mergeSort(int arr[], int left, int right) {
/* Sort and then merge arrays */
if (left < right) {
int mid = left (right - left) / 2;
mergeSort(arr, left, mid);
mergeSort(arr, mid 1, right);
merge(arr, left, mid, right);
}
}
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread;
int iter = 2;
while (iter <= threads) {
int nextThread = (myRank 1*iter) < threads ? (myRank 1*iter) : threads;
int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %dn", myRank, nextThread);
if (myRank % iter != 0) {
break;
}
// barrier
pthread_mutex_lock(amp;mutex);
counter ;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter 1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(amp;cond_var);
} else {
while (pthread_cond_wait(amp;cond_var, amp;mutex) != 0);
}
pthread_mutex_unlock(amp;mutex);
merge(sortingArray, myLeft, myRight, nextThreadRight);
sleep(2); // <- sleep
myRight = nextThreadRight;
iter = iter * 2;
}
if (myRank == 0 amp;amp; nextThread < threads-1) {
int nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
void *sortManager(void *threadInfo) {
/* Manage mergeSort between threads */
struct ThreadTask *currentTask = threadInfo;
// Get task arguments
long rank = currentTask->rank;
int left= rank * ((float)currentTask->size / (float)currentTask->threads);
int right = (rank 1) * ((float)currentTask->size / (float)currentTask->threads) - 1;
int mid = left (right - left) / 2;
// Execute merge for task division
if (left < right) {
mergeSort(sortingArray, left, mid);
mergeSort(sortingArray, mid 1, right);
merge(sortingArray, left, mid, right);
}
// Merge thread results
if (rank % 2 == 0) {
mergeThreadResults(rank, left, right, currentTask->size, currentTask->threads);
}
return 0;
}
struct ThreadTask *threadCreator(int size, int threads, pthread_t *thread_handles, struct ThreadTask *tasksHolder) {
/* Create threads with each task info */
struct ThreadTask *threadTask;
for (long thread = 0; thread < threads; thread ){
threadTask = amp;tasksHolder[thread];
threadTask->rank = thread;
threadTask->size = size;
threadTask->threads = threads;
pthread_create(amp;thread_handles[thread], NULL, sortManager, (void*) threadTask);
}
return tasksHolder;
}
void printArray(int arr[], int size) {
/* Print array */
for (int arrayIndex = 0; arrayIndex < size; arrayIndex )
printf("%d ", arr[arrayIndex]);
printf("n");
}
int main(int argc, char *argv[]) {
// Initialize arguments
int arraySize = 20;
int totalThreads = 16;
// Display input
printf("nInput array:n");
printArray(sortingArray, arraySize);
// Initialize threads
pthread_t *thread_handles;
thread_handles = malloc(totalThreads * sizeof(pthread_t));
// Create threads
struct ThreadTask threadTasksHolder[totalThreads];
*threadTasksHolder = *threadCreator(arraySize, totalThreads, thread_handles, threadTasksHolder);
// Execute merge sort in each thread
for (long thread = 0; thread < totalThreads; thread ) {
pthread_join(thread_handles[thread], NULL);
}
free(thread_handles);
// Display output
printf("nSorted array:n");
printArray(sortingArray, arraySize);
return 0;
}
Комментарии:
1. Вы не можете использовать стандартный барьер pthread?
2. @Шон, я могу, но мне тоже не удалось заставить это работать.
Ответ №1:
Как сказал @Джон Боллинджер, ваш подход выглядит излишне сложным, и решение было бы столь же сложным. Но если вы хотите установить барьер, я бы посоветовал вам поставить его после merge
входа mergeThreadResults
. Таким образом, вы можете дождаться, пока все потоки, выполняющие работу в этом цикле, завершатся, прежде чем перейти к следующему.
Чтобы создать это, вам нужно будет проходить новый барьер каждую итерацию. Потому что при каждом цикле количество потоков, выполняющих слияние, будет уменьшаться. Так что начните объявлять о некоторых глобальных барьерах:
int mergeCycleFlag = 0;
pthread_mutex_t mutex;
pthread_barrier_t *mergeBarrier;
Флаг используется для создания барьера для каждой итерации, и нам понадобится несколько mergeBarrier для каждого цикла. Не забудьте инициализировать его в своей main
функции, указав количество итераций, которые вы будете выполнять: mergeBarrier = realloc(mergeBarrier, howManyIterations);
Тогда мы сможем создать такой барьер, как этот:
pthread_mutex_lock(amp;mutex);
if (mergeCycleFlag != iter) {
mergeCycleFlag = iter;
int mergesInLoop = threads % iter== 0 ? threads/iter: threads/iter 1;
pthread_barrier_init(amp;mergeBarrier[iter], NULL, mergesInLoop);
}
pthread_mutex_unlock(amp;mutex);
... MERGE ...
// Wait everyone finnish merging
pthread_barrier_wait (amp;mergeBarrier[iter]);
Обратите внимание, что я использую a lock
для создания барьера, потому что мы не хотим, чтобы два потока возились здесь одновременно. Если для этого не установлен барьер iter
, мы создадим его с количеством потоков, которые должны работать сейчас. Кроме того, я изменил ваше breakpoint
заявление, чтобы оно соответствовало расчету того, сколько потоков мы ожидаем выполнить merge
.
После некоторой корректировки, это то, как вы mergeThreadResults
должны выглядеть:
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread, nextThreadRight;
int groupSize = 2;
while (groupSize <= threads) {
if (myRank % groupSize != 0) { // Release threads that no long perform merges
break;
}
nextThread = (myRank 1*groupSize) < threads ? (myRank 1*groupSize) : threads;
nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %dn", myRank, nextThread-1);
// Init barrier with number of threads you will wait merging
pthread_mutex_lock(amp;mutex); // Just one thread can set the barrier
if (mergeCycleFlag != groupSize) {
mergeCycleFlag = groupSize;
int mergesInLoop = threads % groupSize == 0 ? threads/groupSize : threads/groupSize 1; // Calculate threads working in this step
pthread_barrier_init(amp;mergeBarrier[groupSize], NULL, mergesInLoop); // set barrier
}
pthread_mutex_unlock(amp;mutex);
// Merge thread group with neighbour group
merge(sortingArray, myLeft, myRight, nextThreadRight);
// Wait everyone finnish merging
pthread_barrier_wait (amp;mergeBarrier[groupSize]);
myRight = nextThreadRight;
groupSize = groupSize * 2;
}
// Merge thread 0
if (myRank == 0 amp;amp; nextThread < threads-1) {
nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
Наконец, для того, чтобы это решение работало, вам нужно, чтобы каждый поток завершил свою работу перед объединением результатов. Поэтому вам нужно либо вызвать его после вашего join
входа main
, либо реализовать другой барьер со всеми потоками перед вызовом mergeThreadResults
on sortManager
.
Кроме того, еще лучшим подходом было бы, чтобы потоки ждали только тех других потоков, которые они объединят. Например, поток 0 ожидает только 1. Затем на 2… и т. Д.
Комментарии:
1. Это сработало, спасибо! Я только что переоделся
mergeBarrier[groupSize]
дляmergeBarrier[counter]
. И реализовал счетчик для каждой итерации, чтобы выделить достаточно памяти для барьеров.
Ответ №2:
Я пытаюсь распараллелить алгоритм сортировки слиянием. То, что я делаю, — это разделяю входной массив для каждого потока, а затем объединяю результаты потоков.
Хорошо, но ваш подход излишне сложен. На каждом этапе процесса слияния вы хотите, чтобы половина ваших потоков ждала завершения другой половины, и наиболее естественный способ для одного потока дождаться завершения другого-использовать pthread_join()
. Если бы вы хотели, чтобы все ваши потоки продолжали выполнять больше работы после синхронизации, это было бы по-другому, но в этом случае тем, кто больше не отвечает за слияния, вообще нечего делать.
Это то, что я пробовал:
pthread_mutex_lock(amp;mutex); counter ; int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter 1; if(counter >= breakpoint ) { counter = 0; pthread_cond_broadcast(amp;cond_var); } else { while (pthread_cond_wait(amp;cond_var, amp;mutex) != 0); } pthread_mutex_unlock(amp;mutex);
С этим связано несколько проблем, но самая большая заключается в том, что барьер-это неправильный инструмент для работы. После того, как барьер преодолен, все потоки, которые были заблокированы на нем, продолжаются. Вы хотите, чтобы половина потоков продолжалась, выполняя слияния, но у остальных (должно быть) больше не было работы. Ваши вычисления breakpoint
предполагают, что эта вторая половина не вернется к барьеру, чего им действительно не следует делать. Если вы настаиваете на использовании барьера, то потоки, которые не должны выполнять слияние, должны завершиться после прохождения барьера.
Более того, неправильно начинать iter
с 2. Если вы используете барьерный подход, то все потоки, активные на каждой итерации, должны достичь барьера до любого продолжения, но если iter
начинается с 2, то на первой итерации только половина всех потоков должна достичь барьера до его прохождения.
Кроме того, использование вашего резюме неидиоматично и подвержено проблемам. Ни одна из задокументированных причин сбоя pthread_cond_wait()
не может быть устранена, если вы попытаетесь снова подождать, как вы это делаете, поэтому вам, вероятно, вместо этого потребуется завершить программу по ошибке. Обратите также внимание, что pthread_mutex_lock()
, pthread_mutex_unlock()
, и pthread_cond_broadcast()
все тоже может потерпеть неудачу.
С другой стороны, РЕЗЮМЕ подвержены (очень редким) ложным пробуждениям, поэтому при успешном возвращении после ожидания вам необходимо еще раз проверить состояние, прежде чем продолжить, и, возможно, снова подождать. Что-то более похожее на это:
if (pthread_mutex_lock(amp;mutex) != 0) {
perror("pthread_mutex_lock");
abort();
}
counter ;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter 1;
if(counter >= breakpoint ) {
counter = 0;
if (pthread_cond_broadcast(amp;cond_var) != 0) {
perror("pthread_cond_broadcast");
abort();
}
} else {
do {
if (pthread_cond_wait(amp;cond_var, amp;mutex) != 0) {
perror("pthread_cond_wait");
abort();
}
} while (counter < breakpoint);
}
if (pthread_mutex_unlock(amp;mutex) != 0) {
perror("pthread_mutex_unlock");
abort();
}
// some threads must terminate at this point
Комментарии:
1. Спасибо за ваш ответ. Я согласен, что это оказалось излишне сложным, поэтому я открыт для более чистых предложений, которые полностью отвергают то, что я делал до сих пор. Кроме того, могу я спросить, что означает «РЕЗЮМЕ»?
2. @Artotim, CV = переменная условия. И я уже упоминал о более чистом подходе: на каждом этапе межпоточного слияния каждый поток, ответственный за слияние, присоединяется к потоку, ответственному за сортировку другой половины его входных данных слияния, а те, которые больше не отвечают за слияния, завершаются. Затем основной поток присоединяется только к последнему оставшемуся потоку или даже к нему, если он сам участвует в сортировке и объединении. Никакой другой синхронизации не требуется. Тем не менее, да, начать все с нуля может быть лучшей стратегией для достижения этой цели.