Открытие нескольких потоков с помощью объекта и возврат результата

#multithreading #oop #c 11 #std #unordered-map

#многопоточность #ооп #c 11 #std #неупорядоченный-карта

Вопрос:

Я пытаюсь открыть несколько потоков через цикл, где каждый поток является экземпляром класса, конструктор которого перегружен таким образом, он автоматически запускает нужный код, эта функция возвращает unordered_list, и я хотел бы получить его для этого конкретного экземпляра, который затем будет добавлен к окончательному unordered_list

Я пытался использовать фьючерсы и обещания, но в конечном итоге запутался, когда попытался. Этот проект призван бросить мне вызов и помочь мне освоить многопоточность на c .

     //class to be instantiated per thread   
    class WordCounter {
    public:
        std::unordered_map<std::string, int> thisWordCount;
        std::string word;

        WordCounter(std::string filepath) {}//will be overloaded
        ~WordCounter() {}//destructor

        std::unordered_map<std::string, int>operator()(std::string filepath) const {}//overloaded constructor signature
        std::unordered_map<std::string, int>operator()(std::string currentFile) {//overloaded constructor implementation
            fstream myReadFile;
            myReadFile.open(currentFile);
            if (!!!myReadFile) {
                cout << "Unable to open file";
                exit(1); // terminate with error
            }
            else if (myReadFile.is_open()) {
                while (!myReadFile.eof()) {
                    while (myReadFile >> word) {
                          thisWordCount[word];
                    }
                }
            }
            myReadFile.close();

            return thisWordCount;
        }
    };


    int main(int argc, char** argv)
    {
        std::vector<std::thread> threads;//store instantiated threads using WordCounter
        static std::unordered_map<std::string, int> finalWordCount; //append result from each thread to this unordered_list only when a particular thread finish's reading a file
        vector<string> fileName = { "input1.txt" , "input2.txt" };//filepaths to the files used

        for (int i = 0; i < fileName.size();   i)//loop through vector of filepaths to open a thread for each file to then be processed by that thread
        {
            std::string currentFile = DIR   fileName[i];
            std::thread _newThread(new WordCount(currentFile); //is this how the thread would be created?
            threads.emplace_back(_newThread);//store new thread in a vector

//I want to read through the vector when a particular thread finishes and append that particular threads result to finalWordCount

        }
  

}

Ответ №1:

Многопоточность вашего кода

Давайте начнем с написания многопоточной countWords функции. Это даст нам высокоуровневый обзор того, что должен делать код, а затем мы заполним недостающие части.

Запись countWords

countWords подсчитывает частоты слов в каждом файле в векторе имен файлов. Он делает это параллельно.

Обзор шагов:

  • Создайте вектор потоков
  • Укажите место для хранения конечного результата (это finalWordCount переменная)
  • Создайте функцию обратного вызова для WordCounter вызова to, когда это будет сделано
  • Запустите новый поток для каждого файла с WordCounter объектом.
  • Дождитесь завершения работы
  • Возврат finalWordCount

WordCounter Объект принимает имя файла в качестве входных данных при запуске потока.

Недостающие части:

  • Нам все еще нужно написать makeWordCounter функцию

Реализация:

 using std::unordered_map;
using std::string; 
using std::vector; 

unordered_map<string, int> countWords(vector<string> constamp; filenames) {
    // Create vector of threads
    vector<std::thread> threads;
    threads.reserve(filenames.size());

    // We have to have a lock because maps aren't thread safe
    std::mutex map_lock;

    // The final result goes here
    unordered_map<std::string, int> totalWordCount; 

    // Define the callback function
    // This operation is basically free
    // Internally, it just copies a reference to the mutex and a reference
    // to the totalWordCount
    auto callback = [amp;](unordered_map<string, int> constamp; partial_count) {
        // Lock the mutex so only we have access to the map
        map_lock.lock(); 
        // Update the map
        for(auto count : partial_count) {
            totalWordCount[count.first]  = count.second; 
        }
        // Unlock the mutex
        map_lock.unlock(); 
    };

    // Create a new thread for each file
    for(autoamp; file : filenames) {
        auto word_counter = makeWordCounter(callback); 
        threads.push_back(std::thread(word_counter, file)); 
    }

    // Wait until all threads have finished
    for(autoamp; thread : threads) {
        thread.join(); 
    }

    return totalWordCount; 
}
  

Writing makeWordCounter

Our function makeWordCounter is very simple: it just creates a WordCounter function that’s templated on the callback.

 template<class Callback>
WordCounter<Callback> makeWordCounter(Callback constamp; func) {
    return WordCounter<Callback>{func}; 
}
  

Написание WordCounter класса

Переменные-члены:

  • Функция обратного вызова (нам больше ничего не нужно)

Функции

  • operator() вызовы countWordsFromFilename с именем файла
  • countWordsFromFilename открывает файл, проверяет, что все в порядке, и вызывает countWords filestream
  • countWords считывает все слова в filestream и вычисляет количество, затем вызывает обратный вызов с окончательным количеством.

Поскольку WordCounter это действительно просто, я просто сделал это структурой. Ему нужно только сохранить Callback функцию, и, сделав callback функцию общедоступной, нам не нужно писать конструктор (компилятор обрабатывает его автоматически с помощью агрегированной инициализации).

 template<class Callback>
struct WordCounter {
    Callback callback;

    void operator()(std::string filename) {
        countWordsFromFilename(filename); 
    }
    void countWordsFromFilename(std::string constamp; filename) {
        std::ifstream myFile(filename);
        if (myFile) {
            countWords(myFile); 
        }
        else {
            std::cerr << "Unable to open "   filename << 'n'; 
        }
    }
    void countWords(std::ifstreamamp; filestream) {
        std::unordered_map<std::string, int> wordCount; 
        std::string word; 
        while (!filestream.eof() amp;amp; !filestream.fail()) {
            filestream >> word; 
            wordCount[word]  = 1;
        }
        callback(wordCount); 
    }
};
  

Полный код

Вы можете увидеть полный код countWords здесь: https://pastebin.com/WjFTkNYF

Единственными вещами, которые я добавил, были #include буквы s.

Обратные вызовы и шаблоны 101 (по запросу исходного плаката)

Шаблоны — это простой и полезный инструмент при написании кода. Их можно использовать для устранения взаимных зависимостей; сделать алгоритмы универсальными (чтобы их можно было использовать с любыми типами, которые вам нравятся); и они могут даже сделать код быстрее и эффективнее, позволяя вам избегать вызовов виртуальных функций-членов или указателей на функции.

Создание шаблона класса

Давайте посмотрим на действительно простой шаблон класса, представляющий пару:

 template<class First, class Second>
struct pair {
    First first;
    Second second; 
};
  

Здесь мы объявили pair как a struct , потому что мы хотим, чтобы все члены были общедоступными.

Обратите внимание, что нет ни First типа, ни Second типа.Когда мы используем имена First и Second , что мы на самом деле говорим, «в контексте pair класса, имя First будет представлять First аргумент класса pair, а имя Second будет представлять второй элемент класса pair .

Мы могли бы просто записать это как:

 // This is completely valid too
template<class A, class B>
struct pair {
    A first;
    B second; 
};
  

Использование pair довольно простое:

 int main() {
    // Create pair with an int and a string
    pair<int, std::string> myPair{14, "Hello, world!"}; 

    // Print out the first value, which is 14
    std::cout << "int value:    " << myPair.first << 'n';
    // Print out the second value, which is "Hello, world!"
    std::cout << "string value: " << myPair.second << 'n';
}
  

Как и обычный класс, pair может иметь функции-члены, конструктор, деструктор … что угодно. Поскольку pair это такой простой класс, компилятор автоматически генерирует для нас конструктор и деструктор, и нам не нужно беспокоиться о них.

Шаблонные функции

Шаблонные функции выглядят аналогично обычным функциям. Единственное отличие состоит в том, что они имеют template объявление перед остальной частью объявления функции.

Давайте напишем простую функцию для печати пары:

 template<class A, class B>
std::ostreamamp; operator<<(std::ostreamamp; stream, pair<A, B> pair) 
{
    stream << '(' << pair.first << ", " << pair.second << ')'; 
    return stream; 
}
  

Мы можем предоставить ему все pair , что захотим, если он знает, как печатать оба элемента пары:

 int main() {
    // Create pair with an int and a string
    pair<int, std::string> myPair{14, "Hello, world!"}; 

    std::cout << myPair << 'n'; 
}
  

Это выводит (14, Hello, world) .

Обратные вызовы

В Callback C нет типа. Он нам не нужен. Обратный вызов — это просто то, что вы используете, чтобы указать, что что-то произошло.

Давайте рассмотрим простой пример. Эта функция ищет все большие числа, и каждый раз, когда она находит одно, она вызывает output , что является параметром, который мы предоставили. В данном случае output это обратный вызов, и мы используем его, чтобы указать, что было найдено новое наибольшее число.

 template<class Func>
void getIncreasingNumbers(std::vector<double> constamp; nums, Func output) 
{
    // Exit if there are no numbers
    if(nums.size() == 0) 
        return; 

    double biggest = nums[0]; 
    // We always output the first one
    output(biggest); 
    for(double num : nums) 
    {
        if(num > biggest) 
        {
            biggest = num; 
            output(num); 
        }
    }
}
  

Мы можем использовать getIncreasingNumbers много разных способов. Например, мы можем отфильтровать числа, которые не были больше предыдущего:

 std::vector<double> filterNonIncreasing(std::vector<double> constamp; nums) 
{
    std::vector<double> newNums; 
    // Here, we use an amp; inside the square brackets
    // This is so we can use newNums by reference
    auto my_callback = [amp;](double val) { 
        newNums.push_back(val); 
    };
    getIncreasingNumbers(nums, my_callback); 
    return newNums; 
}
  

Или мы можем распечатать их:

 void printNonIncreasing(std::vector<double> constamp; nums) 
{
    // Here, we don't put anything in the square brackts
    // Since we don't access any local variables
    auto my_callback = [](double val) {
        std::cout << "New biggest number: " << val << 'n'; 
    };
    getIncreasingNums(nums, my_callback); 
}
  

Или мы можем найти самый большой разрыв между ними:

 double findBiggestJumpBetweenIncreasing(std::vector<double> constamp; nums)
{
    double previous; 
    double biggest_gap = 0.0; 
    bool assigned_previous = false;
    auto my_callback = [amp;](double val) {
        if(not assigned_previous) {
            previous = val; 
            assigned_previous = true;
        }
        else 
        {
            double new_gap = val - previous; 
            if(biggest_gap < new_gap) {
                biggest_gap = new_gap; 
            }
        }
    };
    getIncreasingNums(nums, my_callback); 
    return biggest_gap;
}
  

Комментарии:

1. Я не предоставил необязательный дополнительный код, поэтому int main должен возвращать 0; а currentFile — это каталог (каталог) текущий путь к файлу, который мы используем. Я думаю, мы бы передали currentPath в конструктор WordCounter. который используется в перегруженной реализации конструктора в моем классе

2. Ваш operator() перегружается в WordCounter, принимая аргументы, и должен предоставлять эти аргументы при запуске std::thread с помощью WordCounter. Кроме того, вы не можете скопировать поток; вам нужно либо переместить его в вектор с помощью std::move, либо создать его на сайте вызова (что я и сделал в приведенном выше коде)

3. Хотите, чтобы я обновил свой ответ простым примером того, как их использовать?

4. Не существует потокобезопасного способа добавления элементов на карту, поэтому вам придется использовать мьютекс. Если вы уже знали, какие слова нужно проверять, то вы могли бы заранее поместить эти слова в map, а затем вы могли бы делать что-то с помощью atomics (избегая любой блокировки), но вы не сможете добавлять новые слова в map без мьютекса. Этап синхронизации (часть, для которой должен использоваться мьютекс) обычно является узким местом параллельного кода, но вы все равно получите ускорение, если большая часть работы может быть выполнена до синхронизации (а не во время синхронизации).

5. При этом узким местом в приведенном выше коде является чтение файлов. Ваш жесткий диск может считывать только столько данных в секунду, и вполне вероятно, что даже однопоточный код может обрабатывать данные быстрее, чем они могут быть считаны с жесткого диска. Лучше всего было бы прочитать файл в одном потоке и обработать его в другом потоке.