Вызвано: com.jcraft.jsch.JSchException: соединение закрыто иностранным хостом при использовании DefaultSftpSessionFactory и SftpInboundFileSynchronizer

#spring #spring-batch #spring-integration

#spring #spring-batch #spring-интеграция

Вопрос:

Я использую пакетное удаленное разделение spring для чтения входных файлов и их обработки. Чтобы сделать входные файлы доступными на всех серверах, я добавил step listener, который проверяет, существуют они или нет, и загружает их с master.

 <step id="importExchangesStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
            commit-interval="${import.exchanges.commit.interval}" />
        <listeners>
            <listener ref="ftpGetRemoteExchangesFilesListener" />
        </listeners>
    </tasklet>
</step>

<job id="importExchangesJob" restartable="true">
    <step id="importExchangesStep.master">
        <partition partitioner="importExchangesPartitioner"
            handler="importExchangesPartitionHandler" />
    </step>
</job>
  

Я использую DefaultSftpSessionFactory для загрузки файлов с главных серверов на подчиненные серверы. Существует 4 сервера, а параллелизм потребителей равен 7 на каждом, так что всего 28 разделов (пошаговое выполнение) выполняются параллельно.

Ниже приведена конфигурация sftp ,

 <beans:bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <beans:property name="host" value="${master.host}" />
    <beans:property name="user" value="${master.user}" />
    <beans:property name="password" value="${master.password}" />
    <beans:property name="port" value="22"/>
</beans:bean>
  

Он работает, если он запускает его только на одном сервере. Но если я распределил его на 4, какой-то раздел будет успешно завершен, а какой-то завершится неудачно с этим исключением

 Caused by: com.jcraft.jsch.JSchException: connection is closed by foreign host
  

Конфигурация прослушивателя:

 <beans:bean id="ftpGetRemoteExchangesFilesListener"
    class="com.st.batch.listeners.FtpGetRemoteFilesListener"
    p:sessionFactory-ref="sftpSessionFactory" p:downloadFileAttempts="3"
    p:fileNamePattern="*.txt" p:deleteLocalFiles="false"
    p:localDirectory="/tmp/spring/batch/#{jobParameters[batch_id]}/exchanges/"
    p:remoteDirectory="/tmp/spring/batch/#{jobParameters[batch_id]}/exchanges/"
    scope="step" />
  

Класс слушателя без геттеров и сеттеров, ссылающихся на это

http://coreyreil.wordpress.com/2012/12/21/spring-batch-creating-an-ftp-tasklet-to-get-remote-files/

 public class FtpGetRemoteFilesListener extends StepExecutionListenerSupport implements InitializingBean
{
    //private Logger logger = LoggerFactory.getLogger(FtpGetRemoteFilesTasklet.class);
    private static Log logger = LogFactory.getLog(FtpGetRemoteFilesListener.class);

    private File localDirectory;

    private AbstractInboundFileSynchronizer<?> ftpInboundFileSynchronizer;

    private SessionFactory sessionFactory;

    private boolean autoCreateLocalDirectory = true;

    private boolean deleteLocalFiles = true;

    private String fileNamePattern;

    private String remoteDirectory;

    private int downloadFileAttempts = 12;

    private long retryIntervalMilliseconds = 300000;

    private boolean retryIfNotFound = false;


