#java #spring-boot #spring-integration #spring-integration-dsl #spring-integration-sftp
#java #spring-boot #spring-интеграция #spring-интеграция-dsl #spring-интеграция-sftp
Вопрос:
Я использую входящий адаптер с использованием Java DSL для опроса PDF-файлов с SFTP-сервера. У меня есть вариант использования, когда после извлечения файла pdf приложение получит файл конфигурации, представленный в формате CSV с тем же именем на SFTP-сервере. После извлечения файла конфигурации приложение обработает исходный файл pdf, используя свойства, определенные в файле конфигурации, и загрузит его обратно на SFTP-сервер с использованием исходящего адаптера.
Я сталкиваюсь с проблемами при извлечении файлов конфигурации в обработчике в том же потоке с использованием исходящего шлюза.
Вот мой код :
Регистрируйте потоки интеграции:
for (String client : clientsArr) {
this.flowContext.registration(getInboundIntegrationFlow(client)).register();
}
this.flowContext.registration(getOutboundIntegrationFlow()).register();
this.flowContext.registration(sftpGatewayGetIntegrationFlow()).register();
Поток интеграции входящего адаптера:
@Autowired
private SftpPdfMessageHandler messageHandler;
private IntegrationFlow getInboundIntegrationFlow(String client) {
String remoteDirectory = getRemoteDirectory(client);
String localDirectory = getLocalDirectory(client);
String inboundAdapterId = getInboundAdapterId(client);
return IntegrationFlows
.from(Sftp.inboundAdapter(sftpSessionFactory())
.preserveTimestamp(true)
.remoteDirectory(remoteDirectory)
.autoCreateLocalDirectory(true)
.localDirectory(new File(localDirectory))
.maxFetchSize(Integer.parseInt(sftpProperties.getMaxFetchSize()))
.filter(new SftpSimplePatternFileListFilter(sftpProperties.getRemoteFileFilter()))
.deleteRemoteFiles(true),
e -> e.id(inboundAdapterId)
.autoStartup(true)
.poller(Pollers
.fixedDelay(Long.parseLong(sftpProperties.getPollPeriodInSeconds()), TimeUnit.SECONDS)
.receiveTimeout(Long.parseLong(sftpProperties.getPollerTimeout()))
.maxMessagesPerPoll(Long.parseLong(sftpProperties.getMaxMessagesPerPoll()))
))
.handle(inBoundHandler())
.get();
}
public MessageHandler inBoundHandler() {
return message -> {
File file = (File) message.getPayload();
messageHandler.handleMessage(file);
};
}
Поток интеграции исходящего адаптера:
private IntegrationFlow getOutboundIntegrationFlow() {
return IntegrationFlows.from("sftpOutboundChannel")
.handle(Sftp.outboundAdapter(sftpSessionFactory(), FileExistsMode.FAIL)
.remoteDirectoryExpression(String.format("headers['%s']", FileHeaders.REMOTE_DIRECTORY))).get();
}
@Bean("sftpOutboundChannel")
public MessageChannel sftpOutboundChannel() {
return new DirectChannel();
}
Обработчик сообщений SFTP:
@Async("sftpHandlerAsyncExecutor")
public void handleMessage(File originalFile) {
File configFile = fetchConfigFile();
/*
process original file and store processed file in output file path on local directory
*/
boolean success = uploadFileToSftpServer(outputFilePath, client, entity);
if (success) {
deleteFileFromLocal(originalFile);
}
}
Исходящий шлюз ПОЛУЧАЕТ поток интеграции:
private IntegrationFlow sftpGatewayGetIntegrationFlow() {
return IntegrationFlows.from("sftpGetInputChannel")
.handle(Sftp.outboundGateway(sftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.GET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.DELETE,
AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
.localDirectoryExpression(String.format("headers['%s']", Constants.HEADER_LOCAL_DIRECTORY_NAME))
.autoCreateLocalDirectory(true))
.channel("nullChannel")
.get();
}
@Bean("sftpGetInputChannel")
public MessageChannel sftpGetInputChannel() {
return new DirectChannel();
}
messageHandler.handleMessage()
метод вызывается в асинхронном режиме (с помощью ThreadPoolTaskExecutor), который внутренне извлекает файл конфигурации с использованием исходящего шлюза. Но я не смог найти ни одного канала, по которому я мог бы отправлять и получать полезную нагрузку сообщения в одном потоке. Я нашел MessagingTemplate в spring docs, но не смог найти способ подключить это к моему потоку интеграции исходящего шлюза.
sftpGetMessageTemplate.sendAndReceive(sftpGetInputChannel, new GenericMessage<>("/dir/file.csv", headers))
выдает исключение «У диспетчера нет подписчиков на канал» с помощью DirectChannel().
Я ищу решение, при котором я могу извлечь требуемый файл с сервера любым из следующих способов :
- Интегрируйте MessagingTemplate с IntegrationFlow (если возможно), используя соответствующий канал.
- Некоторая цепочка обработчиков сообщений во входящем потоке адаптера, где после опроса исходного файла он извлекает другой файл с помощью sftp outbound gateway, а затем вызывает конечный обработчик с обоими объектами (исходным файлом и файлом конфигурации). Я пытаюсь добиться аналогичной вещи, используя пользовательский код выше.
- Любой другой способ использования каналов отправки и опроса для команды GET в многопоточной среде.
Приложению необходимо определить путь к каталогу во время выполнения при использовании команды GET.
Ответ №1:
Вероятно, вам нужно узнать, что такое @MessagingGateway
и как заставить его взаимодействовать с каналами на вашем исходящем шлюзе.
Смотрите документы для получения дополнительной информации: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/messaging-endpoints.html#gateway
Если вы действительно хотите получить в результате файл конфигурации, вам не следует этого делать .channel("nullChannel")
. Когда шлюз будет в руках, появится replyChannel
заголовок с TemporaryReplyChannel
экземпляром, заполненным шлюзом. Затем в вашем коде вы просто собираетесь использовать этот функциональный интерфейс в качестве API для вызова.
Фактически, этот шлюз обмена сообщениями использует упомянутый MessagingTemplate.sendAndReceive()
.
Комментарии:
1. Спасибо @Artem, шлюз обмена сообщениями полезен, и я могу отправлять и получать сообщения с его помощью. Он также работает с template.sendAndReceive после удаления «nullChannel».