#spring #session #spring-integration
#spring #сессия #spring-интеграция
Вопрос:
Мы используем динамические потоки sftp spring Integration для приема файлов sftp. Конфигурация flow java выглядит следующим образом
from(Sftp.inboundAdapter(cachingSessionFactory, (a, b) -> Long.valueOf(a.lastModified())
.compareTo(b.lastModified()))//
.preserveTimestamp(true)//
.remoteDirectory(job.getRemoteDirectory())//
.deleteRemoteFiles(job.getDeleteRemoteFiles())//
.filter(this.compositeRemoteFilter(job))//
.autoCreateLocalDirectory(true)//
.preserveTimestamp(true)//
.maxFetchSize(maxMessagesPerPoll)
.localFilter(new LocalFileFilter(job))//
.localDirectory(localDirectory)),
e -> e.id("testComponent")
.autoStartup(false)//
.poller(Pollers.cron(job.getPollingFreq(), job.timeZone())//
.maxMessagesPerPoll(maxMessagesPerPoll)
.receiveTimeout(1000L)
.handle(UploadHandler)
Фабрика сеансов кэширования — это то, что мы получаем динамически с помощью делегата. Большая часть из них работает нормально, но иногда после работы в течение нескольких дней мы наблюдаем, что некоторые потоки застряли в RUNNABLE. Мы предполагали, что если сеанс jsch каким-либо образом застрял, он должен в конечном итоге выйти, поскольку у нас есть тайм-ауты как на уровне фабрики сеанса, так и на опроснике .
Дамп для потока выглядит примерно так
java.io.FileInputStream.readBytes(Native Method)java.io.FileInputStream.read(FileInputStream.java:255)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read(BufferedInputStream.java:265)
com.jcraft.jsch.IO.getByte(IO.java:73)
com.jcraft.jsch.Session.connect(Session.java:263)
com.jcraft.jsch.Session.connect(Session.java:183)
org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:268)
org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:390)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:44)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:15)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:84)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:81)
org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:195)
org.springframework.integration.util.SimplePool.getItem(SimplePool.java:176)
org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:135)
custom.integration.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:80)
custom.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:67)
org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:308)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:258)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:64)
org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:160)org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:360)
org.springframework.integration.endpoint.AbstractPollingEndpoint$Lambda$1934/1648215776.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
org.springframework.integration.endpoint.AbstractPollingEndpoint$Lambda$2062/2127922639.run(Unknown Source)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
org.springframework.integration.util.ErrorHandlingTaskExecutor$Lambda$2063/1949167295.run(Unknown Source)
org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
org.springframework.integration.endpoint.AbstractPollingEndpoint$Lambda$1935/1382748208.run(Unknown Source)org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:67)
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Пожалуйста, помогите, если мы здесь чего-то не хватает или если есть какая-то конфигурация, которую мы можем выполнить на стороне si, чтобы исправить это.
Версия SI 5.1.13
Еще одна трассировка дампа кучи потока
"Name","Retained Size","Shallow Size","Level"
"java.lang.Thread [Thread, Stack Local] ""my-taskScheduler-42"" tid=348 [RUNNABLE]","54768","120","1"
"contextClassLoader org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","2"
"<local variable> com.jcraft.jsch.Session [Stack Local]","21232","256","2"
"threadLocals java.lang.ThreadLocal$ThreadLocalMap","15896","24","2"
"<local variable> java.lang.UNIXProcess$ProcessPipeInputStream [Monitor Used, Stack Local]","8264","40","2"
"<local variable> org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource [Stack Local]","4784","96","2"
"<local variable> org.springframework.integration.endpoint.SourcePollingChannelAdapter [Stack Local]","2608","176","2"
"<local variable> java.util.concurrent.ScheduledThreadPoolExecutor [Stack Local]","2392","80","2"
"<local variable> org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer [Stack Local]","2168","64","2"
"<local variable> org.springframework.integration.file.remote.RemoteFileTemplate [Stack Local]","824","64","2"
"<local variable> org.springframework.integration.util.SimplePool [Stack Local]","744","56","2"
"group java.lang.ThreadGroup","656","48","2"
"<local variable> custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","2"
"<local variable> org.springframework.scheduling.concurrent.ReschedulingRunnable [Stack Local]","232","48","2"
"inheritableThreadLocals java.lang.ThreadLocal$ThreadLocalMap","104","24","2"
"inheritedAccessControlContext java.security.AccessControlContext","88","40","2"
"name java.lang.String ""my-taskScheduler-42""","80","24","2"
"<local variable> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask [Stack Local]","72","72","2"
"<local variable> org.springframework.integration.sftp.session.SftpSession [Stack Local]","56","32","2"
"<local variable> java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"target java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"<local variable> org.springframework.integration.util.ErrorHandlingTaskExecutor [Stack Local]","40","24","2"
"<local variable> org.springframework.integration.sftp.session.JSchSessionWrapper [Stack Local]","40","24","2"
"<local variable> java.io.FileDescriptor [JNI Local]","32","32","2"
"<local variable> org.springframework.integration.file.remote.session.CachingSessionFactory [Stack Local]","32","32","2"
"pool org.springframework.integration.util.SimplePool [Stack Local]","744","56","3"
"sessionFactory custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","3"
"jsch com.jcraft.jsch.JSch","736","32","4"
"proxy custom.adapters.session.SftpProxyCommand","328","32","4"
"sessionConfig java.util.Properties size = 2","176","48","4"
"sharedSessionLock java.util.concurrent.locks.ReentrantReadWriteLock","120","24","4"
"host java.lang.String ""sftp.server""","80","24","4"
"host java.lang.String ""sftp.server""","80","24","4"
"<class> custom.adapters.session.LogEnabledSftpSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","64","64","4"
"password java.lang.String ""@#@#@#@#""","64","24","4"
"user java.lang.String ""user""","64","24","4"
"user java.lang.String ""user""","64","24","4"
"enableDaemonThread java.lang.Boolean = false","16","16","4"
"serverAliveCountMax java.lang.Integer = 4 0x00000004","16","16","4"
"serverAliveInterval java.lang.Integer = 240,000 0x0003A980","16","16","4"
"timeout java.lang.Integer = 120,000 0x0001D4C0","16","16","4"
"userInfoWrapper org.springframework.integration.sftp.session.DefaultSftpSessionFactory$UserInfoWrapper","16","16","4"
"allowUnknownKeys = boolean false","","1","4"
"isSharedSession = boolean false","","1","4"
"port = int 22 0x00000016","","4","4"
"port = int 22 0x00000016","","4","4"
"<class> org.springframework.integration.file.remote.session.CachingSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","96","72","3"
"<loader> org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","4"
"<protection domain> java.security.ProtectionDomain","400","40","4"
"logger org.apache.commons.logging.LogAdapter$Slf4jLocationAwareLog","24","24","4"
"isSharedSessionCapable = boolean true","","1","3"
"sharedSessionEpoch = long 0","","8","3"
"testSession = boolean true","","1","3"
"<local variable> java.util.concurrent.Executors$RunnableAdapter [Stack Local]","24","24","2"
"<local variable> java.util.Date [Stack Local] = 2021-01-19 20:30:17.000","24","24","2"
"blockerLock java.lang.Object","16","16","2"
"daemon = boolean false","","1","2"
"eetop = long 28,082,176 0x0000000001AC8000","","8","2"
"nativeParkEventPointer = long 140,660,716,930,496 0x00007FEE201105C0","","8","2"
"priority = int 5 0x00000005","","4","2"
"single_step = boolean false","","1","2"
"stackSize = long 0","","8","2"
"stillborn = boolean false","","1","2"
"threadLocalRandomProbe = int -884,406,543 0xCB4906F1","","4","2"
"threadLocalRandomSecondarySeed = int 0","","4","2"
"threadLocalRandomSeed = long -7,128,783,728,802,150,278 0x9D1178F7F429C87A","","8","2"
"threadStatus = int 5 0x00000005","","4","2"
"tid = long 348 0x000000000000015C","","8","2"
Пользовательский код прокси для туннелирования
public class SftpProxyCommand implements Proxy
{
String command;
Process p = null;
InputStream in = null;
OutputStream out = null;
public SftpProxyCommand(String appUser, String privateKeyLocation, String jumpHost)
{
this.command = on(" ").join("ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i",
privateKeyLocation, "-l", appUser, jumpHost, "nc %h %p");
}
public void connect(SocketFactory socket_factory, String host, int port, int timeout) throws Exception
{
String _command = command.replace("%h", host);
_command = _command.replace("%p", new Integer(port).toString());
p = Runtime.getRuntime().exec(_command);
LOG.debug("Sftp Command : {}", _command);
in = p.getInputStream();
out = p.getOutputStream();
}
public Socket getSocket()
{
return null;
}
public InputStream getInputStream()
{
return in;
}
public OutputStream getOutputStream()
{
return out;
}
public void close()
{
try
{
if (p != null)
{
p.getErrorStream().close();
p.getOutputStream().close();
p.getInputStream().close();
p.destroy();
p = null;
}
}
catch (IOException e)
{
LOG.error("Issue in closing sftp command", e);
}
}
}
Комментарии:
1. Есть ли шансы попробовать ваше решение с последней версией Spring Integration: spring.io/projects/spring-integration#learn ? Если я правильно помню, мы внесли некоторые улучшения в фабрику сеансов кэширования. С другой стороны, вы можете попробовать пока без кэширования…
2. Ах, хорошо @ArtemBilan мы обновились до 5.1.13 и подумали github.com/spring-projects/spring-integration/commit /… исправил это. Но попробую, если пока работает простая фабрика.
3. Можете ли вы опубликовать полный дамп потока где-нибудь (github gist, pastebin и т. Д.). В этом дампе потока есть что-то очень странное.
FileInputStream
подразумевает, что connect пытается прочитатьFile
. Это должно бытьjava.net.Socket$SocketInputStream
. Независимо от того, (пока он действительно пытается читать из сокета), сокеты по умолчанию не имеют тайм-аута; вы можете установить тайм-аутDefaultSftpSessionFactory
, который должен привести к сбою этой попытки чтения после этого тайм-аута.4. @GaryRussell добавление трассировки дампа кучи для потока, но мы уже добавили несколько тайм-аутов на завод и перерабатываем его через несколько часов
CachingSessionFactory<LsEntry> csf = new CachingSessionFactory<LsEntry>(sf); csf.setPoolSize(connectionInfo.getPoolSize()); csf.setTestSession(true); csf.setSessionWaitTimeout(props.getSftp() .getSessionWaitTimeout());
. sf по умолчанию выглядит такsf.setServerAliveCountMax(5); sf.setServerAliveInterval120_000)
5. Они находятся в том же пакете,
Proxy
что и классProxyHttp
,ProxySOCKS4
иProxySOCKS5
. Вы не устанавливаетеsoTimeout
в своем.in = p.getInputStream();
— это не сокет, вы блокируете стандартный интерфейс.
Ответ №1:
Ваш прокси-сервер блокирует стандартный интерфейс.
Комментарии:
1. Спасибо @GaryRussel за указание на это, хотя у нас есть прокси-сервер jump host , нам нужно будет выполнить туннелирование ssh, чтобы заставить его работать в нашей среде в отличие от этих реализаций. Но давайте проверим нашу сторону, что мы можем с этим поделать.