    /* (non-Javadoc)
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    public void afterPropertiesSet() throws Exception
    {
        Assert.notNull(sessionFactory, "sessionFactory attribute cannot be null");
        Assert.notNull(localDirectory, "localDirectory attribute cannot be null");
        Assert.notNull(remoteDirectory, "remoteDirectory attribute cannot be null");
        Assert.notNull(fileNamePattern, "fileNamePattern attribute cannot be null");

        setupFileSynchronizer();

        if (!this.localDirectory.exists())
        {
            if (this.autoCreateLocalDirectory)
            {
                if (logger.isDebugEnabled())
                {
                    logger.debug("The '"   this.localDirectory   "' directory doesn't exist; Will create.");
                }
                this.localDirectory.mkdirs();
            }
            else
            {
                throw new FileNotFoundException(this.localDirectory.getName());
            }
        }
    }

    private void setupFileSynchronizer()
    {
        if (isSftp())
        {
            ftpInboundFileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
            ((SftpInboundFileSynchronizer) ftpInboundFileSynchronizer).setFilter(new SftpSimplePatternFileListFilter(fileNamePattern));
        }
        else
        {
            ftpInboundFileSynchronizer = new FtpInboundFileSynchronizer(sessionFactory);
            ((FtpInboundFileSynchronizer) ftpInboundFileSynchronizer).setFilter(new FtpSimplePatternFileListFilter(fileNamePattern));
        }
        ftpInboundFileSynchronizer.setRemoteDirectory(remoteDirectory);
    }

    private void deleteLocalFiles()
    {
        if (deleteLocalFiles)
        {
            SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
            List<File> matchingFiles = filter.filterFiles(localDirectory.listFiles());
            if (CollectionUtils.isNotEmpty(matchingFiles))
            {
                for (File file : matchingFiles)
                {
                    FileUtils.deleteQuietly(file);
                }
            }
        }
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {

        deleteLocalFiles();

        ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);

        if (retryIfNotFound)
        {
            SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
            int attemptCount = 1;
            while (filter.filterFiles(localDirectory.listFiles()).size() == 0 amp;amp; attemptCount <= downloadFileAttempts)
            {
                logger.info("File(s) matching "   fileNamePattern   " not found on remote site.  Attempt "   attemptCount   " out of "   downloadFileAttempts);
                try {
                    Thread.sleep(retryIntervalMilliseconds);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
                attemptCount  ;
            }

            if (attemptCount >= downloadFileAttempts amp;amp; filter.filterFiles(localDirectory.listFiles()).size() == 0)
            {
                try {
                    throw new FileNotFoundException("Could not find remote file(s) matching "   fileNamePattern   " after "   downloadFileAttempts   " attempts.");
                } catch (FileNotFoundException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

    }
}
  

Журнал:

 12:28:47,430 ERROR SimpleAsyncTaskExecutor-3 step.AbstractStep:225 - Encountered an error executing step importExchangesStep in job importExchangesJob
java.lang.IllegalStateException: failed to create SFTP Session
    at org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:266)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:143)
    at com.st.batch.listeners.FtpGetRemoteFilesListener.beforeStep(FtpGetRemoteFilesListener.java:121)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
    at $Proxy29.beforeStep(Unknown Source)
    at org.springframework.batch.core.listener.CompositeStepExecutionListener.beforeStep(CompositeStepExecutionListener.java:77)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:194)
    at org.springframework.batch.integration.partition.StepExecutionRequestHandler.handle(StepExecutionRequestHandler.java:64)
    at sun.reflect.GeneratedMethodAccessor121.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:97)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:81)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:103)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:126)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:227)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:127)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:67)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    at org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:318)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:239)
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:233)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:207)
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway.access$200(AmqpInboundGateway.java:47)
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway$1.onMessage(AmqpInboundGateway.java:87)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:693)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:586)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:75)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:154)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1113)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:559)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:904)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:888)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$500(SimpleMessageListenerContainer.java:75)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:989)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: failed to connect
    at org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:204)
    at org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:262)
    ... 54 more
Caused by: com.jcraft.jsch.JSchException: connection is closed by foreign host
    at com.jcraft.jsch.Session.connect(Session.java:244)
    at com.jcraft.jsch.Session.connect(Session.java:158)
    at org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:196)
    ... 55 more
  

Существует ли какое-либо ограничение на количество одновременных подключений, поскольку 28 разделов должны пытаться подключиться, некоторые из которых завершаются успешно, а некоторые завершаются неудачно на всех серверах или что-то еще?

Я могу войти в систему через командную строку со всех серверов для управления с помощью sftp user@host.

Комментарии:

1. Разве это не ограничение параллельного сеанса для вашего SFTP-сервера?

2. вы пробовали выполнять код beforeStep() как отдельный шаг?

3. Мне нужно извлекать входные файлы на всех серверах, поэтому этот шаг необходимо выполнить на всех серверах, поэтому он должен быть прослушивателем удаленного шага, чтобы данные были подготовлены до запуска удаленного шага.

Ответ №1:

Похоже, что ваш SSH-сервер имеет некоторое ограничение на количество одновременных подключений с сервера. Также похоже, что вы используете Spring Integration 2.2.x (обозначено трассировкой стека).

Spring Integration 2.2.x использует соединение для каждого сеанса.

в версии 3.0 введена концепция общих сеансов, где каждый «сеанс» представляет собой канал, мультиплексируемый по одному общему соединению / сеансу.

Добавьте <constructor-arg value="true"/> , чтобы включить эту функцию.

Если вы не можете перейти на 3.0, вам придется посмотреть на настройку сервера, чтобы разрешить больше подключений.