c.a.storage.common.StorageInputStream : Stream is already closed

#azure #apache-camel #azure-blob-storage #spring-camel

Question:

I am using camel-azure-storage-blob to move files into Azure Blob Storage. I am trying the following route configuration.

    from(azurebloburi)
        .process(getHeaders())
        .log(LoggingLevel.INFO, log, "After adding Headers =${in.headers})")
        .doTry()
            .to(myapiendpoint)
            .process(moveTheFile(SUCCESS_CONTAINER))
            .toD(mysuccesscontaineruri-azure)
        .doCatch(Exception.class)
            .log(LoggingLevel.ERROR, log, "Exception - Moving into ERROR.")
            .process(moveTheFile(ERROR_CONTAINER))
            .convertBodyTo(byte[].class)
            .toD(myerrorcontainer-azure)
        .end()

I set these headers before the file reaches .toD(ERROR_CONTAINER).

    exchange.getIn().setHeader(BlobConstants.BLOB_NAME, "changingBlobName");
    exchange.getIn().setHeader(BlobConstants.BLOB_OPERATION, BlobOperationsDefinition.uploadBlockBlob);
    exchange.getIn().setHeader(BlobConstants.BLOB_CONTAINER_NAME, "myexistingsourcelistener");

I get the following error. Any suggestions would be helpful.

"c.a.storage.common.StorageInputStream : Stream is already closed"

    org.apache.camel.InvalidPayloadException: No body available of type: byte[] but has value: com.azure.storage.blob.specialized.BlobInputStream@5e692e22 of type: com.azure.storage.blob.specialized.BlobInputStream on: Message. Caused by: Error during type conversion from type: com.azure.storage.blob.specialized.BlobInputStream to the required type: byte[] with value com.azure.storage.blob.specialized.BlobInputStream@5e692e22 due to java.lang.RuntimeException: Stream is already closed.. Exchange[E8E38A893F8509F-0000000000000000]. Caused by: [org.apache.camel.TypeConversionException - Error during type conversion from type: com.azure.storage.blob.specialized.BlobInputStream to the required type: byte[] with value com.azure.storage.blob.specialized.BlobInputStream@5e692e22 due to java.lang.RuntimeException: Stream is already closed.]
        at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:125) ~[camel-support-3.11.3.jar:3.11.3]
        at org.apache.camel.support.processor.ConvertBodyProcessor.process(ConvertBodyProcessor.java:106) ~[camel-support-3.11.3.jar:3.11.3]
        at org.apache.camel.support.processor.ConvertBodyProcessor.process(ConvertBodyProcessor.java:144) ~[camel-support-3.11.3.jar:3.11.3]
        at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399) ~[camel-base-engine-3.11.3.jar:3.11.3]
        at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:109) ~[camel-core-processor-3.11.3.jar:3.11.3]
        at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179) ~[camel-base-engine-3.11.3.jar:3.11.3]
        at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64) ~[camel-base-engine-3.11.3.jar:3.11.3]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:184) ~[camel-core-processor-3.11.3.jar:3.11.3]
        at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399) ~[camel-base-engine-3.11.3.jar:3.11.3]
        at org.apache.camel.component.azure.storage.blob.BlobConsumer.processBatch(BlobConsumer.java:156) ~[camel-azure-storage-blob-3.11.3.jar:3.11.3]
        at org.apache.camel.component.azure.storage.blob.BlobConsumer.poll(BlobConsumer.java:71) ~[camel-azure-storage-blob-3.11.3.jar:3.11.3]
        at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:192) ~[camel-support-3.11.3.jar:3.11.3]
        at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107) ~[camel-support-3.11.3.jar:3.11.3]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Edit:

If I remove .convertBodyTo(byte[].class), I get the error below.

    java.lang.RuntimeException: Stream is already closed.
        at com.azure.storage.common.StorageInputStream.checkStreamState(StorageInputStream.java:135) ~[azure-storage-common-12.11.1.jar:na]
        at com.azure.storage.common.StorageInputStream.readInternal(StorageInputStream.java:311) ~[azure-storage-common-12.11.1.jar:na]
        at com.azure.storage.common.StorageInputStream.read(StorageInputStream.java:289) ~[azure-storage-common-12.11.1.jar:na]
        at com.azure.storage.common.StorageInputStream.read(StorageInputStream.java:234) ~[azure-storage-common-12.11.1.jar:na]
        at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2314) ~[commons-io-2.6.jar:2.6]
        at org.apache.commons.io.IOUtils.copy(IOUtils.java:2270) ~[commons-io-2.6.jar:2.6]
        at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2291) ~[commons-io-2.6.jar:2.6]
        at org.apache.commons.io.IOUtils.copy(IOUtils.java:2246) ~[commons-io-2.6.jar:2.6]
        at org.apache.commons.io.IOUtils.toByteArray(IOUtils.java:765) ~[commons-io-2.6.jar:2.6]
        at org.apache.camel.component.azure.storage.blob.BlobUtils.getInputStreamLength(BlobUtils.java:37) ~[camel-azure-storage-blob-3.11.3.jar:3.11.3]
        at org.apache.camel.component.azure.storage.blob.BlobStreamAndLength.createBlobStreamAndLengthFromExchangeBody(BlobStreamAndLength.java:50) ~[camel-azure-storage-blob-3.11.3.jar:3.11.3]
        at org.apache.camel.component.azure.storage.blob.operations.BlobOperations.uploadBlockBlob(BlobOperations.java:181) ~[camel-azure-storage-blob-3.11.3.jar:3.11.3]
        at org.apache.camel.component.azure.storage.blob.BlobProducer.process(BlobProducer.java:86) ~[camel-azure-storage-blob-3.11.3.jar:3.11.3]
        at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.11.3.jar:3.11.3]
        at org.apache.camel.processor.SendDynamicProcessor.lambda$process$0(SendDynamicProcessor.java:197) ~[camel-core-processor-3.11.3.jar:3.11.3]
        at org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318) ~[camel-support-3.11.3.jar:3.11.3]
        at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:182) ~[camel-core-processor-3.11.3.jar:3.11.3]
        at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399) ~[camel-base-engine-3.11.3.jar:3.11.3]
        at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:109) ~[camel-core-processor-3.11.3.jar:3.11.3]
        at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179) ~[camel-base-engine-3.11.3.jar:3.11.3]
        at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64) ~[camel-base-engine-3.11.3.jar:3.11.3]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:184) ~[camel-core-processor-3.11.3.jar:3.11.3]
        at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399) ~[camel-base-engine-3.11.3.jar:3.11.3]
        at org.apache.camel.component.azure.storage.blob.BlobConsumer.processBatch(BlobConsumer.java:156) ~[camel-azure-storage-blob-3.11.3.jar:3.11.3]
        at org.apache.camel.component.azure.storage.blob.BlobConsumer.poll(BlobConsumer.java:71) ~[camel-azure-storage-blob-3.11.3.jar:3.11.3]
        at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:192) ~[camel-support-3.11.3.jar:3.11.3]
        at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107) ~[camel-support-3.11.3.jar:3.11.3]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Answer #1:

Use stream caching before the processor. BlobInputStream is a classic Java InputStream implementation, so its content can be read only once.

        .log(LoggingLevel.ERROR, log, "Exception - Moving into ERROR.")
        .streamCaching()
        .to("direct:test")

    from("direct:test")
        .process(moveTheFile(ERROR_CONTAINER))
        .convertBodyTo(byte[].class)
        .toD(myerrorcontainer-azure)
        .end()
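The read-once behaviour behind this advice can be reproduced in plain Java, without Camel or the Azure SDK. The sketch below is only an illustration: `OneShotStreamDemo`, `drain` and `replay` are hypothetical names, and a `ByteArrayInputStream` stands in for the blob body. Unlike `BlobInputStream`, which throws "Stream is already closed" in the traces above, the stand-in simply returns no bytes on the second read. Buffering the content once and replaying it from memory is roughly the idea behind stream caching.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class OneShotStreamDemo {

    /** Reads whatever is left in the stream (InputStream.readAllBytes, Java 9+). */
    static byte[] drain(InputStream in) {
        try {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Opens a fresh stream over bytes that were buffered up front. */
    static InputStream replay(byte[] buffered) {
        return new ByteArrayInputStream(buffered);
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the blob body; this is NOT the Azure BlobInputStream.
        InputStream body =
                new ByteArrayInputStream("blob content".getBytes(StandardCharsets.UTF_8));

        byte[] firstRead = drain(body);   // the first consumer gets every byte
        byte[] secondRead = drain(body);  // the second consumer finds nothing left

        System.out.println("first read:  " + firstRead.length + " bytes");
        System.out.println("second read: " + secondRead.length + " bytes");

        // Once buffered, the content can be replayed for each later consumer.
        System.out.println("replay 1:    " + drain(replay(firstRead)).length + " bytes");
        System.out.println("replay 2:    " + drain(replay(firstRead)).length + " bytes");
    }
}
```

Holding the whole payload in memory is a trade-off that only suits blobs of moderate size.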

Comments:

1. I am getting this error now. Any suggestions? com.azure.storage.blob.models.BlobStorageException: Status code 400, "<?xml version="1.0" encoding="utf-8"?><Error><Code>Md5Mismatch</Code><Message>The MD5 value specified in the request did not match with the MD5 value calculated by the server. RequestId:7e0db86e-201e-0004-386f-d41a61000000 Time:2021-11-08T07:10:11.8676069Z</Message><UserSpecifiedMd5>C2Uu5crj0V6G9Rd5aVELRw==</UserSpecifiedMd5><ServerCalculatedMd5>N6YlnMDB2uKZp4Zkid/wvQ==</ServerCalculatedMd5></Error>"
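One way to start investigating the Md5Mismatch error from the comment above: the two values in the response have the shape of Base64-encoded MD5 digests, so the digest of the bytes about to be uploaded can be computed locally and compared with both. The sketch below uses only the JDK; `ContentMd5Check` and `base64Md5` are hypothetical names, and where the `byte[]` comes from in the route is an assumption left to the reader. It does not identify the cause of the mismatch, it only shows which side the local bytes agree with.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

class ContentMd5Check {

    /** Returns the MD5 digest of the content, Base64-encoded (24 characters). */
    static String base64Md5(byte[] content) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return Base64.getEncoder().encodeToString(md5.digest(content));
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform implementation is required to provide MD5.
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Placeholder payload; in practice this would be the bytes handed to the upload.
        byte[] payload = "blob content".getBytes(StandardCharsets.UTF_8);
        String local = base64Md5(payload);

        // Values copied from the error response in the comment above.
        String userSpecified = "C2Uu5crj0V6G9Rd5aVELRw==";
        String serverCalculated = "N6YlnMDB2uKZp4Zkid/wvQ==";

        System.out.println("local digest:            " + local);
        System.out.println("matches UserSpecifiedMd5:    " + local.equals(userSpecified));
        System.out.println("matches ServerCalculatedMd5: " + local.equals(serverCalculated));
    }
}
```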