#c #threadpool
#c #threadpool
Вопрос:
Я работаю с базой данных, а не с RocksDB. У меня есть find
функция, которая принимает запрос в параметре, выполняет итерации по всем документам в базе данных и возвращает документы, соответствующие запросу. Я хочу распараллелить эту функцию, чтобы работа распределялась по нескольким потокам.
Для достижения этой цели я попытался использовать ThreadPool: я переместил код цикла в лямбда-выражение и добавил задачу в пул потоков для каждого документа. После цикла каждый результат обрабатывается основным потоком.
Текущая версия (однопоточная):
void
EmbeDB::find(const bson_tamp; query,
DocumentPtrCallback callback,
int32_t limit,
const bson_t* projection)
{
int32_t count = 0;
bson_error_t error;
uint32_t num_query_keys = bson_count_keys(amp;query);
mongoc_matcher_t* matcher = num_query_keys != 0
? mongoc_matcher_new(amp;query, amp;error)
: nullptr;
if (num_query_keys != 0 amp;amp; matcher == nullptr)
{
callback(amp;error, nullptr);
return;
}
bson_t document;
rocksdb::Iterator* it = _db->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next())
{
const char* bson_data = (const char*)it->value().data();
int bson_length = it->value().size();
std::vector<char> decrypted_data;
if (encryptionEnabled())
{
decrypted_data.resize(bson_length);
bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
bson_data = decrypted_data.data();
}
bson_init_static(amp;document, (const uint8_t*)bson_data, bson_length);
if (num_query_keys == 0 || mongoc_matcher_match(matcher, amp;document))
{
count;
if (projection != nullptr)
{
bson_error_t error;
bson_t projected;
bson_init(amp;projected);
mongoc_matcher_projection_execute_noop(
amp;document,
projection,
amp;projected,
amp;error,
NULL
);
callback(nullptr, amp;projected);
}
else
{
callback(nullptr, amp;document);
}
if (limit >= 0 amp;amp; count >= limit)
{
break;
}
}
}
delete it;
if (matcher)
{
mongoc_matcher_destroy(matcher);
}
}
Новая версия (многопоточная):
void
EmbeDB::find(const bson_tamp; query,
DocumentPtrCallback callback,
int32_t limit,
const bson_t* projection)
{
int32_t count = 0;
bool limit_reached = limit == 0;
bson_error_t error;
uint32_t num_query_keys = bson_count_keys(amp;query);
mongoc_matcher_t* matcher = num_query_keys != 0
? mongoc_matcher_new(amp;query, amp;error)
: nullptr;
if (num_query_keys != 0 amp;amp; matcher == nullptr)
{
callback(amp;error, nullptr);
return;
}
auto process_document = [this, projection, num_query_keys, matcher](const char* bson_data, int bson_length) -> bson_t*
{
std::vector<char> decrypted_data;
if (encryptionEnabled())
{
decrypted_data.resize(bson_length);
bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
bson_data = decrypted_data.data();
}
bson_t* document = new bson_t();
bson_init_static(document, (const uint8_t*)bson_data, bson_length);
if (num_query_keys == 0 || mongoc_matcher_match(matcher, document))
{
if (projection != nullptr)
{
bson_error_t error;
bson_t* projected = new bson_t();
bson_init(projected);
mongoc_matcher_projection_execute_noop(
document,
projection,
projected,
amp;error,
NULL
);
delete document;
return projected;
}
else
{
return document;
}
}
else
{
delete document;
return nullptr;
}
};
const int WORKER_COUNT = std::max(1u, std::thread::hardware_concurrency());
ThreadPool pool(WORKER_COUNT);
std::vector<std::future<bson_t*>> futures;
bson_t document;
rocksdb::Iterator* db_it = _db->NewIterator(rocksdb::ReadOptions());
for (db_it->SeekToFirst(); db_it->Valid(); db_it->Next())
{
const char* bson_data = (const char*)db_it->value().data();
int bson_length = db_it->value().size();
futures.push_back(pool.enqueue(process_document, bson_data, bson_length));
}
delete db_it;
for (auto it = futures.begin(); it != futures.end(); it)
{
bson_t* result = it->get();
if (result)
{
count = 1;
if (limit < 0 || count < limit)
{
callback(nullptr, result);
}
delete resu<
}
}
if (matcher)
{
mongoc_matcher_destroy(matcher);
}
}
- С помощью простых документов и запросов однопоточная версия обрабатывает 1 миллион документов за 0,5 секунды на моем компьютере.
- С теми же документами и запросом многопоточная версия обрабатывает 1 миллион документов за 3,3 секунды.
Удивительно, но многопоточная версия намного медленнее. Более того, я измерил время выполнения, и 75% времени тратится на цикл for. Таким образом, в основном строка futures.push_back(pool.enqueue(process_document, bson_data, bson_length));
занимает 75% времени.
Я сделал следующее:
- Я проверил значение
WORKER_COUNT
, оно равно 6 на моей машине. - Я попытался добавить
futures.reserve(1000000)
, думая, что, возможно, виновато перераспределение вектора, но это ничего не изменило. - Я попытался удалить динамические выделения памяти (
bson_t* document = new bson_t();
), это существенно не изменило результат.
Итак, мой вопрос: есть ли что-то, что я сделал неправильно, чтобы многопоточная версия была настолько медленнее, чем однопоточная версия?
Мое текущее понимание заключается в том, что операции синхронизации пула потоков (когда задачи ставятся в очередь и из очереди) просто занимают большую часть времени, и решением было бы изменить структуру данных. Мысли?
Комментарии:
1. Для начала вы привязываете все по значению к
process_document
функции. Таким образом, все копируется при каждом запуске.2. @freakish True но это либо указатели, либо целые числа, поэтому я не думаю, что это имеет значение.
Ответ №1:
Распараллеливание сопряжено с накладными расходами.
Для обработки каждого документа в однопоточной версии требуется около 500 наносекунд. Для делегирования работы пулу потоков требуется много бухгалтерии (как для делегирования работы, так и для последующей синхронизации), и вся эта бухгалтерия вполне может потребовать более 500 наносекунд на задание.
Предполагая, что ваш код правильный, тогда бухгалтерия занимает около 2800 наносекунд на задание. Чтобы получить значительное ускорение от распараллеливания, вы захотите разбить работу на более крупные куски.
Я рекомендую пытаться обрабатывать документы пакетами по 1000 за раз. Каждое будущее, вместо того, чтобы соответствовать только 1 документу, будет соответствовать 1000 документам.
Другие оптимизации
По возможности избегайте ненужного копирования. Если что-то скопировано кучей, посмотрите, можете ли вы записать это по ссылке, а не по значению.