Сообщение о публикации потока интеграции Spring DSL после ошибки в исходящем адаптере

#spring-integration #spring-integration-dsl #spring-integration-sftp

#spring-интеграция #spring-интеграция-dsl #spring-интеграция-sftp

Вопрос:

У меня есть поток интеграции.

 IntegrationFlows.from(
            Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))
            .remoteDirectory(config.getInboundDirectory())
            , e -> e.poller(
                    Pollers.cron(config.getCron())
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
                        try {

                            this.destroy(String.valueOf(config.getId()));
                            configurationService.removeConfigurationChannelById(config.getId());
                            //loggin here

                            }
                        } catch (Exception ex1) {
                            Logger.getLogger(ExceptionAspect.class.getName()).log(Level.SEVERE, null, ex1);
                        }
                    })
                    .advice(startup.scanRemoteDirectory())
            ))
            .transform(
                file -> util.transform((File) file, config.getSourceEncoding(), config.getTargetEncoding(), doEncoding, doZip))
            .publishSubscribeChannel(s -> s
                .subscribe(f -> {
                    f.handle(Sftp.outboundAdapter(outboundSftp)
                        .useTemporaryFileName(false)
                        .autoCreateDirectory(true)
                        .remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));

                })
                .subscribe(f -> {

                    if(doArchive) {
                        f.handle(Sftp.outboundAdapter(inboundSftp)
                            .useTemporaryFileName(false)
                            .autoCreateDirectory(true)
                            .remoteDirectory(config.getInboundArchiveDirectory()));
                    } else {
                        f.handle(m -> {});
                    }

                })
                .subscribe(f -> f
                    .handle(m -> {

                        // file transfer logging here
                        if(doArchive) {
                          // file archived logging here
                         }
                        }
                    })
                )
            )
            .get();
  

на 1-м подписчике, если файл не удалось загрузить, он все еще печатает журналы, что файл передан, но на самом деле это не так.

насколько я понимаю, сообщение будет передано каждому подписчику, и когда 1-й подписчик завершит свою работу, оно будет отправлено следующему.

В моем случае первому подписчику фактически не удается загрузить файл. Совет не удалять файл.

Я попробовал несколько вещей.

 IntegrationFlows.from(
                Sftp.inboundAdapter(inboundSftp)
               ..........
                ))
                .transform(....
                   )
                .publishSubscribeChannel(s -> s
                    .subscribe(f -> {
                        f.handle(Sftp.outboundAdapter(outboundSftp)
                            .useTemporaryFileName(false)
                            .autoCreateDirectory(true)
                            .remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));

                    }).publishSubcribeChannel(s -> s
                      .subscribe(f -> {

                        if(doArchive) {
                            f.handle(Sftp.outboundAdapter(inboundSftp)
                                .useTemporaryFileName(false)
                                .autoCreateDirectory(true)
                                .remoteDirectory(config.getInboundArchiveDirectory()));
                        } else {
                            f.handle(m -> {});
                        }

                    })
                    .subscribe(f -> f
                        .handle(m -> {

                            // file transfer logging here
                            if(doArchive) {
                              // file archived logging here
                             }
                            }
                        })
                    )
                    )
                )
                .get();
  

Я пытаюсь опубликовать снова после подписчика outboundAdapter, но по ошибке он все еще опубликован.

Также попробуйте добавить .errorChannel в подписчике outboundAdapter, у меня не сработало.

       .   ____          _            __ _ _
 /\ / ___'_ __ _ _(_)_ __  __ _    
( ( )___ | '_ | '_| | '_ / _` |    
 \/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |___, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.2.RELEASE)

