We are using Spring Integration dynamic SFTP flows to ingest files from SFTP servers. The Java config for the flow looks like this:
// Inbound adapter: fetch remote files to the local directory, oldest first
from(Sftp.inboundAdapter(cachingSessionFactory,
                (a, b) -> Long.compare(a.lastModified(), b.lastModified()))
        .preserveTimestamp(true)
        .remoteDirectory(job.getRemoteDirectory())
        .deleteRemoteFiles(job.getDeleteRemoteFiles())
        .filter(this.compositeRemoteFilter(job))
        .autoCreateLocalDirectory(true)
        .maxFetchSize(maxMessagesPerPoll)
        .localFilter(new LocalFileFilter(job))
        .localDirectory(localDirectory),
        // Endpoint: not started automatically, polled on the job's cron schedule
        e -> e.id("testComponent")
                .autoStartup(false)
                .poller(Pollers.cron(job.getPollingFreq(), job.timeZone())
                        .maxMessagesPerPoll(maxMessagesPerPoll)
                        .receiveTimeout(1000L)))
        .handle(UploadHandler)
The caching session factory is resolved dynamically through a delegating session factory. Most of the time this works fine, but occasionally, after the application has been running for days, we see some poller threads stuck in the RUNNABLE state. Our assumption was that if the JSch session got stuck for any reason, it would eventually be released, since we have timeouts configured both on the session factory and on the poller.
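For context, the session-factory timeouts mentioned above correspond roughly to the sketch below. It is not our exact code: the `timeout`, `serverAliveInterval`, `serverAliveCountMax` and `enableDaemonThread` values are the ones visible in the heap dump further down, while the class name, the `SFTP_PASSWORD` environment variable and the use of a plain `DefaultSftpSessionFactory` (instead of our `LogEnabledSftpSessionFactory` with its proxy) are assumptions made only for illustration.

```java
import com.jcraft.jsch.ChannelSftp;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

// Hypothetical holder class; requires spring-integration-sftp on the classpath.
final class SftpSessionFactorySketch {

    static CachingSessionFactory<ChannelSftp.LsEntry> build() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost("sftp.server");
        factory.setUser("user");
        // Assumption: the password is read from a hypothetical environment variable.
        factory.setPassword(System.getenv("SFTP_PASSWORD"));

        // Values taken from the heap dump (milliseconds where applicable).
        factory.setTimeout(120_000);             // socket / connect timeout
        factory.setServerAliveInterval(240_000); // keep-alive interval
        factory.setServerAliveCountMax(4);       // missed keep-alives before disconnect
        factory.setEnableDaemonThread(false);

        // Our real factory also sets a proxy (SftpProxyCommand); it is omitted here.
        return new CachingSessionFactory<>(factory);
    }
}
```

Whether the `timeout` value also bounds reads that go through the proxy command's streams, rather than through a plain socket, is something we have not confirmed.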
The thread dump for one of the stuck threads looks like this:
java.io.FileInputStream.readBytes(Native Method)
java.io.FileInputStream.read(FileInputStream.java:255)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read(BufferedInputStream.java:265)
com.jcraft.jsch.IO.getByte(IO.java:73)
com.jcraft.jsch.Session.connect(Session.java:263)
com.jcraft.jsch.Session.connect(Session.java:183)
org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:268)
org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:390)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:44)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:15)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:84)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:81)
org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:195)
org.springframework.integration.util.SimplePool.getItem(SimplePool.java:176)
org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:135)
custom.integration.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:80)
custom.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:67)
org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:308)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:258)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:64)
org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:160)
org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:360)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1934/1648215776.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$2062/2127922639.run(Unknown Source)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
org.springframework.integration.util.ErrorHandlingTaskExecutor$$Lambda$2063/1949167295.run(Unknown Source)
org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1935/1382748208.run(Unknown Source)
org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:67)
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
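The frames above show the scheduler thread blocked in a stream read inside `Session.connect`. One general way to keep a caller from waiting forever on a call like this is to run it on a separate thread and wait for the result with a deadline. The sketch below uses only the JDK; `BoundedCall` and its `call` method are hypothetical names, not part of Spring Integration or JSch, and we have not verified this approach against our setup.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper, not part of Spring Integration or JSch. */
final class BoundedCall {

    // Daemon worker threads, so a call that never returns cannot keep the JVM alive.
    private static final ExecutorService WORKERS = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "bounded-call");
        t.setDaemon(true);
        return t;
    });

    private BoundedCall() {
    }

    /**
     * Runs the task on a worker thread and waits at most timeoutMillis for it.
     * On timeout the caller gets a TimeoutException and the worker is interrupted;
     * a read that ignores interrupts stays blocked until its stream is closed.
     */
    static <T> T call(Callable<T> task, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<T> future = WORKERS.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // best effort: only helps if the task is interruptible
            throw e;
        }
    }
}
```

In a delegating session factory this could wrap the delegate call, for example `BoundedCall.call(() -> delegate.getSession(), 120_000L)`, where `delegate` is a hypothetical field. Note the trade-off: the scheduler thread is released, but a worker stuck in a non-interruptible read is leaked unless the underlying stream or process is closed.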
Are we missing something here, or is there any configuration on the Spring Integration side that would prevent these threads from getting stuck?
Spring Integration version: 5.1.13
Here is a heap dump view of one such thread:
"Name","Retained Size","Shallow Size","Level"
"java.lang.Thread [Thread, Stack Local] ""my-taskScheduler-42"" tid=348 [RUNNABLE]","54768","120","1"
"contextClassLoader org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","2"
"<local variable> com.jcraft.jsch.Session [Stack Local]","21232","256","2"
"threadLocals java.lang.ThreadLocal$ThreadLocalMap","15896","24","2"
"<local variable> java.lang.UNIXProcess$ProcessPipeInputStream [Monitor Used, Stack Local]","8264","40","2"
"<local variable> org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource [Stack Local]","4784","96","2"
"<local variable> org.springframework.integration.endpoint.SourcePollingChannelAdapter [Stack Local]","2608","176","2"
"<local variable> java.util.concurrent.ScheduledThreadPoolExecutor [Stack Local]","2392","80","2"
"<local variable> org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer [Stack Local]","2168","64","2"
"<local variable> org.springframework.integration.file.remote.RemoteFileTemplate [Stack Local]","824","64","2"
"<local variable> org.springframework.integration.util.SimplePool [Stack Local]","744","56","2"
"group java.lang.ThreadGroup","656","48","2"
"<local variable> custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","2"
"<local variable> org.springframework.scheduling.concurrent.ReschedulingRunnable [Stack Local]","232","48","2"
"inheritableThreadLocals java.lang.ThreadLocal$ThreadLocalMap","104","24","2"
"inheritedAccessControlContext java.security.AccessControlContext","88","40","2"
"name java.lang.String ""my-taskScheduler-42""","80","24","2"
"<local variable> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask [Stack Local]","72","72","2"
"<local variable> org.springframework.integration.sftp.session.SftpSession [Stack Local]","56","32","2"
"<local variable> java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"target java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"<local variable> org.springframework.integration.util.ErrorHandlingTaskExecutor [Stack Local]","40","24","2"
"<local variable> org.springframework.integration.sftp.session.JSchSessionWrapper [Stack Local]","40","24","2"
"<local variable> java.io.FileDescriptor [JNI Local]","32","32","2"
"<local variable> org.springframework.integration.file.remote.session.CachingSessionFactory [Stack Local]","32","32","2"
"pool org.springframework.integration.util.SimplePool [Stack Local]","744","56","3"
"sessionFactory custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","3"
"jsch com.jcraft.jsch.JSch","736","32","4"
"proxy custom.adapters.session.SftpProxyCommand","328","32","4"
"sessionConfig java.util.Properties size = 2","176","48","4"
"sharedSessionLock java.util.concurrent.locks.ReentrantReadWriteLock","120","24","4"
"host java.lang.String ""sftp.server""","80","24","4"
"host java.lang.String ""sftp.server""","80","24","4"
"<class> custom.adapters.session.LogEnabledSftpSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","64","64","4"
"password java.lang.String ""@#@#@#@#""","64","24","4"
"user java.lang.String ""user""","64","24","4"
"user java.lang.String ""user""","64","24","4"
"enableDaemonThread java.lang.Boolean = false","16","16","4"
"serverAliveCountMax java.lang.Integer = 4 0x00000004","16","16","4"
"serverAliveInterval java.lang.Integer = 240,000 0x0003A980","16","16","4"
"timeout java.lang.Integer = 120,000 0x0001D4C0","16","16","4"
"userInfoWrapper org.springframework.integration.sftp.session.DefaultSftpSessionFactory$UserInfoWrapper&qu