#spring-integration #spring-integration-dsl #spring-integration-sftp
#spring-интеграция #spring-интеграция-dsl #spring-интеграция-sftp
Вопрос:
У меня есть поток интеграции.
IntegrationFlows.from(
Sftp.inboundAdapter(inboundSftp)
.localDirectory(this.getlocalDirectory(config.getId()))
.deleteRemoteFiles(true)
.autoCreateLocalDirectory(true)
.filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))
.remoteDirectory(config.getInboundDirectory())
, e -> e.poller(
Pollers.cron(config.getCron())
.errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
try {
this.destroy(String.valueOf(config.getId()));
configurationService.removeConfigurationChannelById(config.getId());
//loggin here
}
} catch (Exception ex1) {
Logger.getLogger(ExceptionAspect.class.getName()).log(Level.SEVERE, null, ex1);
}
})
.advice(startup.scanRemoteDirectory())
))
.transform(
file -> util.transform((File) file, config.getSourceEncoding(), config.getTargetEncoding(), doEncoding, doZip))
.publishSubscribeChannel(s -> s
.subscribe(f -> {
f.handle(Sftp.outboundAdapter(outboundSftp)
.useTemporaryFileName(false)
.autoCreateDirectory(true)
.remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));
})
.subscribe(f -> {
if(doArchive) {
f.handle(Sftp.outboundAdapter(inboundSftp)
.useTemporaryFileName(false)
.autoCreateDirectory(true)
.remoteDirectory(config.getInboundArchiveDirectory()));
} else {
f.handle(m -> {});
}
})
.subscribe(f -> f
.handle(m -> {
// file transfer logging here
if(doArchive) {
// file archived logging here
}
}
})
)
)
.get();
на 1-м подписчике, если файл не удалось загрузить, он все еще печатает журналы, что файл передан, но на самом деле это не так.
насколько я понимаю, сообщение будет передано каждому подписчику, и когда 1-й подписчик завершит свою работу, оно будет отправлено следующему.
В моем случае первому подписчику фактически не удается загрузить файл. Совет не удалять файл.
Я попробовал несколько вещей.
IntegrationFlows.from(
Sftp.inboundAdapter(inboundSftp)
..........
))
.transform(....
)
.publishSubscribeChannel(s -> s
.subscribe(f -> {
f.handle(Sftp.outboundAdapter(outboundSftp)
.useTemporaryFileName(false)
.autoCreateDirectory(true)
.remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));
}).publishSubcribeChannel(s -> s
.subscribe(f -> {
if(doArchive) {
f.handle(Sftp.outboundAdapter(inboundSftp)
.useTemporaryFileName(false)
.autoCreateDirectory(true)
.remoteDirectory(config.getInboundArchiveDirectory()));
} else {
f.handle(m -> {});
}
})
.subscribe(f -> f
.handle(m -> {
// file transfer logging here
if(doArchive) {
// file archived logging here
}
}
})
)
)
)
.get();
Я пытаюсь опубликовать снова после подписчика outboundAdapter, но по ошибке он все еще опубликован.
Также попробуйте добавить .errorChannel в подписчике outboundAdapter, у меня не сработало.
. ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _
( ( )___ | '_ | '_| | '_ / _` |
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |___, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.2.RELEASE)
2020-08-20 11:10:53,265 INFO c.t.i.s.I2SftpApplication - Starting I2SftpApplication on GSEUC5CG8393GR5 with PID 21636 (C:UsersMuhammadUmairIdeaProjectsi2sftpinboudservicetargetclasses started by MuhammadUmair in C:UsersMuhammadUmairIdeaProjectsi2sftpinboudservice)
2020-08-20 11:10:53,269 INFO c.t.i.s.I2SftpApplication - No active profile set, falling back to default profiles: default
2020-08-20 11:10:54,464 INFO o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-08-20 11:10:54,478 INFO o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-08-20 11:10:54,577 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,584 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,597 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,906 INFO o.s.b.w.e.t.TomcatWebServer - Tomcat initialized with port(s): 8080 (http)
2020-08-20 11:10:54,917 INFO o.a.c.h.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8080"]
2020-08-20 11:10:54,917 INFO o.a.c.c.StandardService - Starting service [Tomcat]
2020-08-20 11:10:54,917 INFO o.a.c.c.StandardEngine - Starting Servlet engine: [Apache Tomcat/9.0.29]
2020-08-20 11:10:55,095 INFO o.a.c.c.C.[.[.[/] - Initializing Spring embedded WebApplicationContext
2020-08-20 11:10:55,095 INFO o.s.w.c.ContextLoader - Root WebApplicationContext: initialization completed in 1705 ms
2020-08-20 11:10:56,920 INFO o.s.s.c.ThreadPoolTaskExecutor - Initializing ExecutorService 'applicationTaskExecutor'
2020-08-20 11:10:57,243 INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2020-08-20 11:10:57,415 INFO o.s.i.e.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-08-20 11:10:57,416 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.errorChannel' has 1 subscriber(s).
2020-08-20 11:10:57,416 INFO o.s.i.e.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2020-08-20 11:10:57,416 INFO o.s.i.e.EventDrivenConsumer - Adding {message-handler:startup.resultFileHandler.serviceActivator} as a subscriber to the 'fromSftpChannel' channel
2020-08-20 11:10:57,416 INFO o.s.i.c.DirectChannel - Channel 'application.fromSftpChannel' has 1 subscriber(s).
2020-08-20 11:10:57,417 INFO o.s.i.e.EventDrivenConsumer - started bean 'startup.resultFileHandler.serviceActivator'
2020-08-20 11:10:57,422 INFO o.s.i.e.SourcePollingChannelAdapter - started bean 'startup.sftpMessageSource.inboundChannelAdapter'
2020-08-20 11:10:57,441 INFO o.a.c.h.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8080"]
2020-08-20 11:10:57,498 INFO o.s.b.w.e.t.TomcatWebServer - Tomcat started on port(s): 8080 (http) with context path ''
2020-08-20 11:10:57,508 INFO c.t.i.s.I2SftpApplication - Started I2SftpApplication in 9.228 seconds (JVM running for 13.133)
Registering an Integration Flow with id : 1
2020-08-20 11:10:57,689 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#2.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,690 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - Adding {bridge} as a subscriber to the '1.subFlow#0.channel#0' channel
2020-08-20 11:10:57,691 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.1.subFlow#0.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,691 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#1.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#1.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - Adding {bridge} as a subscriber to the '1.subFlow#0.channel#0' channel
2020-08-20 11:10:57,692 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.1.subFlow#0.channel#0' has 2 subscriber(s).
2020-08-20 11:10:57,692 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#1.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,693 INFO o.s.i.e.EventDrivenConsumer - Adding {message-handler} as a subscriber to the '1.subFlow#0.channel#1' channel
2020-08-20 11:10:57,694 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#0.channel#1' has 1 subscriber(s).
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#0.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - Adding {bridge} as a subscriber to the '1.subFlow#0.channel#0' channel
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#0.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - Adding {transformer} as a subscriber to the '1.channel#0' channel
2020-08-20 11:10:57,694 INFO o.s.i.c.DirectChannel - Channel 'application.1.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,700 INFO o.s.i.e.SourcePollingChannelAdapter - started bean '1.org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'
2020-08-20 11:10:58,017 INFO c.j.jsch - Connecting to *.*.*.114 port 22
2020-08-20 11:10:58,341 INFO c.j.jsch - Connection established
2020-08-20 11:10:58,672 INFO c.j.jsch - Remote version string: SSH-2.0-OpenSSH_7.4
2020-08-20 11:10:58,673 INFO c.j.jsch - Local version string: SSH-2.0-JSCH-0.1.54
2020-08-20 11:10:58,674 INFO c.j.jsch - CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2020-08-20 11:10:58,728 INFO c.j.jsch - CheckKexes: diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521
2020-08-20 11:10:58,815 INFO c.j.jsch - CheckSignatures: ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2020-08-20 11:10:58,823 INFO c.j.jsch - SSH_MSG_KEXINIT sent
2020-08-20 11:10:59,036 INFO c.j.jsch - SSH_MSG_KEXINIT received
2020-08-20 11:10:59,038 INFO c.j.jsch - kex: server: curve25519-sha256,curve25519-sha256@libssh.org,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group16-sha512,diffie-hellman-group18-sha512,diffie-hellman-group-exchange-sha1,diffie-hellman-group14-sha256,diffie-hellman-group14-sha1,diffie-hellman-group1-sha1
2020-08-20 11:10:59,038 INFO c.j.jsch - kex: server: ssh-rsa,rsa-sha2-512,rsa-sha2-256,ecdsa-sha2-nistp256,ssh-ed25519
2020-08-20 11:10:59,039 INFO c.j.jsch - kex: server: aes256-ctr,aes192-ctr,aes128-ctr
2020-08-20 11:10:59,040 INFO c.j.jsch - kex: server: aes256-ctr,aes192-ctr,aes128-ctr
2020-08-20 11:10:59,040 INFO c.j.jsch - kex: server: hmac-sha2-512,hmac-sha2-256
2020-08-20 11:10:59,041 INFO c.j.jsch - kex: server: hmac-sha2-512,hmac-sha2-256
2020-08-20 11:10:59,042 INFO c.j.jsch - kex: server: none,zlib@openssh.com
2020-08-20 11:10:59,042 INFO c.j.jsch - kex: server: none,zlib@openssh.com
2020-08-20 11:10:59,042 INFO c.j.jsch - kex: server:
2020-08-20 11:10:59,043 INFO c.j.jsch - kex: server:
2020-08-20 11:10:59,043 INFO c.j.jsch - kex: client: ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2020-08-20 11:10:59,044 INFO c.j.jsch - kex: client: ssh-rsa,ssh-dss,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2020-08-20 11:10:59,044 INFO c.j.jsch - kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: none
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: none
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client:
2020-08-20 11:10:59,046 INFO c.j.jsch - kex: client:
2020-08-20 11:10:59,046 INFO c.j.jsch - kex: server->client aes128-ctr hmac-sha2-256 none
2020-08-20 11:10:59,047 INFO c.j.jsch - kex: client->server aes128-ctr hmac-sha2-256 none
2020-08-20 11:10:59,053 INFO c.j.jsch - SSH_MSG_KEX_ECDH_INIT sent
2020-08-20 11:10:59,053 INFO c.j.jsch - expecting SSH_MSG_KEX_ECDH_REPLY
2020-08-20 11:10:59,385 INFO c.j.jsch - ssh_rsa_verify: signature true
2020-08-20 11:10:59,389 INFO o.s.i.s.s.DefaultSftpSessionFactory - The authenticity of host '*.*.*.114' can't be established.
RSA key fingerprint is 1f:3e:c9:5f:37:00:a1:00:ef:50:59:af:42:98:99:e9.
Are you sure you want to continue connecting?
2020-08-20 11:10:59,390 WARN c.j.jsch - Permanently added '*.*.*.114' (RSA) to the list of known hosts.
2020-08-20 11:10:59,390 INFO c.j.jsch - SSH_MSG_NEWKEYS sent
2020-08-20 11:10:59,390 INFO c.j.jsch - SSH_MSG_NEWKEYS received
2020-08-20 11:10:59,395 INFO c.j.jsch - SSH_MSG_SERVICE_REQUEST sent
2020-08-20 11:10:59,718 INFO c.j.jsch - SSH_MSG_SERVICE_ACCEPT received
2020-08-20 11:11:00,047 INFO c.j.jsch - Authentications that can continue: gssapi-with-mic,publickey,keyboard-interactive,password
2020-08-20 11:11:00,047 INFO c.j.jsch - Next authentication method: gssapi-with-mic
2020-08-20 11:11:00,399 INFO c.j.jsch - Authentications that can continue: password
2020-08-20 11:11:00,399 INFO c.j.jsch - Next authentication method: password
2020-08-20 11:11:00,746 INFO c.j.jsch - Authentication succeeded (password).
Total Files: 1
2020-08-20 11:11:27,623 INFO c.t.i.s.s.LastModifiedLsEntryFileListFilter - [OB.xml] old size [null] increased to [0]...
Total Files: 1
2020-08-20 11:11:29,636 INFO c.t.i.s.s.LastModifiedLsEntryFileListFilter - [OB.xml] old size [0] increased to [38709]...
Total Files: 1
2020-08-20 11:11:34,256 INFO c.j.jsch - Connecting to *.*.*.115 port 22
2020-08-20 11:11:55,282 INFO c.t.i.c.f.a.LoggingAspect - log message Integration Name=CXML invoice DK Service Name=FileTransferService Source=External Source Interface=/home/umair/accruals/14minute/ Target=NA Target Interface=/home/umair/accruals/15minute/ Content ID=OB.xml Message ID=N/A Category=SUCCESS Timestamp=2020-08-20T11:11:55.281 Message=file OB.xml transferred
согласно предложению Артема Билана, я создал демонстрационное приложение и загрузил его в репозиторий git здесь
здесь представлены конфигурации сервера
поток интеграции определяется здесь
Комментарии:
1. Есть ли шансы увидеть какие-нибудь ваши логи по этому вопросу? Также неясно, о какой части вашей конфигурации вы говорите…
2. @ArtemBilan я столкнулся с проблемой с первым подписчиком (который содержит Sftp.outboundAdapter (outboundSftp)). когда я устанавливаю неправильную конфигурацию сервера для выходного канала, также вызывается подписчик после Sftp.outboundAdapter (outboundSftp)).
3. Хм. Технически это невозможно, поскольку по умолчанию
ignoreFailures
являетсяfalse
, поэтому из неудачно подписанного выдается исключение. Было бы здорово увидеть некоторые журналы с этой ошибкой. Также: какую версию интеграции Spring вы используете?4. @ArtemBilan весенняя загрузка 2.2.2.РЕЛИЗ. SFTP для интеграции Spring 5.3.2.РЕЛИЗ. Spring integration ftp 5.3.2.RELEASE
5. Если вы говорите, что во время загрузки произошла ошибка, пожалуйста, поделитесь трассировкой стека в качестве редактирования вашего вопроса. Журналы не читаются в комментариях.
Ответ №1:
Я нашел вашу проблему:
@Bean
public Advice deleteFileAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpressionString(AppConstants.SUCCESS_EXPRESSION);
advice.setTrapException(true);
return advice;
}
Посмотрите его JavaDocs:
/**
* If true, any exception will be caught and null returned.
* Default false.
* @param trapException true to trap Exceptions.
*/
public void setTrapException(boolean trapException) {
Таким образом, любые исключения из обработчика сообщений, включая сбой SFTP conneciton, будут проглочены. Следовательно, сообщение о запросе действительно перемещается следующему подписчику в вашей publishSubcribeChannel
конфигурации.
Комментарии:
1. Я добавил этот флаг, но он все еще перемещается на следующий канал.
2. Да? Он должен
false
повторно выдать исключение. Извините, я не уверен в вашей логике.3. Установлено значение true, как вы предлагаете.
4. есть ли какой-либо альтернативный способ, который я могу использовать?
5. Если это так
false
, возникает исключение. СмотритеExpressionEvaluatingRequestHandlerAdvice
исходный код. Не могли бы вы, пожалуйста, установить точку останова вExpressionEvaluatingRequestHandlerAdvice.doInvoke()
ot для расследования того, что происходит после того, как этоcallback.execute()
вызвано?