#c #multithreading #concurrency #data-integrity #data-consistency
#c #многопоточность #параллелизм #целостность данных #согласованность данных
Вопрос:
Существует приложение-загрузчик, которое выполняет различные виды обработки элементов загрузки в нескольких потоках. Некоторые потоки анализируют входные данные, некоторые выполняют загрузку, извлечение, сохранение состояния и т. Д. Таким образом, каждый тип потока работает с определенными элементами данных, и некоторые из этих потоков могут выполняться одновременно. Элемент загрузки может быть описан следующим образом:
class File;
class Download
{
public:
enum State
{
Parsing, Downloading, Extracting, Repairing, Finished
};
Download(const std::string amp;filePath): filePath(filePath) { }
void save()
{
// TODO: save data consistently
StateFile f; // state file for this download
// save general download parameters
f << filePath << state << bytesWritten << totalFiles << processedFiles;
// Now we are to save the parameters of the files which belong to this download,
// (!) but assume the downloading thread kicks in, downloads some data and
// changes the state of a file. That causes "bytesWritten", "processedFiles"
// and "state" to be different from what we have just saved.
// When we finally save the state of the files their parameters don't match
// the parameters of the download (state, bytesWritten, processedFiles).
for (File *f : files)
{
// save the file...
}
}
private:
std::string filePath;
std::atomic<State> state = Parsing;
std::atomic<int> bytesWritten = 0;
int totalFiles = 0;
std::atomic<int> processedFiles = 0;
std::mutex fileMutex;
std::vector<File*> files;
};
Интересно, как сохранить эти данные последовательно. Например, состояние и количество обработанных файлов, возможно, уже сохранены, и мы собираемся сохранить список файлов. Между тем, какой-либо другой поток может изменить состояние файла и, следовательно, количество обработанных файлов или состояние загрузки, делая сохраненные данные противоречивыми.
Первая идея, которая приходит на ум, — добавить единый мьютекс для всех элементов данных и блокировать его при каждом обращении к любому из них. Но это было бы, вероятно, неэффективно, поскольку большинство потоков времени обращаются к разным элементам данных, и сохранение происходит только один раз в несколько минут.
Мне кажется, такая задача довольно распространена в многопоточном программировании, поэтому я надеюсь, что опытные люди могли бы предложить лучший способ.
Комментарии:
1. «Первая идея, которая приходит на ум, — добавить единый мьютекс для всех элементов данных и блокировать его при каждом обращении к любому из них». — Почему вы не можете использовать несколько мьютексов и блокировать доступ к отдельным участникам? И почему бы не разделить класс на несколько разных классов, чтобы каждый поток мог спокойно работать со своими собственными фрагментами данных, пока он не будет завершен, и частичные результаты не будут собраны в конечный результат?
2. Ну, как я описал выше, блокировка отдельных элементов не предотвращает непоследовательное сохранение всего набора данных. Например. сохраненное состояние загрузки и количество обработанных файлов могут не совпадать с сохраненным списком файлов. Ну, потоки могут использовать одни и те же элементы данных. Я просто имел в виду, что они могут использовать не все из них.
Ответ №1:
Я бы посоветовал вам использовать шаблон потребителя-производителя.
Загрузчик отправляет в синтаксический анализатор и уведомляет его о потреблении, анализатор отправляет в экстрактор и уведомляет его о потреблении, а экстрактор — ремонтнику. Затем у вас будет очередь для каждой задачи. Синхронизация может быть оптимизирована с использованием переменных условий, чтобы потребитель извлекал данные только тогда, когда он получает уведомление после создания чего-либо. В конечном итоге вы будете использовать гораздо меньше мьютексов и гораздо более понятный и эффективный дизайн.
Вот пример кода для очереди и как это сделать, если вам нужно одновременно загружать, анализировать, извлекать и сохранять:
#include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>
template<typename T>
class synchronized_queu
{
public:
T consume_one()
{
std::unique_lock<std::mutex> lock(lock_);
while (queue_.size() == 0)
condition_.wait(lock); //release and obtain again
T available_data = queue_.front();
queue_.pop();
return available_data;
}
void produce_one(const Tamp; data)
{
std::unique_lock<std::mutex> lock(lock_);
queue_.push(data);
condition_.notify_one();//notify only one or all as per your design...
}
private:
std::queue<T> queue_;
std::mutex lock_;
std::condition_variable condition_;
};
struct data
{
//.....
};
void download(synchronized_queu<data>amp; q)
{
//...
data data_downloaded = ; //data downloaded;
q.produce_one(data_downloaded);
}
void parse(synchronized_queu<data>amp; q1, synchronized_queu<data>amp; q2)
{
//...
data data_downloaded = q1.consume_one();
//parse
data data_parsed = ;//....
q2.produce_one(data_parsed);
}
void extract(synchronized_queu<data>amp; q1, synchronized_queu<data>amp; q2)
{
//...
data data_parsed = q1.consume_one();
//parse
data data_extracted = ;//....
q2.produce_one(data_extracted);
}
void save(synchronized_queu<data>amp; q)
{
data data_extracted = q.consume_one();
//save....
}
int main()
{
synchronized_queu<data> dowlowded_queue;
synchronized_queu<data> parsed_queue;
synchronized_queu<data> extracted_queue;
std::thread downloader(download, dowlowded_queue);
std::thread parser(parse, dowlowded_queue, parsed_queue);
std::thread extractor(extract, parsed_queue, extracted_queue);
std::thread saver(save, extracted_queue);
while (/*condition to stop program*/)
{
}
downloader.join();
parser.join();
extractor.join();
saver.join();
return 0;
}
Комментарии:
1. Дело в том, что эти задачи предназначены для одновременного выполнения, поэтому очереди нет. Сохранение требуется периодически независимо от того, что делают другие потоки.
2. Да, и это то, что я хотел предложить. Для простого объяснения давайте использовать только downloader и extractor; вы запускаете одновременно два потока downloader и extractor, разделяющие одну и ту же очередь Q1. Загрузчик, как только он загружает все, что он помещает в очередь, и поскольку экстрактор извлекает из очереди, как только что-то есть, он обрабатывает это….
3. Мой вопрос был скорее о сохранении данных во время их обработки другими потоками.
4. Заставкой может быть задача, работающая с очередью, содержащей окончательные данные для сохранения.
5. Состояние загрузки должно сохраняться, скажем, каждые 3 минуты, чтобы программа могла загрузить его и продолжить загрузку в случае сбоя или аварийного завершения. Следовательно, даже промежуточные результаты должны быть сохранены.