#c #multithreading #parallel-processing #pthreads
#c #многопоточность #параллельная обработка #потоки
Вопрос:
Я пытаюсь узнать о многопоточных алгоритмах, поэтому я реализовал простую функцию поиска максимального числа массива. Я создал базовую программу (findMax1.c), которая загружает из файла около 263 миллионов чисел int в память. Затем я просто использую цикл for, чтобы найти максимальное число. Затем я создал другую программу (findMax2.c), которая использует 4 потока. Я выбрал 4 потока, потому что процессор (intel i5 4460), который я использую, имеет 4 ядра и 1 поток на ядро. Поэтому я предполагаю, что если я назначу каждому ядру кусок массива для обработки, это будет более эффективно, потому что таким образом у меня будет меньше промахов в кэше. Теперь каждый поток находит максимальное число из каждого фрагмента, затем я объединяю все потоки, чтобы, наконец, найти максимальное число из всех этих фрагментов. Базовая программа findMax1.c занимает около 660 мс для выполнения задачи, поэтому моя первоначальная мысль заключалась в том, что findMax2.c (который использует 4 потока) займет около 165 мс (660 мс / 4), так как теперь у меня есть 4 потока, запущенных параллельно для выполнения одной и той же задачи, но findMax2.c занимает около 610 мс. Всего на 50 мс меньше, чем findMax1.c. Что я упускаю? есть ли что-то не так с реализацией потоковой программы?
findMax1.c
#include lt;stdio.hgt; #include lt;stdlib.hgt; #include lt;assert.hgt; #include lt;time.hgt; int main(void) { int i, *array, max = 0, position; size_t array_size_in_bytes = 1024*1024*1024, elements_read, array_size; FILE *f; clock_t t; double time; array = (int*) malloc(array_size_in_bytes); assert(array != NULL); // assert if condition is falsa printf("Loading array..."); t = clock(); f = fopen("numbers.bin", "rb"); assert(f != NULL); elements_read = fread(array, array_size_in_bytes, 1, f); t = clock() - t; time = ((double) t) / CLOCKS_PER_SEC; assert(elements_read == 1); printf("done!n"); printf("File load time: %f [s]n", time); fclose(f); array_size = array_size_in_bytes / sizeof(int); printf("Finding max..."); t = clock(); for(i = 0; i lt; array_size; i ) if(array[i] gt; max) { max = array[i]; position = i; } t = clock() - t; time = ((double) t) / CLOCKS_PER_SEC; printf("done!n"); printf("----------- Program results -------------nMax number: %d position %dn", max, position); printf("Time %f [s]n", time); return 0; }
findMax2.c:
#define _GNU_SOURCE #include lt;stdio.hgt; #include lt;stdlib.hgt; #include lt;assert.hgt; #include lt;time.hgt; #include lt;pthread.hgt; #include lt;stdlib.hgt; #include lt;unistd.hgt; #include lt;sched.hgt; #define NUM_THREADS 4 int max_chunk[NUM_THREADS], pos_chunk[NUM_THREADS]; int *array; pthread_t tid[NUM_THREADS]; void *thread(void *arg) { size_t array_size_in_bytes = 1024*1024*1024; int i, rc, offset, chunk_size, array_size, *core_id = (int*) arg, num_cores = sysconf(_SC_NPROCESSORS_ONLN); pthread_t id = pthread_self(); cpu_set_t cpuset; if (*core_id lt; 0 || *core_id gt;= num_cores) return NULL; CPU_ZERO(amp;cpuset); CPU_SET(*core_id, amp;cpuset); rc = pthread_setaffinity_np(id, sizeof(cpu_set_t), amp;cpuset); if(rc != 0) { printf("pthread_setaffinity_np() failed! - rc %dn", rc); return NULL; } printf("Thread running on CPU %dn", sched_getcpu()); array_size = (int) (array_size_in_bytes / sizeof(int)); chunk_size = (int) (array_size / NUM_THREADS); offset = chunk_size * (*core_id); // Find max number in the array chunk for(i = offset; i lt; (offset chunk_size); i ) { if(array[i] gt; max_chunk[*core_id]) { max_chunk[*core_id] = array[i]; pos_chunk[*core_id] = i; } } return NULL; } void load_array(void) { FILE *f; size_t array_size_in_bytes = 1024*1024*1024, elements_read; array = (int*) malloc(array_size_in_bytes); assert(array != NULL); // assert if condition is false printf("Loading array..."); f = fopen("numbers.bin", "rb"); assert(f != NULL); elements_read = fread(array, array_size_in_bytes, 1, f); assert(elements_read == 1); printf("done!n"); fclose(f); } int main(void) { int i, max = 0, position, id[NUM_THREADS], rc; clock_t t; double time; load_array(); printf("Finding max..."); t = clock(); // Create threads for(i = 0; i lt; NUM_THREADS; i ) { id[i] = i; // uso id para pasarle un puntero distinto a cada thread rc = pthread_create(amp;(tid[i]), NULL, amp;thread, (void*)(id i)); if (rc != 0) printf("Can't create thread! rc = %dn", rc); else printf("Thread %lu createdn", tid[i]); } // Join threads for(i = 0; i lt; NUM_THREADS; i ) pthread_join(tid[i], NULL); // Find max number from all chunks for(i = 0; i lt; NUM_THREADS; i ) if(max_chunk[i] gt; max) { max = max_chunk[i]; position = pos_chunk[i]; } t = clock() - t; time = ((double) t) / CLOCKS_PER_SEC; printf("done!n"); free(array); printf("----------- Program results -------------nMax number: %d position %dn", max, position); printf("Time %f [s]n", time); pthread_exit(NULL); return 0; }
Комментарии:
1. Ваш файл действительно 1 гигабайт? Вы можете использовать
fseek(f, 0, SEEK_END); size_t filesize = ftell(f); rewind(f);
для выделения памяти в зависимости от размера файла.2. Да, я сделал действительно большой файл, так что мне нужно обработать много цифр. Таким образом, я мог бы измерить время, необходимое для обработки всех чисел. Размер файла не меняется, поэтому я жестко закодировал размер массива.
3. @Barmak Shemirani Доступ к диску не включен во время
4. Совет:
mmap
это позволит быстрее загрузить файл.5. Существуют накладные расходы на создание потоков, и потоки выполняются не очень долго.
Ответ №1:
Во-первых, вы неправильно измеряете свое время. clock()
измеряет процессорное время процесса, т. е. время, используемое всеми потоками. Реальное затраченное время будет частью этого. clock_gettime(CLOCK_MONOTONIC,...)
должно дать лучшие измерения.
Во-вторых, ваши циклы ядра вообще несопоставимы.
В многопоточной программе, которую вы пишете на каждой итерации цикла, глобальные переменные очень близки друг к другу, и это ужасно для конкуренции в кэше. Вы могли пространства глобальной памяти отдельно (сделать каждого элемента массива кэш-соответствие структуры (struct _Alignas(64)
)) и это поможет скоротать время, но лучше подходом было бы использовать локальные переменные (которые должны идти в регистры), копирование приближение первой петли, а затем выписать фрагмент результата в память в конце цикла:
int l_max_chunk=0, l_pos_chunk=0, *a; for(i = 0,a=array offset; i lt; chunk_size; i ) if(a[i] gt; l_max_chunk) l_max_chunk=a[i], l_pos_chunk=i; max_chunk[*core_id] = l_max_chunk; pos_chunk[*core_id] = l_pos_chunk;
Вот ваша модифицированная тестовая программа с ожидаемым ускорением (я получаю примерно 2-кратное ускорение на своем двухъядерном процессоре). (Я также взял на себя смелость заменить загрузку файлов инициализацией в памяти, чтобы упростить тестирование.)
#define _GNU_SOURCE #include lt;stdio.hgt; #include lt;stdlib.hgt; #include lt;assert.hgt; #include lt;time.hgt; #include lt;pthread.hgt; #include lt;stdlib.hgt; #include lt;unistd.hgt; #include lt;sched.hgt; #include lt;stdint.hgt; struct timespec ts0,ts1; uint64_t sc_timespec_diff(struct timespec Ts1, struct timespec Ts0) { return (Ts1.tv_sec - Ts0.tv_sec)*1000000000 (Ts1.tv_nsec - Ts0.tv_nsec); } #define NUM_THREADS 4 int max_chunk[NUM_THREADS], pos_chunk[NUM_THREADS]; int *array; pthread_t tid[NUM_THREADS]; void *thread(void *arg) { size_t array_size_in_bytes = 1024*1024*1024; int i, rc, offset, chunk_size, array_size, *core_id = (int*) arg, num_cores = sysconf(_SC_NPROCESSORS_ONLN); #if 1 //shouldn't make much difference pthread_t id = pthread_self(); cpu_set_t cpuset; if (*core_id lt; 0 || *core_id gt;= num_cores) return NULL; CPU_ZERO(amp;cpuset); CPU_SET(*core_id, amp;cpuset); rc = pthread_setaffinity_np(id, sizeof(cpu_set_t), amp;cpuset); if(rc != 0) { printf("pthread_setaffinity_np() failed! - rc %dn", rc); return NULL; } printf("Thread running on CPU %dn", sched_getcpu()); #endif array_size = (int) (array_size_in_bytes / sizeof(int)); chunk_size = (int) (array_size / NUM_THREADS); offset = chunk_size * (*core_id); // Find max number in the array chunk #if 0 //horrible for caches for(i = offset; i lt; (offset chunk_size); i ) { if(array[i] gt; max_chunk[*core_id]) { max_chunk[*core_id] = array[i]; pos_chunk[*core_id] = i; } } #else int l_max_chunk=0, l_pos_chunk=0, *a; for(i = 0,a=array offset; i lt; chunk_size; i ) if(a[i] gt; l_max_chunk) l_max_chunk=a[i], l_pos_chunk=i; max_chunk[*core_id] = l_max_chunk; pos_chunk[*core_id] = l_pos_chunk; #endif return NULL; } void load_array(void) { FILE *f; size_t array_size_in_bytes = 1024*1024*1024, array_size=array_size_in_bytes/sizeof(int); array = (int*) malloc(array_size_in_bytes); if(array == NULL) abort(); // assert if condition is false for(size_t i=0; ilt;array_size; i ) array[i]=i; } int main(void) { int i, max = 0, position, id[NUM_THREADS], rc; clock_t t; double time; load_array(); printf("Finding max..."); t = clock(); clock_gettime(CLOCK_MONOTONIC,amp;ts0); // Create threads for(i = 0; i lt; NUM_THREADS; i ) { id[i] = i; // uso id para pasarle un puntero distinto a cada thread rc = pthread_create(amp;(tid[i]), NULL, amp;thread, (void*)(id i)); if (rc != 0) printf("Can't create thread! rc = %dn", rc); else printf("Thread %lu createdn", tid[i]); } // Join threads for(i = 0; i lt; NUM_THREADS; i ) pthread_join(tid[i], NULL); // Find max number from all chunks for(i = 0; i lt; NUM_THREADS; i ) if(max_chunk[i] gt; max) { max = max_chunk[i]; position = pos_chunk[i]; } clock_gettime(CLOCK_MONOTONIC,amp;ts1); printf("Time2 %.6LFn", sc_timespec_diff(ts1,ts0)/1E9L); t = clock() - t; time = ((double) t) / CLOCKS_PER_SEC; printf("done!n"); free(array); printf("----------- Program results -------------nMax number: %d position %dn", max, position); printf("Time %f [s]n", time); pthread_exit(NULL); return 0; }
Мои тайминги:
- 0.188917 для версии с резьбой signle
- 2.511590 для оригинальной многопоточной версии (измеряется с помощью clock_gettime(CLOCK_MONOTONIC,…)
- 0.099802 с измененной резьбовой версией (измеряется с помощью clock_gettime(CLOCK_MONOTONIC,…)
запущен на машине Linux с процессором Intel(R) Core(TM) i7-2620M с частотой 2,70 ГГц.