Производительность многопоточного алгоритма для нахождения максимального числа в массиве

#c #multithreading #parallel-processing #pthreads

#c #многопоточность #параллельная обработка #потоки

Вопрос:

Я пытаюсь узнать о многопоточных алгоритмах, поэтому я реализовал простую функцию поиска максимального числа массива. Я создал базовую программу (findMax1.c), которая загружает из файла около 263 миллионов чисел int в память. Затем я просто использую цикл for, чтобы найти максимальное число. Затем я создал другую программу (findMax2.c), которая использует 4 потока. Я выбрал 4 потока, потому что процессор (intel i5 4460), который я использую, имеет 4 ядра и 1 поток на ядро. Поэтому я предполагаю, что если я назначу каждому ядру кусок массива для обработки, это будет более эффективно, потому что таким образом у меня будет меньше промахов в кэше. Теперь каждый поток находит максимальное число из каждого фрагмента, затем я объединяю все потоки, чтобы, наконец, найти максимальное число из всех этих фрагментов. Базовая программа findMax1.c занимает около 660 мс для выполнения задачи, поэтому моя первоначальная мысль заключалась в том, что findMax2.c (который использует 4 потока) займет около 165 мс (660 мс / 4), так как теперь у меня есть 4 потока, запущенных параллельно для выполнения одной и той же задачи, но findMax2.c занимает около 610 мс. Всего на 50 мс меньше, чем findMax1.c. Что я упускаю? есть ли что-то не так с реализацией потоковой программы?

findMax1.c

 #include lt;stdio.hgt; #include lt;stdlib.hgt; #include lt;assert.hgt; #include lt;time.hgt;  int main(void) {  int i, *array, max = 0, position;  size_t array_size_in_bytes = 1024*1024*1024, elements_read, array_size;  FILE *f;  clock_t t;  double time;   array = (int*) malloc(array_size_in_bytes);  assert(array != NULL); // assert if condition is falsa    printf("Loading array...");   t = clock();  f = fopen("numbers.bin", "rb");  assert(f != NULL);   elements_read = fread(array, array_size_in_bytes, 1, f);  t = clock() - t;  time = ((double) t) / CLOCKS_PER_SEC;  assert(elements_read == 1);   printf("done!n");  printf("File load time: %f [s]n", time);   fclose(f);   array_size = array_size_in_bytes / sizeof(int);   printf("Finding max...");   t = clock();   for(i = 0; i lt; array_size; i  )  if(array[i] gt; max)  {  max = array[i];  position = i;  }   t = clock() - t;  time = ((double) t) / CLOCKS_PER_SEC;   printf("done!n");  printf("----------- Program results -------------nMax number: %d position %dn", max, position);  printf("Time %f [s]n", time);   return 0; }  

