#c #c 11 #concurrency #parallel-processing #mutex
#c #c 11 #параллелизм #параллельная обработка #мьютекс
Вопрос:
Рассмотрим следующий код :
// Preprocessor
#include <iostream>
#include <chrono>
#include <thread>
#include <algorithm>
#include <mutex>
#include <random>
// Main function
int main()
{
// A random vector of size 100 with 10 different random values
std::vector<unsigned int> vector = make_random_vector(100, 10);
// At the end, the result should be the 10 different random values
std::vector<unsigned int> resu<
// Mutex to deals with concurrency
std::mutex mutex;
// Parallel search
parallel_for_each(vector.begin(), vector.end(),
[=, amp;result, amp;mutex](const unsigned intamp; i){
/* CRITICAL SECTION: BEGIN */
// If the current element is not yet in the resulting vector, inserts it
if (!std::binary_search(result.begin(), result.end(), i)) {
mutex.lock();
result.insert(std::lower_bound(result.begin(), result.end(), i), i);
mutex.unlock();
}
/* CRITICAL SECTION: END */
});
// Unique values
result.erase(std::unique(result.begin(), result.end()), result.end());
// Display the result
std::for_each(result.begin(), result.end(),
[](const unsigned intamp; i){
std::cout<<i<<std::endl;
});
// Finalization
return 0;
}
Цель состоит в том, чтобы параллельно найти n различных значений в векторе.
Мой вопрос: в порядке ли предыдущий код (нет проблем с параллелизмом), и если нет, то как его исправить?
Примечание: в этом коде есть вызовы двух функций :
parallel_for_each
который выполняет предоставленную функцию для указанного количества потоков :
// Parallel execution returning the execution time in seconds
template <class Iterator, class Function>
double parallel_for_each(const Iteratoramp; first, const Iteratoramp; last, Functionamp;amp; function, const int nthreads = std::thread::hardware_concurrency())
{
const std::chrono::high_resolution_clock::time_point tbegin = std::chrono::high_resolution_clock::now();
const long long int ntasks = std::max(static_cast<int>(1), nthreads);
const long long int group = std::max(static_cast<long long int>(first < last), static_cast<long long int>((last-first)/ntasks));
std::vector<std::thread> threads;
Iterator it = first;
threads.reserve(ntasks);
for (it = first; it < last-group; it = group) {
threads.push_back(std::thread([=, amp;last, amp;group, amp;function](){std::for_each(it, std::min(it group, last), function);}));
}
std::for_each(it, last, function);
std::for_each(threads.begin(), threads.end(), [](std::threadamp; current){current.join();});
return std::chrono::duration_cast<std::chrono::duration<double> >(std::chrono::high_resolution_clock::now()-tbegin).count();
}
make_random_vector
который создает случайный вектор элементов с nзначениями разных случайных значений
// Produces a random vector of nelements with nvalues different random values
std::vector<unsigned int> make_random_vector(const unsigned int nelements, const unsigned int nvalues)
{
std::vector<unsigned int> vector(nelements);
std::vector<unsigned int> values(nvalues);
std::random_device device;
std::mt19937 engine(device());
std::uniform_int_distribution<unsigned int> distribution1;
std::uniform_int_distribution<unsigned int> distribution2(0, nvalues-1);
std::for_each(values.begin(), values.end(), [=, amp;distribution1, amp;engine](unsigned intamp; i){i = distribution1(engine);});
std::for_each(vector.begin(), vector.end(), [=, amp;distribution2, amp;engine, amp;values](unsigned intamp; i){i = values[distribution2(engine)];});
return vector;
}
Комментарии:
1. Если
result.insert
вызывает перераспределение вектора, в то время как другой поток выполняет abinary_search
над ним, произойдут плохие вещи. Я чувствую, что единственный способ сделать это — заставить каждую задачу возвращать вектор результатов, а затем объединять результаты из каждого потока.2. Если вектор намного больше размера кэша, то поиск будет ограничен пропускной способностью памяти, и параллельный подход может не сильно помочь. Параллельный подход поможет, если у процесса ограничена пропускная способность процессора или когда большинство операций выполняется в локальном кэше для каждого ядра (обратите внимание, что внешний кэш разделяется между ядрами на некоторых процессорах).
Ответ №1:
В вашем коде есть проблема, поскольку вы защищаете только одновременный доступ на запись, но не доступ на чтение result
.
Решением было бы переместить блокировку мьютекса за пределы if
следующего:
[=, amp;result, amp;mutex](const unsigned intamp; i){
std::lock_guard<std::mutex> lck (mutex);
// If the current element is not yet in the resulting vector, inserts it
if (!std::binary_search(result.begin(), result.end(), i)) {
result.insert(std::lower_bound(result.begin(), result.end(), i), i);
}
}
но это нарушит цель параллели для :/
Другим решением было бы работать с другим набором результатов и присоединять результат в конце цикла.
Другим решением может быть вариант блокировки с двойной проверкой, но требует копирования result
при каждой вставке.
Ответ №2:
Вместо использования std::vector<unsigned int>
use Concurrency::combinable<std::vector<unsigned int>> result
. Это позволяет иметь локальную копию результата в потоке, и вам не нужны никакие мьютексы.
После завершения parallel_for_each
использования combine_each
и поместите результат в a std::set<unsigned int>
, чтобы получить уникальные значения или любой другой способ, который вы считаете подходящим.
РЕДАКТИРОВАТЬ: приведенный ниже подход не требует std::mutex .
#include <ppl.h>
void print_unqiue_numbers()
{
using namespace Concurrency;
std::vector<unsigned int> vector = make_random_vector(100, 10);
// At the end, the result should be the 10 different random values
combinable<std::vector<unsigned int>> resu<
// Parallel search
parallel_for_each(vector.begin(), vector.end(),
[=, amp;result](const unsigned intamp; i){
autoamp; local_result = result.local(); // thread local variable.
if (!std::binary_search(local_result.begin(), local_result.end(), i)) {
local_result.insert(std::lower_bound(local_result.begin(),
local_result.end(), i), i);
}
});
std::set<unsigned int> unique_values;
result.combine_each([amp;](std::vector<unsigned int> constamp; values)
{
for(auto v : values)
{
unique_values.insert(v);
}
});
std::cout << "print the unique valuesn";
for (auto v : unique_values)
{
std::cout << v << 'n';
}
}