#cassandra #apache-flink
Вопрос:
Я расследовал предупреждение, выданное com.datastax.oss.driver.internal.core.session.DefaultSession
тем, что в моей работе Flink слишком много активных сеансов.
You have too many session instances: 112 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
Моя работа имеет 4 CassandraSinks
и уровень параллелизма 1 с одним слотом задач.
Я вижу, что в каждом диспетчере задач выполняется один поток, но количество экземпляров сеансов продолжает расти.
Мой кластерный конструктор выглядит так:
override fun buildCluster(builder: Cluster.Builder): Cluster {
return builder
.withCodecRegistry(CodecRegistry.DEFAULT_INSTANCE.register(InstantCodec.instance))
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy.builder()
.withLocalDc(datacenter).build()
)
.withQueryOptions(QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
.withSSL(
JdkSSLOptions.builder()
.withSSLContext(buildContext(truststorePath)).build()
)
.withCredentials(username, password)
.addContactPointsWithPorts(listOf(InetSocketAddress(host, port)))
.withReconnectionPolicy(ExponentialReconnectionPolicy(this.baseDelayMs, this.maxDelayMs))
.withoutMetrics().build()
}
И каждая из раковин выглядит следующим образом:
val clusterBuilder = getClusterBuilder()
CassandraSink.addSink(streamA).setClusterBuilder(clusterBuilder).build()
CassandraSink.addSink(streamB).setClusterBuilder(clusterBuilder).build()
CassandraSink.addSink(streamC).setClusterBuilder(clusterBuilder).build()
CassandraSink.addSink(streamD).setClusterBuilder(clusterBuilder).build()
Есть ли причина, по которой добавляются новые сеансы, а не полагаются на предыдущие созданные? Что-то, что я должен изменить, чтобы ограничить это. Я видел мало информации о Кассандре в Интернете и о рычагах, которые у нас есть. Я понимаю, что сеансы создаются на open
RichSinkFunction
основе, но если что-то не будет постоянно сбоить и не будет должным образом очищаться, я не могу понять, почему это происходит.
Обновить
Я заметил, что для каждого нового потока (просматривая журналы, вы видите увеличение числа потоков в цепочке операторов) создаются две сессии. Я думаю, стоит добавить, что я использую функции Richasync (ы). Я читаю, может ли это повлиять или нет. У меня есть емкость для асинхронного оператора 100, текущий номер потока 62.