CassandraSink предупреждает о количестве активных сеансов

#cassandra #apache-flink

Вопрос:

Я расследовал предупреждение, выданное com.datastax.oss.driver.internal.core.session.DefaultSession тем, что в моей работе Flink слишком много активных сеансов.

 You have too many session instances: 112 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
 

Моя работа имеет 4 CassandraSinks и уровень параллелизма 1 с одним слотом задач.

Я вижу, что в каждом диспетчере задач выполняется один поток, но количество экземпляров сеансов продолжает расти.

Мой кластерный конструктор выглядит так:

 override fun buildCluster(builder: Cluster.Builder): Cluster {
            return builder
                .withCodecRegistry(CodecRegistry.DEFAULT_INSTANCE.register(InstantCodec.instance))
                .withLoadBalancingPolicy(
                    DCAwareRoundRobinPolicy.builder()
                        .withLocalDc(datacenter).build()
                )
                .withQueryOptions(QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
                .withSSL(
                    JdkSSLOptions.builder()
                        .withSSLContext(buildContext(truststorePath)).build()
                )
                .withCredentials(username, password)
                .addContactPointsWithPorts(listOf(InetSocketAddress(host, port)))
                .withReconnectionPolicy(ExponentialReconnectionPolicy(this.baseDelayMs, this.maxDelayMs))
                .withoutMetrics().build()
        }
 

И каждая из раковин выглядит следующим образом:

 val clusterBuilder = getClusterBuilder()
CassandraSink.addSink(streamA).setClusterBuilder(clusterBuilder).build()
CassandraSink.addSink(streamB).setClusterBuilder(clusterBuilder).build()
CassandraSink.addSink(streamC).setClusterBuilder(clusterBuilder).build()
CassandraSink.addSink(streamD).setClusterBuilder(clusterBuilder).build()
 

Есть ли причина, по которой добавляются новые сеансы, а не полагаются на предыдущие созданные? Что-то, что я должен изменить, чтобы ограничить это. Я видел мало информации о Кассандре в Интернете и о рычагах, которые у нас есть. Я понимаю, что сеансы создаются на open RichSinkFunction основе, но если что-то не будет постоянно сбоить и не будет должным образом очищаться, я не могу понять, почему это происходит.

Обновить

Я заметил, что для каждого нового потока (просматривая журналы, вы видите увеличение числа потоков в цепочке операторов) создаются две сессии. Я думаю, стоит добавить, что я использую функции Richasync (ы). Я читаю, может ли это повлиять или нет. У меня есть емкость для асинхронного оператора 100, текущий номер потока 62.