#spring-boot #apache-kafka #spring-kafka
#весенняя загрузка #apache-kafka #spring-kafka
Вопрос:
Мои spring-boot application(consumer)
обрабатывают сообщения от Apache Kafka
. Периодически massage не может обработать и потребитель выдает исключение. Потребитель в любом случае фиксирует смещение. Могу ли я отличить сообщения об успешном завершении от сообщений об ошибках в Kafka? Я думаю, я не могу. Это правда? Если это правда, у меня есть главный вопрос:
Как я могу повторить сообщения об ошибках? Я знаю несколько способов, но я не уверен в их правильности.
1) Измените смещение на раннее. Но таким образом сообщения об успехе тоже будут повторяться.
2) Когда я перехватываю исключение, я отправляю это сообщение в другую тему (например, в тему ошибок). Но это выглядит сложно.
3) Что-то еще (ваш вариант)
Комментарии:
1. Что вы имеете в виду, говоря «Потребитель фиксирует смещение в любом случае».? У вас
spring.kafka.consumer.enable-auto-commit
включено?
Ответ №1:
Если вы хотите получить гарантию хотя бы один раз, общая схема выглядит следующим образом:
- Отключить автоматическую фиксацию (установить
enable.auto.commit
значение false) - Потреблять сообщения
-
Для каждого сообщения:
- Если ошибок нет, зафиксируйте смещение
- Если ошибка, повторите попытку столько раз, сколько пожелаете
- В случае успеха зафиксируйте
- Если вы хотите отказаться, войдите или опубликуйте в очередь ошибок (для анализа или последующего повторения)
-
Повторить
Комментарии:
1. если я это сделаю
retry as many times you wish
, я не смогу обрабатывать другие сообщения, и моя тема увеличится2. Простая повторная попытка не очень полезна. Скорее всего, немедленные повторные попытки также завершатся неудачей (скажем, из-за недоступности внешнего API и т. Д.). Для повторных попыток вам также следует подумать о создании темы повторных попыток.
Ответ №2:
Используйте SeekToCurrentErrorHandler . Он переместит смещение для воспроизведения сообщения (по умолчанию 10 раз, но настраивается).
После того, как повторные попытки будут исчерпаны, он вызывает «средство восстановления», которое может предпринять некоторые действия, такие как DeadLetterPublishingRecoverer .
Комментарии:
1. спасибо @gary-russell Что, если я хочу реализовать какое-то поведение запроса? Например, мое приложение обрабатывает 20 сообщений одновременно, а затем я хочу запросить 1,5 и 7-е сообщения. Должен ли я явно публиковать сообщения в той же теме или я могу каким-то образом подтвердить эти конкретные сообщения?
2. DLPR можно настроить для публикации обратно в ту же очередь. Вы не должны обрабатывать записи одновременно на одном потребителе. Kafka поддерживает только смещение. Используйте разделы для достижения параллелизма.
Ответ №3:
Вы можете внести следующие изменения в конфигурацию вашего пользователя:
-
enable.auto.commit=False
-
RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(retryInterval); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retryMaxCount)); return retryTemplate; }
-
В вашем
kafkaListenerContainerFactory
:setretryTemplate(retryTemplate); factory.getContainerProperties().setAckOnError(true); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
Теперь в вашем коде просто генерируйте исключение в вашем потребителе всякий раз, когда возникает исключение. Это не обновит смещение при возникновении исключения. И он повторит попытку через retryInterval
время максимум maxRetryCount
.
И если вы хотите игнорировать и не повторять попытки для исключений определенного типа, создайте карту исключений, как показано ниже, и передайте ее, SimpleRetryPolicy()
как в приведенном ниже коде:
Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
exceptionMap.put(IllegalArgumentException.class, false);
exceptionMap.put(TimeoutException.class, true);
Для получения более подробной информации перейдите по этой ссылке: Обработка ошибок Kafka