Как я могу увеличить findOneAndUpdate в mongoose / mongodb до 5 миллионов обновлений?

#python #node.js #mongodb #mongoose

#python #node.js #mongodb #mongoose

Вопрос:

Я работаю над серверной частью с этими: nodejs, mongoose, mongodb, ironmq. И есть другое приложение (FTP-сервер python), которое используется в качестве источника данных.

Система, более или менее, работает следующим образом:

  • Пользователь загружает дамп данных csv (почти 3 миллиона записей) на FTP-сервер (это происходит периодически, раз в 24 часа)

  • FTP-сервер анализирует данные и синхронно отправляет их в очередь IronMQ пакетами (по 2000). Я выполняю пакетную обработку здесь для оптимизации памяти

  • Другое приложение (nodejs) продолжает опрашивать эту очередь на предмет данных, 100 сообщений (что является максимально допустимым числом) каждые 10 секунд, работает с этими данными, а затем обновляет мою базу данных (используя findOneAndUpdate для каждого сообщения). У меня запущено 5 из этих приложений.

Теперь с этой настройкой нет никаких явных проблем, за исключением времени, необходимого для завершения всей операции. Требуется почти 2 часа, чтобы полностью перенести проанализированные данные в MQ, но это не большая проблема, поскольку это выполняется пакетно. Настоящая проблема связана с частью «сохранение / обновление в БД».

В среднем 20-24 тыс. записей обновляются в БД каждый час. Но поскольку у меня 3 миллиона записей, это занимает более 24 часов (что не работает, поскольку файлы на FTP обновляются каждые 24 часа, и данные будут использоваться для выполнения определенных операций в других частях моего приложения).

Я не совсем уверен, как действовать дальше, но у меня есть пара вопросов.

  • Можно ли считать мой вышеупомянутый подход оптимальным / эффективным? Или что можно улучшить?
  • Как я могу сократить время, затрачиваемое на всю операцию обновления, либо через db, либо путем изменения дизайна?
  • Считается ли mongodb подходящим для этого случая или есть какие-либо лучшие альтернативы?

Было бы здорово, если бы вы могли оказать некоторую помощь в этом. Пожалуйста, дайте мне знать, если вам, ребята, понадобится дополнительная информация.

Ответ №1:

Вы можете оптимизировать свои обновления, используя методы Bulk API, которые очень эффективны, поскольку они позволяют отправлять множество операций обновления в рамках одного запроса (в пакетном режиме) на сервер. Рассмотрим следующие примеры, которые демонстрируют этот подход для разных версий MongoDB:

Предположим, что ваши приложения nodejs отправляют данные сообщений в список, а для версий Mongoose, >=4.3.0 которые поддерживают сервер MongoDB 3.2.x , вы можете использовать bulkWrite() для обновления коллекции как:

 var bulkUpdateCallback = function(err, r){
        console.log(r.matchedCount);
        console.log(r.modifiedCount);
    },
    operations = []; // Initialise the bulk operations array

messages.forEach(function (msg) { 
    operations.push({
        "updateOne": {
            "filter": { "_id": msg._id } ,              
            "update": { "$set": { "value": msg.value } } // example update operation
        }
    });

    // Send once in 500 requests only
    if (operations.length % 500 === 0 ) {
        Model.collection.bulkWrite(
            operations, 
            { "ordered": true, w: 1 }, 
            bulkUpdateCallback
        ); 
        operations = [];
    }    
});

// Get the underlying collection via the native node.js driver collection object
Model.collection.bulkWrite(bulkOps, { "ordered": true, w: 1 }, bulkUpdateCallback); 
  

В приведенном выше примере вы инициализируете свой массив операций обновления и ограничиваете операции пакетами по 500. Причина выбора меньшего значения, чем установленный по умолчанию лимит пакета в 1000, обычно является контролируемым выбором. Как указано в документации, MongoDB по умолчанию будет отправлять на сервер пакеты по 1000 операций за раз с максимумом, и нет гарантии, что эти запросы на 1000 операций по умолчанию действительно соответствуют ограничению 16 МБ BSON. Таким образом, вам все равно нужно быть на «безопасной» стороне и устанавливать меньший размер пакета, которым вы можете эффективно управлять только так, чтобы его общий размер был меньше предельного размера данных при отправке на сервер.


Если вы используете более старые версии Mongoose, ~3.8.8, ~3.8.22, 4.x которые поддерживают сервер MongoDB >=2.6.x , вы можете использовать Bulk() API следующим образом

 var bulk = Model.collection.initializeOrderedBulkOp(),
    bulkUpdateCallback = function(err, r){
        console.log(r.matchedCount);
        console.log(r.modifiedCount);
    },
    counter = 0;

messages.forEach(function(msg) {
    bulk.find({ "_id": msg._id }).updateOne({ 
        "$set": { "value": msg.value }
    });

    counter  ;
    if (counter % 500 == 0) {
        bulk.execute(function(err, r) {
           // do something with the result
           bulk = Model.collection.initializeOrderedBulkOp();
           counter = 0;
        });
    }
});

// Catch any docs in the queue under or over the 500's
if (counter > 0) {
    bulk.execute(bulkUpdateCallback);
}
  

Комментарии:

1. Спасибо, я постараюсь это реализовать.