Исключение при использовании записей из темы

#spring-kafka

#весна-кафка

Вопрос:

Я столкнулся с проблемой при использовании пакета событий от слушателя.

Некоторые свойства поддерживаются с помощью компонента с расширением, а некоторые из них настраиваются из свойств приложения.

 Here is my consumer code :  @KafkaListener(topics = "#{@mybean.getInputTopic()}", groupId = "#{@mybean.getConsumeGroupId()}",  properties = {  "max.poll.interval.ms=#{@mybean.getMaxPollIntervalInMS()}",  "max.poll.records=#{@mybean.getMaxPollRecords()}",  "auto.offset.reset=#{@mybean.getAutoOffsetReset()}",  "bootstrap.servers=#{@mybean.getChangeFeedBootstrapServers()}"})  public void onMessage(  @Payload Listlt;ConsumerRecordlt;String, Stringgt;gt; consumerRecords, Acknowledgment acknowledgment) {     // processing the batch of records    }    

Ошибка ниже продолжает появляться, и я не уверен, что вызывает эту проблему.

ОШИБКА o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer — Исключение потребителя org.springframework.кафка.Исключение KafkaException: Поиск текущего после исключения; вложенное исключение-org.apache.кафка.обычное дело.Исключение KafkaException: Обратный вызов балансировки пользователя выдает ошибку в org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) в org.springframework.кафка.слушатель.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) в организации.весенняя работа.кафка.слушатель.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405) в org.springframework.кафка.слушатель.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108) в java.util.concurrent.Исполнители$RunnableAdapter.вызов(Исполнители.java:511) в java.util.concurrent.FutureTask.run(FutureTask.java:266) на java.lang.Thread.run(Thread.java:748) Вызвано: org.apache.кафка.обычное дело.Исключение KafkaException: Обратный вызов балансировки пользователя выдает ошибку в организации.апач.кафка.клиенты.потребители.внутренние компоненты.Координатор потребителей.onJoinComplete(ConsumerCoordinator.java:422) в org.apache.кафка.клиенты.потребители.внутренние компоненты.AbstractCoordinator.Совместная группаобходимо(AbstractCoordinator.java:439) в org.apache.кафка.клиенты.потребители.внутренние компоненты.AbstractCoordinator.ensureактивная группа(AbstractCoordinator.java:358) в org.apache.кафка.клиенты.потребители.внутренние компоненты.ConsumerCoordinator.опрос(ConsumerCoordinator.java:497) в org.apache.кафка.клиенты.потребитель.KafkaConsumer.updateassignmentmetadataесли требуется(KafkaConsumer.java:1274) в организации.апач.кафка.клиенты.потребитель.KafkaConsumer.опрос(KafkaConsumer.java:1236) в org.apache.кафка.клиенты.потребитель.KafkaConsumer.poll(KafkaConsumer.java:1216) в sun.reflect.NativeMethodAccessorImpl.invoke0(Собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) в sun.reflect.Делегирование methodaccessorimpl.invoke(делегирование methodaccessorimpl.java:43) в java.lang.reflect.Метод.вызов(Метод.java:498) в org.springframework.aop.поддержка.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) в организации.springframework.aop.каркас.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205) в com.sun.proxy.$Proxy102.опрос(Неизвестный источник) в org.springframework.кафка.слушатель.KafkaMessageListenerContainer$ListenerConsumer.доПолл(KafkaMessageListenerContainer.java:1246) в org.springframework.кафка.слушатель.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1146) в org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059) … пропущено 3 общих кадра, вызванных: org.апач.кафка.распространенные.ошибки.Исключение TimeoutException: Тайм-аут в 60000мс истек до того, как можно было определить позицию раздела my-test-topic-1

 My configuration :     gt; fetch.max.bytes = 10485760 //10 mb fetch.max.wait.ms = 420000 //7 min  gt; fetch.min.bytes = 5242880 // 5mb max.poll.interval.ms = 900000 //15  gt; min max.poll.records = 2500 request.timeout.ms = 320000  gt; receive.buffer.bytes = 10485760 // 10mb  

Комментарии:

1. Похоже на какой-то тайм-аут, пока контейнер пытается определить текущие позиции во время перебалансировки. Какую версию вы используете? setAssignmentCommitOption(AssignmentCommitOption.NEVER) Примерьте свойства контейнера (это должно препятствовать тому, чтобы контейнер пытался получить позицию).

2. Спасибо, Гэри.. я использую spring-boot:2.3.8 . Я никогда раньше не сталкивался с этой проблемой. Только когда я переехал в EXPL, я начал видеть эту проблему. Все значения, какие бы я ни установил, я вижу в конфигурации кафки во время запуска приложения

3. Гэри.. Не могли бы вы, пожалуйста, сообщить мне, как установить параметр AssignmentCommitOption. НИКОГДА в свойствах приложения на уровне контейнера ? например, spring.kafka.listener.ack-mode = MANUAL_IMMEDIATE

4.Вы можете установить его на уровне контейнера; установите id на прослушивателе значение autoStartup false ; затем. в an ApplicationRunner @Bean используйте KafkaListenerEndpointRegistry боб — registry.getListenerContainer(id).getContainerProperties().set... и тогда registry.getListenerContainer(id).start() .

5. Или вы можете добавить a ContainerCustomizer на завод.