Потоки Кафки не собирают максимальные записи опроса и другие конфигурации

#apache-kafka #kafka-consumer-api #apache-kafka-streams #spring-cloud-stream #spring-cloud-stream-binder-kafka

Вопрос:

У меня есть приложение springboot, основанное на потоках Кафки. У меня есть метод прослушивания, когда входящие записи передаются через KStreams и KTables.

Заметил, что в конфигурациях макс. опросов.записей, max.poll.interval.ms amp; связанные настройки сеанса работают только тогда, когда мы слушаем как подписной канал (потребительская модель), а не в потоках. Как уменьшить максимальное количество записей опроса в контексте KStream ?

Код прослушивателя выглядит следующим образом :

 @Trace(dispatcher = true)
    @StreamListener
    public void process(
            @Input(KafkaMyOrdersStreams.GLPROD_OE_ORDER_HEADERS_ALL_INPUT) KTable<String, OeOrderHeader> oeOrderHeaders,
            @Input(KafkaMyOrdersStreams.GLPROD_OE_ORDER_LINES_INPUT) KStream<String, OeOrderLine> glOrderLines,
            @Input(KafkaMyOrdersStreams.MYORDERS_DETAIL_INPUT) KTable<String, MyOrdersDetailsResponse> myOrdersDetailsResponseKTable) {
 

Выполнение некоторых операций в KStream peek (), которое занимает больше времени до опроса() :

 glOrderLines.peek((key, value) -> {
            long startTime = System.currentTimeMillis();

            executorService.submit(() -> {
                readOrderLines();
                if (!StreamsReferenceObject.isHistoryLoadEnabled)
                    kafkaService.processCoreLogic(value, kafkaStreams);
            });
 

PS : В KStream я выполняю несколько конкретных операций, которые требуют времени, и, следовательно, происходит перебалансировка. Я хотел уменьшить максимальное количество записей опроса, чтобы избежать проблем с балансировкой, так как мой опрос() вызывается через некоторое время