findMax2.c:

 #define _GNU_SOURCE #include lt;stdio.hgt; #include lt;stdlib.hgt; #include lt;assert.hgt; #include lt;time.hgt; #include lt;pthread.hgt; #include lt;stdlib.hgt; #include lt;unistd.hgt; #include lt;sched.hgt;  #define NUM_THREADS 4  int max_chunk[NUM_THREADS], pos_chunk[NUM_THREADS]; int *array; pthread_t tid[NUM_THREADS];  void *thread(void *arg) {  size_t array_size_in_bytes = 1024*1024*1024;  int i, rc, offset, chunk_size, array_size, *core_id = (int*) arg, num_cores = sysconf(_SC_NPROCESSORS_ONLN);  pthread_t id = pthread_self();  cpu_set_t cpuset;   if (*core_id lt; 0 || *core_id gt;= num_cores)  return NULL;   CPU_ZERO(amp;cpuset);  CPU_SET(*core_id, amp;cpuset);   rc = pthread_setaffinity_np(id, sizeof(cpu_set_t), amp;cpuset);  if(rc != 0)  {  printf("pthread_setaffinity_np() failed! - rc %dn", rc);  return NULL;  }   printf("Thread running on CPU %dn", sched_getcpu());    array_size = (int) (array_size_in_bytes / sizeof(int));  chunk_size = (int) (array_size / NUM_THREADS);  offset = chunk_size * (*core_id);    // Find max number in the array chunk  for(i = offset; i lt; (offset   chunk_size); i  )  {  if(array[i] gt; max_chunk[*core_id])  {  max_chunk[*core_id] = array[i];  pos_chunk[*core_id] = i;  }  }    return NULL;  }  void load_array(void) {  FILE *f;  size_t array_size_in_bytes = 1024*1024*1024, elements_read;   array = (int*) malloc(array_size_in_bytes);  assert(array != NULL); // assert if condition is false   printf("Loading array...");   f = fopen("numbers.bin", "rb");  assert(f != NULL);   elements_read = fread(array, array_size_in_bytes, 1, f);  assert(elements_read == 1);   printf("done!n");   fclose(f); }  int main(void) {  int i, max = 0, position, id[NUM_THREADS], rc;  clock_t t;  double time;   load_array();   printf("Finding max...");   t = clock();   // Create threads  for(i = 0; i lt; NUM_THREADS; i  )  {  id[i] = i; // uso id para pasarle un puntero distinto a cada thread  rc = pthread_create(amp;(tid[i]), NULL, amp;thread, (void*)(id   i));  if (rc != 0)  printf("Can't create thread! rc = %dn", rc);  else  printf("Thread %lu createdn", tid[i]);  }    // Join threads  for(i = 0; i lt; NUM_THREADS; i  )  pthread_join(tid[i], NULL);   // Find max number from all chunks  for(i = 0; i lt; NUM_THREADS; i  )  if(max_chunk[i] gt; max)  {  max = max_chunk[i];  position = pos_chunk[i];  }   t = clock() - t;  time = ((double) t) / CLOCKS_PER_SEC;   printf("done!n");  free(array);   printf("----------- Program results -------------nMax number: %d position %dn", max, position);  printf("Time %f [s]n", time);   pthread_exit(NULL);   return 0; }  

Комментарии:

1. Ваш файл действительно 1 гигабайт? Вы можете использовать fseek(f, 0, SEEK_END); size_t filesize = ftell(f); rewind(f); для выделения памяти в зависимости от размера файла.

2. Да, я сделал действительно большой файл, так что мне нужно обработать много цифр. Таким образом, я мог бы измерить время, необходимое для обработки всех чисел. Размер файла не меняется, поэтому я жестко закодировал размер массива.

3. @Barmak Shemirani Доступ к диску не включен во время

4. Совет: mmap это позволит быстрее загрузить файл.

5. Существуют накладные расходы на создание потоков, и потоки выполняются не очень долго.

Ответ №1:

Во-первых, вы неправильно измеряете свое время. clock() измеряет процессорное время процесса, т. е. время, используемое всеми потоками. Реальное затраченное время будет частью этого. clock_gettime(CLOCK_MONOTONIC,...) должно дать лучшие измерения.

Во-вторых, ваши циклы ядра вообще несопоставимы.

В многопоточной программе, которую вы пишете на каждой итерации цикла, глобальные переменные очень близки друг к другу, и это ужасно для конкуренции в кэше. Вы могли пространства глобальной памяти отдельно (сделать каждого элемента массива кэш-соответствие структуры (struct _Alignas(64) )) и это поможет скоротать время, но лучше подходом было бы использовать локальные переменные (которые должны идти в регистры), копирование приближение первой петли, а затем выписать фрагмент результата в память в конце цикла:

 int l_max_chunk=0, l_pos_chunk=0, *a; for(i = 0,a=array offset; i lt; chunk_size; i  )  if(a[i] gt; l_max_chunk) l_max_chunk=a[i], l_pos_chunk=i; max_chunk[*core_id] = l_max_chunk; pos_chunk[*core_id] = l_pos_chunk;  

