#spring-kafka
#весна-кафка
Вопрос:
Я столкнулся с проблемой при использовании пакета событий от слушателя.
Некоторые свойства поддерживаются с помощью компонента с расширением, а некоторые из них настраиваются из свойств приложения.
Here is my consumer code : @KafkaListener(topics = "#{@mybean.getInputTopic()}", groupId = "#{@mybean.getConsumeGroupId()}", properties = { "max.poll.interval.ms=#{@mybean.getMaxPollIntervalInMS()}", "max.poll.records=#{@mybean.getMaxPollRecords()}", "auto.offset.reset=#{@mybean.getAutoOffsetReset()}", "bootstrap.servers=#{@mybean.getChangeFeedBootstrapServers()}"}) public void onMessage( @Payload Listlt;ConsumerRecordlt;String, Stringgt;gt; consumerRecords, Acknowledgment acknowledgment) { // processing the batch of records }
Ошибка ниже продолжает появляться, и я не уверен, что вызывает эту проблему.
ОШИБКА o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer — Исключение потребителя org.springframework.кафка.Исключение KafkaException: Поиск текущего после исключения; вложенное исключение-org.apache.кафка.обычное дело.Исключение KafkaException: Обратный вызов балансировки пользователя выдает ошибку в org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) в org.springframework.кафка.слушатель.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) в организации.весенняя работа.кафка.слушатель.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405) в org.springframework.кафка.слушатель.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108) в java.util.concurrent.Исполнители$RunnableAdapter.вызов(Исполнители.java:511) в java.util.concurrent.FutureTask.run(FutureTask.java:266) на java.lang.Thread.run(Thread.java:748) Вызвано: org.apache.кафка.обычное дело.Исключение KafkaException: Обратный вызов балансировки пользователя выдает ошибку в организации.апач.кафка.клиенты.потребители.внутренние компоненты.Координатор потребителей.onJoinComplete(ConsumerCoordinator.java:422) в org.apache.кафка.клиенты.потребители.внутренние компоненты.AbstractCoordinator.Совместная группаобходимо(AbstractCoordinator.java:439) в org.apache.кафка.клиенты.потребители.внутренние компоненты.AbstractCoordinator.ensureактивная группа(AbstractCoordinator.java:358) в org.apache.кафка.клиенты.потребители.внутренние компоненты.ConsumerCoordinator.опрос(ConsumerCoordinator.java:497) в org.apache.кафка.клиенты.потребитель.KafkaConsumer.updateassignmentmetadataесли требуется(KafkaConsumer.java:1274) в организации.апач.кафка.клиенты.потребитель.KafkaConsumer.опрос(KafkaConsumer.java:1236) в org.apache.кафка.клиенты.потребитель.KafkaConsumer.poll(KafkaConsumer.java:1216) в sun.reflect.NativeMethodAccessorImpl.invoke0(Собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) в sun.reflect.Делегирование methodaccessorimpl.invoke(делегирование methodaccessorimpl.java:43) в java.lang.reflect.Метод.вызов(Метод.java:498) в org.springframework.aop.поддержка.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) в организации.springframework.aop.каркас.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205) в com.sun.proxy.$Proxy102.опрос(Неизвестный источник) в org.springframework.кафка.слушатель.KafkaMessageListenerContainer$ListenerConsumer.доПолл(KafkaMessageListenerContainer.java:1246) в org.springframework.кафка.слушатель.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1146) в org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059) … пропущено 3 общих кадра, вызванных: org.апач.кафка.распространенные.ошибки.Исключение TimeoutException: Тайм-аут в 60000мс истек до того, как можно было определить позицию раздела my-test-topic-1
My configuration : gt; fetch.max.bytes = 10485760 //10 mb fetch.max.wait.ms = 420000 //7 min gt; fetch.min.bytes = 5242880 // 5mb max.poll.interval.ms = 900000 //15 gt; min max.poll.records = 2500 request.timeout.ms = 320000 gt; receive.buffer.bytes = 10485760 // 10mb
Комментарии:
1. Похоже на какой-то тайм-аут, пока контейнер пытается определить текущие позиции во время перебалансировки. Какую версию вы используете?
setAssignmentCommitOption(AssignmentCommitOption.NEVER)
Примерьте свойства контейнера (это должно препятствовать тому, чтобы контейнер пытался получить позицию).2. Спасибо, Гэри.. я использую spring-boot:2.3.8 . Я никогда раньше не сталкивался с этой проблемой. Только когда я переехал в EXPL, я начал видеть эту проблему. Все значения, какие бы я ни установил, я вижу в конфигурации кафки во время запуска приложения
3. Гэри.. Не могли бы вы, пожалуйста, сообщить мне, как установить параметр AssignmentCommitOption. НИКОГДА в свойствах приложения на уровне контейнера ? например, spring.kafka.listener.ack-mode = MANUAL_IMMEDIATE
4.Вы можете установить его на уровне контейнера; установите
id
на прослушивателе значениеautoStartup
false
; затем. в anApplicationRunner
@Bean
используйтеKafkaListenerEndpointRegistry
боб —registry.getListenerContainer(id).getContainerProperties().set...
и тогдаregistry.getListenerContainer(id).start()
.5. Или вы можете добавить a
ContainerCustomizer
на завод.