Сохранение данных при выполнении многопоточной обработки

#c #multithreading #concurrency #data-integrity #data-consistency

#c #многопоточность #параллелизм #целостность данных #согласованность данных

Вопрос:

Существует приложение-загрузчик, которое выполняет различные виды обработки элементов загрузки в нескольких потоках. Некоторые потоки анализируют входные данные, некоторые выполняют загрузку, извлечение, сохранение состояния и т. Д. Таким образом, каждый тип потока работает с определенными элементами данных, и некоторые из этих потоков могут выполняться одновременно. Элемент загрузки может быть описан следующим образом:

 class File;

class Download
{
public:
    enum State
    {
        Parsing, Downloading, Extracting, Repairing, Finished
    };

    Download(const std::string amp;filePath): filePath(filePath) { }

    void save()
    {
        // TODO: save data consistently

        StateFile f;    // state file for this download

        // save general download parameters
        f << filePath << state << bytesWritten << totalFiles << processedFiles;

        // Now we are to save the parameters of the files which belong to this download,
        // (!) but assume the downloading thread kicks in, downloads some data and 
        // changes the state of a file. That causes "bytesWritten", "processedFiles" 
        // and "state" to be different from what we have just saved.

        // When we finally save the state of the files their parameters don't match 
        // the parameters of the download (state, bytesWritten, processedFiles).
        for (File *f : files)
        {
            // save the file...
        }
    }

private:
    std::string filePath;
    std::atomic<State> state = Parsing;
    std::atomic<int> bytesWritten = 0;
    int totalFiles = 0;
    std::atomic<int> processedFiles = 0;
    std::mutex fileMutex;
    std::vector<File*> files;
};
  

Интересно, как сохранить эти данные последовательно. Например, состояние и количество обработанных файлов, возможно, уже сохранены, и мы собираемся сохранить список файлов. Между тем, какой-либо другой поток может изменить состояние файла и, следовательно, количество обработанных файлов или состояние загрузки, делая сохраненные данные противоречивыми.

Первая идея, которая приходит на ум, — добавить единый мьютекс для всех элементов данных и блокировать его при каждом обращении к любому из них. Но это было бы, вероятно, неэффективно, поскольку большинство потоков времени обращаются к разным элементам данных, и сохранение происходит только один раз в несколько минут.

Мне кажется, такая задача довольно распространена в многопоточном программировании, поэтому я надеюсь, что опытные люди могли бы предложить лучший способ.

Комментарии:

1. «Первая идея, которая приходит на ум, — добавить единый мьютекс для всех элементов данных и блокировать его при каждом обращении к любому из них». — Почему вы не можете использовать несколько мьютексов и блокировать доступ к отдельным участникам? И почему бы не разделить класс на несколько разных классов, чтобы каждый поток мог спокойно работать со своими собственными фрагментами данных, пока он не будет завершен, и частичные результаты не будут собраны в конечный результат?

2. Ну, как я описал выше, блокировка отдельных элементов не предотвращает непоследовательное сохранение всего набора данных. Например. сохраненное состояние загрузки и количество обработанных файлов могут не совпадать с сохраненным списком файлов. Ну, потоки могут использовать одни и те же элементы данных. Я просто имел в виду, что они могут использовать не все из них.

Ответ №1:

Я бы посоветовал вам использовать шаблон потребителя-производителя.

Загрузчик отправляет в синтаксический анализатор и уведомляет его о потреблении, анализатор отправляет в экстрактор и уведомляет его о потреблении, а экстрактор — ремонтнику. Затем у вас будет очередь для каждой задачи. Синхронизация может быть оптимизирована с использованием переменных условий, чтобы потребитель извлекал данные только тогда, когда он получает уведомление после создания чего-либо. В конечном итоге вы будете использовать гораздо меньше мьютексов и гораздо более понятный и эффективный дизайн.

Вот пример кода для очереди и как это сделать, если вам нужно одновременно загружать, анализировать, извлекать и сохранять:

 #include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>
template<typename T>
class synchronized_queu
{
public:
    T consume_one()
    {
        std::unique_lock<std::mutex> lock(lock_);
        while (queue_.size() == 0)
            condition_.wait(lock); //release and obtain again
        T available_data = queue_.front();
        queue_.pop();
        return available_data;
    }
    void produce_one(const Tamp; data)
    {
        std::unique_lock<std::mutex> lock(lock_);
        queue_.push(data);
        condition_.notify_one();//notify only one or all as per your design...
    }
private:
    std::queue<T> queue_;
    std::mutex lock_;
    std::condition_variable condition_;
};
struct data
{
    //.....
};

void download(synchronized_queu<data>amp; q)
{
    //...
    data data_downloaded = ; //data downloaded;
    q.produce_one(data_downloaded);
}

void parse(synchronized_queu<data>amp; q1, synchronized_queu<data>amp; q2)
{
    //...
    data data_downloaded = q1.consume_one();
    //parse
    data data_parsed = ;//....
    q2.produce_one(data_parsed);
}

void extract(synchronized_queu<data>amp; q1, synchronized_queu<data>amp; q2)
{
    //...
    data data_parsed = q1.consume_one();
    //parse
    data data_extracted = ;//....
    q2.produce_one(data_extracted);
}
void save(synchronized_queu<data>amp; q)
{
    data data_extracted = q.consume_one();
    //save....
}

int main()
{
    synchronized_queu<data> dowlowded_queue;
    synchronized_queu<data> parsed_queue;
    synchronized_queu<data> extracted_queue;

    std::thread downloader(download, dowlowded_queue);
    std::thread parser(parse, dowlowded_queue, parsed_queue);
    std::thread extractor(extract, parsed_queue, extracted_queue);
    std::thread saver(save, extracted_queue);
    while (/*condition to stop program*/)
    {

    }
    downloader.join();
    parser.join();
    extractor.join();
    saver.join();
    return 0;
}
  

Комментарии:

1. Дело в том, что эти задачи предназначены для одновременного выполнения, поэтому очереди нет. Сохранение требуется периодически независимо от того, что делают другие потоки.

2. Да, и это то, что я хотел предложить. Для простого объяснения давайте использовать только downloader и extractor; вы запускаете одновременно два потока downloader и extractor, разделяющие одну и ту же очередь Q1. Загрузчик, как только он загружает все, что он помещает в очередь, и поскольку экстрактор извлекает из очереди, как только что-то есть, он обрабатывает это….

3. Мой вопрос был скорее о сохранении данных во время их обработки другими потоками.

4. Заставкой может быть задача, работающая с очередью, содержащей окончательные данные для сохранения.

5. Состояние загрузки должно сохраняться, скажем, каждые 3 минуты, чтобы программа могла загрузить его и продолжить загрузку в случае сбоя или аварийного завершения. Следовательно, даже промежуточные результаты должны быть сохранены.