#apache-kafka #kafka-consumer-api #apache-kafka-streams #spring-cloud-stream #spring-cloud-stream-binder-kafka
Вопрос:
У меня есть приложение springboot, основанное на потоках Кафки. У меня есть метод прослушивания, когда входящие записи передаются через KStreams и KTables.
Заметил, что в конфигурациях макс. опросов.записей, max.poll.interval.ms amp; связанные настройки сеанса работают только тогда, когда мы слушаем как подписной канал (потребительская модель), а не в потоках. Как уменьшить максимальное количество записей опроса в контексте KStream ?
Код прослушивателя выглядит следующим образом :
@Trace(dispatcher = true)
@StreamListener
public void process(
@Input(KafkaMyOrdersStreams.GLPROD_OE_ORDER_HEADERS_ALL_INPUT) KTable<String, OeOrderHeader> oeOrderHeaders,
@Input(KafkaMyOrdersStreams.GLPROD_OE_ORDER_LINES_INPUT) KStream<String, OeOrderLine> glOrderLines,
@Input(KafkaMyOrdersStreams.MYORDERS_DETAIL_INPUT) KTable<String, MyOrdersDetailsResponse> myOrdersDetailsResponseKTable) {
Выполнение некоторых операций в KStream peek (), которое занимает больше времени до опроса() :
glOrderLines.peek((key, value) -> {
long startTime = System.currentTimeMillis();
executorService.submit(() -> {
readOrderLines();
if (!StreamsReferenceObject.isHistoryLoadEnabled)
kafkaService.processCoreLogic(value, kafkaStreams);
});
PS : В KStream я выполняю несколько конкретных операций, которые требуют времени, и, следовательно, происходит перебалансировка. Я хотел уменьшить максимальное количество записей опроса, чтобы избежать проблем с балансировкой, так как мой опрос() вызывается через некоторое время