Параллельный поиск различных значений?

#c #c 11 #concurrency #parallel-processing #mutex

#c #c 11 #параллелизм #параллельная обработка #мьютекс

Вопрос:

Рассмотрим следующий код :

 // Preprocessor
#include <iostream>
#include <chrono>
#include <thread>
#include <algorithm>
#include <mutex>
#include <random>

// Main function
int main()
{
    // A random vector of size 100 with 10 different random values
    std::vector<unsigned int> vector = make_random_vector(100, 10);
    // At the end, the result should be the 10 different random values
    std::vector<unsigned int> resu<
    // Mutex to deals with concurrency
    std::mutex mutex;
    // Parallel search
    parallel_for_each(vector.begin(), vector.end(), 
    [=, amp;result, amp;mutex](const unsigned intamp; i){
       /* CRITICAL SECTION: BEGIN */
       // If the current element is not yet in the resulting vector, inserts it
       if (!std::binary_search(result.begin(), result.end(), i)) {
           mutex.lock();
           result.insert(std::lower_bound(result.begin(), result.end(), i), i);
           mutex.unlock();
       }
       /* CRITICAL SECTION: END */
    });
    // Unique values
    result.erase(std::unique(result.begin(), result.end()), result.end());
    // Display the result
    std::for_each(result.begin(), result.end(), 
    [](const unsigned intamp; i){
        std::cout<<i<<std::endl;
    });
    // Finalization
    return 0;
}
  

Цель состоит в том, чтобы параллельно найти n различных значений в векторе.

Мой вопрос: в порядке ли предыдущий код (нет проблем с параллелизмом), и если нет, то как его исправить?


Примечание: в этом коде есть вызовы двух функций :

parallel_for_each который выполняет предоставленную функцию для указанного количества потоков :

 // Parallel execution returning the execution time in seconds
template <class Iterator, class Function> 
double parallel_for_each(const Iteratoramp; first, const Iteratoramp; last, Functionamp;amp; function, const int nthreads = std::thread::hardware_concurrency())
{
    const std::chrono::high_resolution_clock::time_point tbegin = std::chrono::high_resolution_clock::now();
    const long long int ntasks = std::max(static_cast<int>(1), nthreads);
    const long long int group = std::max(static_cast<long long int>(first < last), static_cast<long long int>((last-first)/ntasks));
    std::vector<std::thread> threads;
    Iterator it = first;
    threads.reserve(ntasks);
    for (it = first; it < last-group; it  = group) {
        threads.push_back(std::thread([=, amp;last, amp;group, amp;function](){std::for_each(it, std::min(it group, last), function);}));
    }
    std::for_each(it, last, function);
    std::for_each(threads.begin(), threads.end(), [](std::threadamp; current){current.join();});
    return std::chrono::duration_cast<std::chrono::duration<double> >(std::chrono::high_resolution_clock::now()-tbegin).count();
}
  

make_random_vector который создает случайный вектор элементов с nзначениями разных случайных значений

 // Produces a random vector of nelements with nvalues different random values
std::vector<unsigned int> make_random_vector(const unsigned int nelements, const unsigned int nvalues)
{
    std::vector<unsigned int> vector(nelements);
    std::vector<unsigned int> values(nvalues);
    std::random_device device;
    std::mt19937 engine(device());
    std::uniform_int_distribution<unsigned int> distribution1;
    std::uniform_int_distribution<unsigned int> distribution2(0, nvalues-1);
    std::for_each(values.begin(), values.end(), [=, amp;distribution1, amp;engine](unsigned intamp; i){i = distribution1(engine);});
    std::for_each(vector.begin(), vector.end(), [=, amp;distribution2, amp;engine, amp;values](unsigned intamp; i){i = values[distribution2(engine)];});
    return vector;
}
  

Комментарии:

1. Если result.insert вызывает перераспределение вектора, в то время как другой поток выполняет a binary_search над ним, произойдут плохие вещи. Я чувствую, что единственный способ сделать это — заставить каждую задачу возвращать вектор результатов, а затем объединять результаты из каждого потока.

2. Если вектор намного больше размера кэша, то поиск будет ограничен пропускной способностью памяти, и параллельный подход может не сильно помочь. Параллельный подход поможет, если у процесса ограничена пропускная способность процессора или когда большинство операций выполняется в локальном кэше для каждого ядра (обратите внимание, что внешний кэш разделяется между ядрами на некоторых процессорах).

Ответ №1:

В вашем коде есть проблема, поскольку вы защищаете только одновременный доступ на запись, но не доступ на чтение result .

Решением было бы переместить блокировку мьютекса за пределы if следующего:

 [=, amp;result, amp;mutex](const unsigned intamp; i){
    std::lock_guard<std::mutex> lck (mutex);

    // If the current element is not yet in the resulting vector, inserts it
    if (!std::binary_search(result.begin(), result.end(), i)) {
        result.insert(std::lower_bound(result.begin(), result.end(), i), i);
    }
}
  

но это нарушит цель параллели для :/

Другим решением было бы работать с другим набором результатов и присоединять результат в конце цикла.

Другим решением может быть вариант блокировки с двойной проверкой, но требует копирования result при каждой вставке.

Ответ №2:

Вместо использования std::vector<unsigned int> use Concurrency::combinable<std::vector<unsigned int>> result . Это позволяет иметь локальную копию результата в потоке, и вам не нужны никакие мьютексы.

После завершения parallel_for_each использования combine_each и поместите результат в a std::set<unsigned int> , чтобы получить уникальные значения или любой другой способ, который вы считаете подходящим.

РЕДАКТИРОВАТЬ: приведенный ниже подход не требует std::mutex .

  #include <ppl.h>

 void print_unqiue_numbers()
 {
    using namespace Concurrency;
    std::vector<unsigned int> vector = make_random_vector(100, 10);
    // At the end, the result should be the 10 different random values
    combinable<std::vector<unsigned int>> resu<
    // Parallel search
    parallel_for_each(vector.begin(), vector.end(), 
    [=, amp;result](const unsigned intamp; i){
       autoamp; local_result = result.local(); // thread local variable.
       if (!std::binary_search(local_result.begin(), local_result.end(), i)) {
           local_result.insert(std::lower_bound(local_result.begin(),
                                                local_result.end(), i), i);
       }
    });

     std::set<unsigned int> unique_values;
     result.combine_each([amp;](std::vector<unsigned int> constamp; values)
     {
        for(auto v : values)
        {
            unique_values.insert(v);
        }
     });

     std::cout << "print the unique valuesn";
     for (auto v : unique_values)
     {
         std::cout << v << 'n';
     }
  }