Вот ваша модифицированная тестовая программа с ожидаемым ускорением (я получаю примерно 2-кратное ускорение на своем двухъядерном процессоре). (Я также взял на себя смелость заменить загрузку файлов инициализацией в памяти, чтобы упростить тестирование.)

 #define _GNU_SOURCE #include lt;stdio.hgt; #include lt;stdlib.hgt; #include lt;assert.hgt; #include lt;time.hgt; #include lt;pthread.hgt; #include lt;stdlib.hgt; #include lt;unistd.hgt; #include lt;sched.hgt;  #include lt;stdint.hgt; struct timespec ts0,ts1; uint64_t sc_timespec_diff(struct timespec Ts1, struct timespec Ts0) { return (Ts1.tv_sec - Ts0.tv_sec)*1000000000 (Ts1.tv_nsec - Ts0.tv_nsec); }  #define NUM_THREADS 4  int max_chunk[NUM_THREADS], pos_chunk[NUM_THREADS]; int *array; pthread_t tid[NUM_THREADS];  void *thread(void *arg) {  size_t array_size_in_bytes = 1024*1024*1024;  int i, rc, offset, chunk_size, array_size, *core_id = (int*) arg, num_cores = sysconf(_SC_NPROCESSORS_ONLN);  #if 1 //shouldn't make much difference  pthread_t id = pthread_self();  cpu_set_t cpuset;   if (*core_id lt; 0 || *core_id gt;= num_cores)  return NULL;   CPU_ZERO(amp;cpuset);  CPU_SET(*core_id, amp;cpuset);   rc = pthread_setaffinity_np(id, sizeof(cpu_set_t), amp;cpuset);  if(rc != 0)  {  printf("pthread_setaffinity_np() failed! - rc %dn", rc);  return NULL;  }   printf("Thread running on CPU %dn", sched_getcpu());  #endif    array_size = (int) (array_size_in_bytes / sizeof(int));  chunk_size = (int) (array_size / NUM_THREADS);  offset = chunk_size * (*core_id);    // Find max number in the array chunk    #if 0 //horrible for caches  for(i = offset; i lt; (offset   chunk_size); i  )  {  if(array[i] gt; max_chunk[*core_id])  {  max_chunk[*core_id] = array[i];  pos_chunk[*core_id] = i;  }  }  #else  int l_max_chunk=0, l_pos_chunk=0, *a;  for(i = 0,a=array offset; i lt; chunk_size; i  )  if(a[i] gt; l_max_chunk) l_max_chunk=a[i], l_pos_chunk=i;  max_chunk[*core_id] = l_max_chunk;  pos_chunk[*core_id] = l_pos_chunk;  #endif    return NULL;  }  void load_array(void) {  FILE *f;  size_t array_size_in_bytes = 1024*1024*1024, array_size=array_size_in_bytes/sizeof(int);   array = (int*) malloc(array_size_in_bytes);  if(array == NULL) abort(); // assert if condition is false  for(size_t i=0; ilt;array_size; i  ) array[i]=i;  }   int main(void) {  int i, max = 0, position, id[NUM_THREADS], rc;  clock_t t;  double time;   load_array();   printf("Finding max...");   t = clock();   clock_gettime(CLOCK_MONOTONIC,amp;ts0);   // Create threads  for(i = 0; i lt; NUM_THREADS; i  )  {  id[i] = i; // uso id para pasarle un puntero distinto a cada thread  rc = pthread_create(amp;(tid[i]), NULL, amp;thread, (void*)(id   i));  if (rc != 0)  printf("Can't create thread! rc = %dn", rc);  else  printf("Thread %lu createdn", tid[i]);  }    // Join threads  for(i = 0; i lt; NUM_THREADS; i  )  pthread_join(tid[i], NULL);   // Find max number from all chunks  for(i = 0; i lt; NUM_THREADS; i  )  if(max_chunk[i] gt; max)  {  max = max_chunk[i];  position = pos_chunk[i];  }   clock_gettime(CLOCK_MONOTONIC,amp;ts1);  printf("Time2 %.6LFn", sc_timespec_diff(ts1,ts0)/1E9L);   t = clock() - t;  time = ((double) t) / CLOCKS_PER_SEC;   printf("done!n");  free(array);   printf("----------- Program results -------------nMax number: %d position %dn", max, position);  printf("Time %f [s]n", time);   pthread_exit(NULL);   return 0; }  

Мои тайминги:

  • 0.188917 для версии с резьбой signle
  • 2.511590 для оригинальной многопоточной версии (измеряется с помощью clock_gettime(CLOCK_MONOTONIC,…)
  • 0.099802 с измененной резьбовой версией (измеряется с помощью clock_gettime(CLOCK_MONOTONIC,…)

запущен на машине Linux с процессором Intel(R) Core(TM) i7-2620M с частотой 2,70 ГГц.