Сбой SeekToCurrentErrorHandler в случае наличия нескольких сбойных записей из разных разделов, если FixedBackOff установлен как FixedBackOff(0L, 1)

#spring-boot #spring-kafka

#spring-boot #spring-кафка

Вопрос:

В версии spring-kafka-2.5.4.RELEASE, когда имеется несколько сбойных записей из разных разделов, SeekToCurrentErrorHandler завершается с ошибкой, если для FixedBackOff установлено значение maxAttempts равное 1 и интервал, отличный от -1L.

SeekToCurrentErrorHandler SeekToCurrentErrorHandler = новый SeekToCurrentErrorHandler(,новый FixedBackOff(0L, 1));

Хотя установка значения для интервала, отличного от -1L, не имеет смысла, когда счетчик maxAttemps равен 1 (поскольку повторных попыток не будет и, следовательно, нет интервала повторных попыток), не должен ли он либо завершаться сбоем при запуске, жалуясь на то же самое, либо должен обрабатываться соответствующим образом?. Сбой во время выполнения при наличии нескольких сбойных записей из разных разделов с ошибкой ниже.

ОШИБКА org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer — обработчик ошибок выдал исключение org.springframework.kafka.Исключение KafkaException: поиск текущего после исключения; вложенным исключением является org.springframework.kafka.listener .Исключение ListenerExecutionFailedException: <здесь какое-то исключение ввода-вывода, ни одно из них не определено в FailedRecordProcessor.configureDefaultClassifier()> в org.springframework.kafka.listener .SeekUtils .seekOrRecover(SeekUtils.java:157)

Похоже, это относится к приведенной ниже строке.

Строка 96 FailedRecordTracker(т.е. Если (nextBackOff != BackOffExecution.STOP) { )

https://github.com/spring-projects/spring-kafka/blob/v2.5.4.RELEASE/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java#L96

что впоследствии приводит к вводу в строку 157 SeekUtils(т.е. выбрасывает новое исключение KafkaException(«Искать текущее после исключения», level, thrownException);)

https://github.com/spring-projects/spring-kafka/blob/v2.5.4.RELEASE/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java#L157

Ответ №1:

Возможно, вы выполняете миграцию с более старой версии.

maxAttempts in FixedBackOff означает максимальное количество попыток повторения, поэтому должно быть 0 для отсутствия попыток.

См. https://docs.spring.io/spring-kafka/docs/2.5.10.RELEASE/reference/html/#seek-to-current

Начиная с версии 2.3, a BackOff может быть предоставлен SeekToCurrentErrorHandler и DefaultAfterRollbackProcessor , чтобы поток-потребитель мог спать в течение некоторого настраиваемого времени между попытками доставки. Spring Framework предоставляет два готовых BackOff s, FixedBackOff и ExponentialBackOff . Максимальное время отката не должно превышать max.poll.interval.ms потребительское свойство, чтобы избежать перебалансировки.

ВАЖНО: ранее конфигурация была «maxFailures» (которая включала первую попытку доставки). При использовании a FixedBackOff его maxAttempts свойство представляет количество попыток доставки (на одну меньше, чем у старого maxFailures свойства). Кроме того, maxFailures=-1 имеется в виду повторная попытка на неопределенный срок со старой конфигурацией, с отсрочкой вы бы установили значение maxAttempts Long.MAX_VALUE для a FixedBackOff и оставили значение maxElapsedTime по умолчанию в ExponentialBackOff .