Как я могу повторить сообщения об ошибках из kafka?

#spring-boot #apache-kafka #spring-kafka

#весенняя загрузка #apache-kafka #spring-kafka

Вопрос:

Мои spring-boot application(consumer) обрабатывают сообщения от Apache Kafka . Периодически massage не может обработать и потребитель выдает исключение. Потребитель в любом случае фиксирует смещение. Могу ли я отличить сообщения об успешном завершении от сообщений об ошибках в Kafka? Я думаю, я не могу. Это правда? Если это правда, у меня есть главный вопрос:

Как я могу повторить сообщения об ошибках? Я знаю несколько способов, но я не уверен в их правильности.

1) Измените смещение на раннее. Но таким образом сообщения об успехе тоже будут повторяться.

2) Когда я перехватываю исключение, я отправляю это сообщение в другую тему (например, в тему ошибок). Но это выглядит сложно.

3) Что-то еще (ваш вариант)

Комментарии:

1. Что вы имеете в виду, говоря «Потребитель фиксирует смещение в любом случае».? У вас spring.kafka.consumer.enable-auto-commit включено?

Ответ №1:

Если вы хотите получить гарантию хотя бы один раз, общая схема выглядит следующим образом:

  • Отключить автоматическую фиксацию (установить enable.auto.commit значение false)
  • Потреблять сообщения
  • Для каждого сообщения:

    • Если ошибок нет, зафиксируйте смещение
    • Если ошибка, повторите попытку столько раз, сколько пожелаете
    • В случае успеха зафиксируйте
    • Если вы хотите отказаться, войдите или опубликуйте в очередь ошибок (для анализа или последующего повторения)
  • Повторить

Комментарии:

1. если я это сделаю retry as many times you wish , я не смогу обрабатывать другие сообщения, и моя тема увеличится

2. Простая повторная попытка не очень полезна. Скорее всего, немедленные повторные попытки также завершатся неудачей (скажем, из-за недоступности внешнего API и т. Д.). Для повторных попыток вам также следует подумать о создании темы повторных попыток.

Ответ №2:

Используйте SeekToCurrentErrorHandler . Он переместит смещение для воспроизведения сообщения (по умолчанию 10 раз, но настраивается).

После того, как повторные попытки будут исчерпаны, он вызывает «средство восстановления», которое может предпринять некоторые действия, такие как DeadLetterPublishingRecoverer .

Комментарии:

1. спасибо @gary-russell Что, если я хочу реализовать какое-то поведение запроса? Например, мое приложение обрабатывает 20 сообщений одновременно, а затем я хочу запросить 1,5 и 7-е сообщения. Должен ли я явно публиковать сообщения в той же теме или я могу каким-то образом подтвердить эти конкретные сообщения?

2. DLPR можно настроить для публикации обратно в ту же очередь. Вы не должны обрабатывать записи одновременно на одном потребителе. Kafka поддерживает только смещение. Используйте разделы для достижения параллелизма.

Ответ №3:

Вы можете внести следующие изменения в конфигурацию вашего пользователя:

  1. enable.auto.commit=False

  2.  RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(retryInterval);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retryMaxCount));
    
        return retryTemplate;
    }
      
  3. В вашем kafkaListenerContainerFactory :

     setretryTemplate(retryTemplate);
    factory.getContainerProperties().setAckOnError(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
      

Теперь в вашем коде просто генерируйте исключение в вашем потребителе всякий раз, когда возникает исключение. Это не обновит смещение при возникновении исключения. И он повторит попытку через retryInterval время максимум maxRetryCount .

И если вы хотите игнорировать и не повторять попытки для исключений определенного типа, создайте карту исключений, как показано ниже, и передайте ее, SimpleRetryPolicy() как в приведенном ниже коде:

 Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);
  

Для получения более подробной информации перейдите по этой ссылке: Обработка ошибок Kafka