Эффективная обработка элементов с использованием памяти при сериализации JSON из Flux в S3

#java #amazon-s3 #project-reactor

#java #amazon-s3 #проект-реактор

Вопрос:

Я записываю большой массив JSON в S3 из a Flux . Одновременный сбор и сериализация всего потока объектов имеет существенные последствия для памяти. Это привело меня к тому, что я переосмыслил это как загрузку из нескольких частей, при этом код примерно похож на следующий:

 results
    .map(this::serialize)
    .map(
         bytes ->
             uploadBytes(
                         bytes,
                         filename,
                         bucket,
                         index.getAndIncrement(),
                         uploadId))
 

Это означает, что results в любой момент времени в памяти должен быть сериализован только один элемент. Это якобы работает, но не дает допустимого JSON, поскольку объединенный файл не разделяется запятыми и не ограничен скобками.

Мы можем добавить дополнительную логику для проверки индекса загрузки, чтобы первый элемент добавлял a [ , а каждый другой элемент добавлял a , . Предоставление структуры:

[result1 ,result2 ,resultX

Что может быть определено следующим кодом:

 ByteArrayOutputStream output = new ByteArrayOutputStream();
if (index == 1) {
    output.write('[');
} else {
    output.write(',');
}
output.write(bytes);
 

Эта стратегия по-прежнему опускает заключительную скобку, поскольку мы не знаем, является ли текущий элемент конечным элементом. Минимальный размер частей файла S3 также составляет 5 МБ. В худшем случае, наконец, загрузка ] будет дополнена 5 МБ пробелов.

Существует ли идиоматический способ определить, является ли какой-либо данный элемент в a Flux последним, и что полный сигнал следует непосредственно?

Комментарии:

1. Вы можете захватить последний элемент с помощью last operator . Или вы можете использовать doOnComplete . Вы записываете элементы последовательно?

2. @RahulKushwaha Это действительно был другой способ, которым это могло быть выполнено. На более ранней итерации я заархивировал поток с самим собой, подписавшись на last в одном из них. Хотя это дало желаемый эффект, это было грязно, и поэтому я выбрал решение, описанное ниже.

Ответ №1:

Чтобы добиться этого, я просто написал что-то вроде:

 results
    .map(this::serialize)
    .concatWithValues((byte) ']')
 

А затем загрузил их в S3 в буферах по 5 МБ (а не на элемент). Таким образом, элементы могут быть сериализованы лениво по мере заполнения буфера размером 5 МБ.