Как преобразовать поток буфера данных в монобайтовый массив в Project Reactor?

#java #spring-webflux #project-reactor

Вопрос:

Есть Flux<DataBuffer> . В чем заключается естественный способ его преобразования Mono<byte[]> ?

 Mono<byte[]> mergeDataBuffers(Flux<DataBuffer> flux){
  // ?
}
 

Ответ №1:

Используйте org.springframework.core.io.buffer.DataBufferUtils для объединения буферов данных из Flux<DataBuffer> в один DataBuffer , а затем считайте этот буфер в массив байтов.

 
    Mono<byte[]> mergeDataBuffers(Flux<DataBuffer> dataBufferFlux) {
        return DataBufferUtils.join(dataBufferFlux)
                .map(dataBuffer -> {
                    byte[] bytes = new byte[dataBuffer.readableByteCount()];
                    dataBuffer.read(bytes);
                    DataBufferUtils.release(dataBuffer);
                    return bytes;
                });
    }
 

При этом обязательно учитывайте использование памяти, так как при таком подходе все данные будут загружены в память дважды (один раз в буферы данных, а затем снова при копировании в byte[] ).

Комментарии:

1. Спасибо тебе, Фил. О чем ты думаешь DataBufferUtils.join(dataBufferFlux).map(dbuf -> dbuf.asByteBuffer().array()) ? И не могли бы вы подробнее DataBufferUtils.release рассказать : я пробовал с ним и без него и не обнаружил видимого влияния на память (использовал VisualVM).

2. dataBuffer.asByteBuffer().array() не гарантируется, что будет работать для всех реализаций, так как не все реализации имеют резервный массив. DataBufferUtils.release необходимо вызвать , когда ваш код будет завершен с использованием DataBuffer , и на него больше не будет ссылок. Требуется правильно обрабатывать подсчет ссылок для PooledDataBuffer s. См. также docs.spring.io/spring-framework/docs/current/reference/html/…