2020-08-20 11:10:53,265 INFO c.t.i.s.I2SftpApplication - Starting I2SftpApplication on GSEUC5CG8393GR5 with PID 21636 (C:UsersMuhammadUmairIdeaProjectsi2sftpinboudservicetargetclasses started by MuhammadUmair in C:UsersMuhammadUmairIdeaProjectsi2sftpinboudservice)
2020-08-20 11:10:53,269 INFO c.t.i.s.I2SftpApplication - No active profile set, falling back to default profiles: default
2020-08-20 11:10:54,464 INFO o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-08-20 11:10:54,478 INFO o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-08-20 11:10:54,577 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,584 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,597 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,906 INFO o.s.b.w.e.t.TomcatWebServer - Tomcat initialized with port(s): 8080 (http)
2020-08-20 11:10:54,917 INFO o.a.c.h.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8080"]
2020-08-20 11:10:54,917 INFO o.a.c.c.StandardService - Starting service [Tomcat]
2020-08-20 11:10:54,917 INFO o.a.c.c.StandardEngine - Starting Servlet engine: [Apache Tomcat/9.0.29]
2020-08-20 11:10:55,095 INFO o.a.c.c.C.[.[.[/] - Initializing Spring embedded WebApplicationContext
2020-08-20 11:10:55,095 INFO o.s.w.c.ContextLoader - Root WebApplicationContext: initialization completed in 1705 ms
2020-08-20 11:10:56,920 INFO o.s.s.c.ThreadPoolTaskExecutor - Initializing ExecutorService 'applicationTaskExecutor'
2020-08-20 11:10:57,243 INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2020-08-20 11:10:57,415 INFO o.s.i.e.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-08-20 11:10:57,416 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.errorChannel' has 1 subscriber(s).
2020-08-20 11:10:57,416 INFO o.s.i.e.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2020-08-20 11:10:57,416 INFO o.s.i.e.EventDrivenConsumer - Adding {message-handler:startup.resultFileHandler.serviceActivator} as a subscriber to the 'fromSftpChannel' channel
2020-08-20 11:10:57,416 INFO o.s.i.c.DirectChannel - Channel 'application.fromSftpChannel' has 1 subscriber(s).
2020-08-20 11:10:57,417 INFO o.s.i.e.EventDrivenConsumer - started bean 'startup.resultFileHandler.serviceActivator'
2020-08-20 11:10:57,422 INFO o.s.i.e.SourcePollingChannelAdapter - started bean 'startup.sftpMessageSource.inboundChannelAdapter'
2020-08-20 11:10:57,441 INFO o.a.c.h.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8080"]
2020-08-20 11:10:57,498 INFO o.s.b.w.e.t.TomcatWebServer - Tomcat started on port(s): 8080 (http) with context path ''
2020-08-20 11:10:57,508 INFO c.t.i.s.I2SftpApplication - Started I2SftpApplication in 9.228 seconds (JVM running for 13.133)
Registering an Integration Flow with id : 1
2020-08-20 11:10:57,689 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#2.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,690 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - Adding {bridge} as a subscriber to the '1.subFlow#0.channel#0' channel
2020-08-20 11:10:57,691 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.1.subFlow#0.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,691 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#1.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#1.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - Adding {bridge} as a subscriber to the '1.subFlow#0.channel#0' channel
2020-08-20 11:10:57,692 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.1.subFlow#0.channel#0' has 2 subscriber(s).
2020-08-20 11:10:57,692 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#1.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,693 INFO o.s.i.e.EventDrivenConsumer - Adding {message-handler} as a subscriber to the '1.subFlow#0.channel#1' channel
2020-08-20 11:10:57,694 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#0.channel#1' has 1 subscriber(s).
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#0.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - Adding {bridge} as a subscriber to the '1.subFlow#0.channel#0' channel
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#0.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - Adding {transformer} as a subscriber to the '1.channel#0' channel
2020-08-20 11:10:57,694 INFO o.s.i.c.DirectChannel - Channel 'application.1.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,700 INFO o.s.i.e.SourcePollingChannelAdapter - started bean '1.org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'
2020-08-20 11:10:58,017 INFO c.j.jsch - Connecting to *.*.*.114 port 22
2020-08-20 11:10:58,341 INFO c.j.jsch - Connection established
2020-08-20 11:10:58,672 INFO c.j.jsch - Remote version string: SSH-2.0-OpenSSH_7.4
2020-08-20 11:10:58,673 INFO c.j.jsch - Local version string: SSH-2.0-JSCH-0.1.54
2020-08-20 11:10:58,674 INFO c.j.jsch - CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2020-08-20 11:10:58,728 INFO c.j.jsch - CheckKexes: diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521
2020-08-20 11:10:58,815 INFO c.j.jsch - CheckSignatures: ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2020-08-20 11:10:58,823 INFO c.j.jsch - SSH_MSG_KEXINIT sent
2020-08-20 11:10:59,036 INFO c.j.jsch - SSH_MSG_KEXINIT received
2020-08-20 11:10:59,038 INFO c.j.jsch - kex: server: curve25519-sha256,curve25519-sha256@libssh.org,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group16-sha512,diffie-hellman-group18-sha512,diffie-hellman-group-exchange-sha1,diffie-hellman-group14-sha256,diffie-hellman-group14-sha1,diffie-hellman-group1-sha1
2020-08-20 11:10:59,038 INFO c.j.jsch - kex: server: ssh-rsa,rsa-sha2-512,rsa-sha2-256,ecdsa-sha2-nistp256,ssh-ed25519
2020-08-20 11:10:59,039 INFO c.j.jsch - kex: server: aes256-ctr,aes192-ctr,aes128-ctr
2020-08-20 11:10:59,040 INFO c.j.jsch - kex: server: aes256-ctr,aes192-ctr,aes128-ctr
2020-08-20 11:10:59,040 INFO c.j.jsch - kex: server: hmac-sha2-512,hmac-sha2-256
2020-08-20 11:10:59,041 INFO c.j.jsch - kex: server: hmac-sha2-512,hmac-sha2-256
2020-08-20 11:10:59,042 INFO c.j.jsch - kex: server: none,zlib@openssh.com
2020-08-20 11:10:59,042 INFO c.j.jsch - kex: server: none,zlib@openssh.com
2020-08-20 11:10:59,042 INFO c.j.jsch - kex: server: 
2020-08-20 11:10:59,043 INFO c.j.jsch - kex: server: 
2020-08-20 11:10:59,043 INFO c.j.jsch - kex: client: ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2020-08-20 11:10:59,044 INFO c.j.jsch - kex: client: ssh-rsa,ssh-dss,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2020-08-20 11:10:59,044 INFO c.j.jsch - kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: none
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: none
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: 
2020-08-20 11:10:59,046 INFO c.j.jsch - kex: client: 
2020-08-20 11:10:59,046 INFO c.j.jsch - kex: server->client aes128-ctr hmac-sha2-256 none
2020-08-20 11:10:59,047 INFO c.j.jsch - kex: client->server aes128-ctr hmac-sha2-256 none
2020-08-20 11:10:59,053 INFO c.j.jsch - SSH_MSG_KEX_ECDH_INIT sent
2020-08-20 11:10:59,053 INFO c.j.jsch - expecting SSH_MSG_KEX_ECDH_REPLY
2020-08-20 11:10:59,385 INFO c.j.jsch - ssh_rsa_verify: signature true
2020-08-20 11:10:59,389 INFO o.s.i.s.s.DefaultSftpSessionFactory - The authenticity of host '*.*.*.114' can't be established.
RSA key fingerprint is 1f:3e:c9:5f:37:00:a1:00:ef:50:59:af:42:98:99:e9.
Are you sure you want to continue connecting?
2020-08-20 11:10:59,390 WARN c.j.jsch - Permanently added '*.*.*.114' (RSA) to the list of known hosts.
2020-08-20 11:10:59,390 INFO c.j.jsch - SSH_MSG_NEWKEYS sent
2020-08-20 11:10:59,390 INFO c.j.jsch - SSH_MSG_NEWKEYS received
2020-08-20 11:10:59,395 INFO c.j.jsch - SSH_MSG_SERVICE_REQUEST sent
2020-08-20 11:10:59,718 INFO c.j.jsch - SSH_MSG_SERVICE_ACCEPT received
2020-08-20 11:11:00,047 INFO c.j.jsch - Authentications that can continue: gssapi-with-mic,publickey,keyboard-interactive,password
2020-08-20 11:11:00,047 INFO c.j.jsch - Next authentication method: gssapi-with-mic
2020-08-20 11:11:00,399 INFO c.j.jsch - Authentications that can continue: password
2020-08-20 11:11:00,399 INFO c.j.jsch - Next authentication method: password
2020-08-20 11:11:00,746 INFO c.j.jsch - Authentication succeeded (password).
Total Files: 1
2020-08-20 11:11:27,623 INFO c.t.i.s.s.LastModifiedLsEntryFileListFilter - [OB.xml] old size [null]  increased to [0]...
Total Files: 1
2020-08-20 11:11:29,636 INFO c.t.i.s.s.LastModifiedLsEntryFileListFilter - [OB.xml] old size [0]  increased to [38709]...
Total Files: 1
2020-08-20 11:11:34,256 INFO c.j.jsch - Connecting to *.*.*.115 port 22
2020-08-20 11:11:55,282 INFO c.t.i.c.f.a.LoggingAspect - log message Integration Name=CXML invoice DK Service Name=FileTransferService Source=External Source Interface=/home/umair/accruals/14minute/ Target=NA Target Interface=/home/umair/accruals/15minute/ Content ID=OB.xml Message ID=N/A Category=SUCCESS Timestamp=2020-08-20T11:11:55.281 Message=file OB.xml transferred
  

согласно предложению Артема Билана, я создал демонстрационное приложение и загрузил его в репозиторий git здесь

здесь представлены конфигурации сервера

поток интеграции определяется здесь

Файл README

Комментарии:

1. Есть ли шансы увидеть какие-нибудь ваши логи по этому вопросу? Также неясно, о какой части вашей конфигурации вы говорите…

2. @ArtemBilan я столкнулся с проблемой с первым подписчиком (который содержит Sftp.outboundAdapter (outboundSftp)). когда я устанавливаю неправильную конфигурацию сервера для выходного канала, также вызывается подписчик после Sftp.outboundAdapter (outboundSftp)).

3. Хм. Технически это невозможно, поскольку по умолчанию ignoreFailures является false , поэтому из неудачно подписанного выдается исключение. Было бы здорово увидеть некоторые журналы с этой ошибкой. Также: какую версию интеграции Spring вы используете?

4. @ArtemBilan весенняя загрузка 2.2.2.РЕЛИЗ. SFTP для интеграции Spring 5.3.2.РЕЛИЗ. Spring integration ftp 5.3.2.RELEASE

5. Если вы говорите, что во время загрузки произошла ошибка, пожалуйста, поделитесь трассировкой стека в качестве редактирования вашего вопроса. Журналы не читаются в комментариях.

Ответ №1:

Я нашел вашу проблему:

 @Bean
public Advice deleteFileAdvice() {

    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnSuccessExpressionString(AppConstants.SUCCESS_EXPRESSION);
    advice.setTrapException(true);
    return advice;
}
  

Посмотрите его JavaDocs:

 /**
 * If true, any exception will be caught and null returned.
 * Default false.
 * @param trapException true to trap Exceptions.
 */
public void setTrapException(boolean trapException) {
  

Таким образом, любые исключения из обработчика сообщений, включая сбой SFTP conneciton, будут проглочены. Следовательно, сообщение о запросе действительно перемещается следующему подписчику в вашей publishSubcribeChannel конфигурации.

Комментарии:

1. Я добавил этот флаг, но он все еще перемещается на следующий канал.

2. Да? Он должен false повторно выдать исключение. Извините, я не уверен в вашей логике.

3. Установлено значение true, как вы предлагаете.

4. есть ли какой-либо альтернативный способ, который я могу использовать?

5. Если это так false , возникает исключение. Смотрите ExpressionEvaluatingRequestHandlerAdvice исходный код. Не могли бы вы, пожалуйста, установить точку останова в ExpressionEvaluatingRequestHandlerAdvice.doInvoke() ot для расследования того, что происходит после того, как это callback.execute() вызвано?