#java #spring-webflux #webflux
Вопрос:
Я пытаюсь реализовать приложение springboot, которое позволяет мне :
- загрузить mp3 на S3
- извлекайте метаданные и сохраняйте их в mongo DB.
Если я хочу извлечь метаданные из mp3, я должен предоставить MultipartFile. К сожалению, мы не можем отправить поток.
В моем контроллере я передаю 3 параметра:
@PostMapping
public Mono<ResponseEntity<AudioDto>> saveAudioTrack(@RequestHeader HttpHeaders headers,
@RequestHeader String fileName,
@RequestBody Flux<ByteBuffer> body) {
return this.audioService.saveAudioTrack(headers, fileName, body);
}
Теперь я немного застрял на своем уровне обслуживания. Я хочу создать свой ответ, сохранить данные и опубликовать файл на S3. Для хранения метаданных, mp3agic.
public Mono<ResponseEntity<AudioDto>> saveAudioTrack(HttpHeaders headers, String fileName, Flux<ByteBuffer> body) {
return Mono.from(body.flatMap(byteBuffer -> {
Mono<MultipartFile> multipartFileMono = AppUtils.byteBufferToMultipartFile(headers, fileName, body);
return multipartFileMono;
}).flatMap(multipartFile -> {
AudioEntity storedEntity = new AudioEntity();
try {
AudioDto audioDto = storeAudioMeta(multipartFile, headers);
AudioEntity audioEntity = new AudioEntity();
BeanUtils.copyProperties(audioDto, audioEntity);
storedEntity = audioRepository.save(audioEntity).block();
} catch (IOException | InvalidDataException | UnsupportedTagException e) {
e.printStackTrace();
}
AudioDto storedAudio = new AudioDto();
BeanUtils.copyProperties(storedEntity, storedAudio);
ResponseEntity<AudioDto> responseEntity = ResponseEntity.status(HttpStatus.OK).body(storedAudio);
return Mono.just(responseEntity);
}));
}
Сначала я пытаюсь преобразовать byteBufffer в MultipartFile:
public static Mono<MultipartFile> byteBufferToMultipartFile(HttpHeaders headers, String fileName, Flux<ByteBuffer> body) {
System.out.println(headers);
return Mono.from(body.flatMap(byteBuffer -> {
byte[] bytes = byteBuffer.array();
MultipartFile multipartFile = new MockMultipartFile(fileName, bytes);
return Mono.just(multipartFile);
}));
}
Затем я вызываю часть кода, которая должна позволить мне сохранять метаданные
private AudioDto storeAudioMeta(MultipartFile multipartFile, HttpHeaders headers) throws IOException, InvalidDataException, UnsupportedTagException {
AudioDto audioDto = new AudioDto();
if (multipartFile.isEmpty()) {
throw new IllegalStateException("Cannot upload empty file");
}
//Check if the file is an image => we'll check if it's a mp3
if(!multipartFile.getContentType().equals("audio/mpeg")) {
throw new IllegalStateException("File uploaded is not a mp3");
}
InputStream initialStream = multipartFile.getInputStream();
byte[] buffer = new byte[initialStream.available()];
initialStream.read(buffer);
File targetFile = new File("src/main/resources/" multipartFile.getOriginalFilename());
try (OutputStream outStream = new FileOutputStream(targetFile)) {
outStream.write(buffer);
}
Mp3File mp3file = new Mp3File(targetFile.getPath());
audioDto.setLength((int) mp3file.getLengthInSeconds());
audioDto.setBitrate(mp3file.getBitrate());
audioDto.setSampleRate(mp3file.getSampleRate());
if (mp3file.hasId3v1Tag()) {
ID3v1 id3v1Tag = mp3file.getId3v1Tag();
audioDto.setTrack(id3v1Tag.getTrack());
audioDto.setArtist(id3v1Tag.getArtist());
audioDto.setTitle(id3v1Tag.getTitle());
...
}
if (mp3file.hasId3v2Tag()) {
ID3v2 id3v2Tag = mp3file.getId3v2Tag();
audioDto.setTrack(id3v2Tag.getTrack());
...
audioDto.setAlbumImage(id3v2Tag.getAlbumImage());
byte[] albumImageData = id3v2Tag.getAlbumImage();
if (albumImageData != null) {
audioDto.setAlbumImageSize(albumImageData.length);
audioDto.setAlbumImageMimeType(id3v2Tag.getAlbumImageMimeType());
}
}
//get file metadata
Map<String, String> metadata = new HashMap<>();
metadata.put("Content-Type", audioDto.getFile().getContentType());
metadata.put("Content-Length", String.valueOf(audioDto.getFile().getSize()));
String bucketName = environment.getProperty("amazon.aws.s3.audioBucket");
String path = String.format("%s/%s", bucketName, UUID.randomUUID());
String fileName = String.format("%s", audioDto.getFile().getOriginalFilename());
// try {
// fileStore.upload(path, fileName, Optional.of(metadata),
audioDto.getFile().getInputStream());
// } catch (IOException e) {
throw new IllegalStateException("Failed to upload file", e);
//}
audioDto.setFileName(fileName);
audioDto.setFilePath(path);
return audioDto;
}
У меня возвращено сообщение об ошибке:
java.lang.IllegalStateException: Only one connection receive subscriber allowed.
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:182) ~[reactor-netty-core-1.0.12.jar:1.0.12]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Handler com.myapp.api.mediastreaming.ui.controller.AudioController#saveAudioTrack(HttpHeaders, String, Flux) [DispatcherHandler]
*__checkpoint ⇢ HTTP POST "/audio/tracks" [ExceptionHandlingWebHandler]
Stack trace:
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:182) ~[reactor-netty-core-1.0.12.jar:1.0.12]
at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:143) ~[reactor-netty-core-1.0.12.jar:1.0.12]
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.11.jar:3.4.11]
at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:339) ~[reactor-netty-core-1.0.12.jar:1.0.12]
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.11.jar:3.4.11]
at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:339) ~[reactor-netty-core-1.0.12.jar:1.0.12]
at reactor.core.publisher.Mono.subscribe(Mono.java:4399) ~[reactor-core-3.4.11.jar:3.4.11]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:426) ~[reactor-core-3.4.11.jar:3.4.11]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.4.11.jar:3.4.11]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.11.jar:3.4.11]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.11.jar:3.4.11]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.11.jar:3.4.11]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.4.11.jar:3.4.11]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.11.jar:3.4.11]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) ~[reactor-netty-core-1.0.12.jar:1.0.12]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) ~[reactor-netty-core-1.0.12.jar:1.0.12]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) ~[reactor-netty-core-1.0.12.jar:1.0.12]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:584) ~[reactor-netty-http-1.0.12.jar:1.0.12]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.12.jar:1.0.12]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:261) ~[reactor-netty-http-1.0.12.jar:1.0.12]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.69.Final.jar:4.1.69.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.69.Final.jar:4.1.69.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.69.Final.jar:4.1.69.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.69.Final.jar:4.1.69.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Комментарии:
1. есть несколько странных вещей, которые я вижу, когда вижу ваш код, во-первых, зачем использовать
Mono.from
, а неflatMap
body
напрямую. Во-вторых, большинство операций ввода-вывода блокируются, это вы можете понять, поскольку вам нужно сделать atry/catch
в середине. Блокировка вызовов должна быть установлена на собственного абонента. Вы можете прочитать о том, как обрабатывать блокирующие вызовы, в справочной документации по реактору. Но сначала я бы удалил все ненужныеMono.from
звонки2. что ж, я перепробовал «все возможные комбинации», чтобы получить желаемый результат. Это не сработало. мои первые дни в реактивном мире даются мне довольно тяжело. Я последовал вашему совету. И я нашел решение.
3. ваше решение по-прежнему не является должным образом реактивным, все файловые операции блокируются. И try catch обычно не используется в реактивном программировании. Вы смешиваете императивное программирование с реактивным программированием. Если вы не знаете, о чем я говорю, я предлагаю вам прочитать раздел
reactor documentation
и особенно раздел «Начало работы». Переполнение стека — это не то место, где нужно спрашивать об основах.4. Я собираюсь вернуться к основам. Спасибо
Ответ №1:
Наконец-то я нашел решение, очень вдохновленное этим уроком: https://www.vinsguru.com/spring-webflux-file-upload /
В моем контроллере :
@PostMapping(consumes = MediaType.MULTIPART_FORM_DATA_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<Mono<AudioDto>> saveAudioTrack(@RequestPart("file") Mono<FilePart> filePartMono) {
return new ResponseEntity<>(this.audioService.saveAudioTrack(filePartMono), HttpStatus.CREATED);
}
В моем сервисном уровне :
@Override
public Mono<AudioDto> saveAudioTrack(Mono<FilePart> filePartMono) {
Path basePath = Paths.get("./src/main/resources/upload/");
return filePartMono
.doOnNext(fp -> System.out.println("Received File : " fp.filename()))
.flatMap(fp -> fp.transferTo(basePath.resolve(fp.filename())))
.then(
filePartMono
.flatMap(fp -> Mono.just(new File(String.valueOf(basePath.resolve(fp.filename())))))
.map(file -> {
try {
return getAudioMetadata(file);
} catch (InvalidDataException | UnsupportedTagException | IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getLocalizedMessage());
}
})
);
}
Мне также удается получить метаданные и загрузить файл в корзину S3:
Я вызываю свой метод для создания метаданных
private AudioDto getAudioMetadata(File file) throws InvalidDataException, UnsupportedTagException, IOException {
Mp3File mp3file = new Mp3File(file.getPath());
AudioDto audioDto = new AudioDto();
...
if (mp3file.hasId3v1Tag()) {
ID3v1 id3v1Tag = mp3file.getId3v1Tag();
audioDto.setTrack(id3v1Tag.getTrack());
...
}
if (mp3file.hasId3v2Tag()) {
ID3v2 id3v2Tag = mp3file.getId3v2Tag();
audioDto.setTrack(id3v2Tag.getTrack());
...
audioDto.setAlbumImage(id3v2Tag.getAlbumImage());
byte[] albumImageData = id3v2Tag.getAlbumImage();
if (albumImageData != null) {
audioDto.setAlbumImageSize(albumImageData.length);
audioDto.setAlbumImageMimeType(id3v2Tag.getAlbumImageMimeType());
}
}
//get file metadata
Map<String, String> metadata = new HashMap<>();
Tika tika = new Tika();
metadata.put("Content-Type", tika.detect(file));
metadata.put("Content-Length", String.valueOf(file.length()));
String path = String.format("%s/%s", environment.getProperty("amazon.aws.s3.audioBucket"), UUID.randomUUID());
//String fileName = String.format("%s", audioDto.getFile().getOriginalFilename());
try {
InputStream inputSteam = new FileInputStream(file);
fileStore.upload(path, file.getName(), Optional.of(metadata), inputSteam);
} catch (IOException e) {
throw new IllegalStateException("Failed to upload file", e);
}
audioDto.setFileName(file.getName());
...
ModelMapper modelMapper = new ModelMapper();
modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
AudioEntity audioFileEntity = modelMapper.map(audioDto, AudioEntity.class);
AudioEntity storedEntity = audioRepository.save(audioFileEntity);
return modelMapper.map(storedEntity, AudioDto.class);
}
вот мой метод загрузки:
private final AmazonS3 amazonS3;
public void upload(String path,
String fileName,
Optional<Map<String, String>> optionalMetaData,
InputStream inputStream) {
ObjectMetadata objectMetadata = new ObjectMetadata();
optionalMetaData.ifPresent(map -> {
if (!map.isEmpty()) {
map.forEach(objectMetadata::addUserMetadata);
}
});
try {
amazonS3.putObject(path, fileName, inputStream, objectMetadata);
} catch (AmazonServiceException e) {
throw new IllegalStateException("Failed to upload the file", e);
}
}
Не самый элегантный способ, но функциональный. перед тем, как сообщить, что он полностью работоспособен, нужно немного потренироваться, но ничего сложного. 🙂