Должен ли я использовать SeekToCurrentErrorHandler с механизмом повторной попытки API RestTemplate?

#spring-boot #spring-kafka

#пружинный ботинок #весна-кафка #весенняя загрузка

Вопрос:

Я пытаюсь написать потребительское приложение kafka на spring-kafka. Как потребитель, я должен убедиться, что я не пропустил ни одной записи, и все записи должны быть обработаны.

Мой дизайн приложения выглядит так :

 Topics --> Read records from topic --> dump it into a table A in oracle database --> pick records from a table A --> call rest api to update records in system table B --> update response of API in table a --> commit records 
  

Механизм повторной попытки на уровне API :

Теперь, если какая-либо из записей завершается ошибкой, это означает, что код ответа не соответствует желаемому (400,500 и т. Д.). Мы бы повторили эти записи 2 раза.

Механизм повторной попытки на уровне темы :

Но что, если я получил ошибку при фиксации смещений? значит, мне также нужен какой-то механизм повторной попытки на уровне темы. Я просмотрел документы и нашел вариант : SeekToCurrentErrorHandler

 @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}
  

Теперь, насколько я понимаю, предположим, что если я не смогу зафиксировать какие-либо смещения, то после добавления приведенного выше кода это повторит попытку доставки до 2 раз (3 попытки доставки) с задержкой в 1 секунду. Итак, означает ли это, что весь мой поток будет воспроизведен дважды? если это так, то нужно ли мне отдельно добавлять механизм повторной попытки на уровне API?

Я просто пытаюсь понять, как я могу сделать свое потребительское приложение более устойчивым, чтобы я не пропускал ни одной записи при обработке и должен иметь механизм ошибок для обработки любых ошибок / пропущенных записей. Пожалуйста, предложите.

Ответ №1:

Лучше избегать ситуаций, когда смещения не могут быть зафиксированы (убедитесь max.poll.interval.ms , что этого достаточно).

Но, да, если фиксация смещений завершается неудачно (и commitSync является true), тогда запись будет повторно передана приложению. Если используется false commitSync , сбой будет просто зарегистрирован (или отправлен вашему слушателю), а смещение «next» для этого раздела будет зафиксировано позже (предположительно).

Добавление повторной попытки на уровне приложения (например, с использованием a RetryTemplate в адаптере слушателя — через фабрику контейнеров) по-прежнему будет страдать от той же проблемы; это также может привести к перебалансировке, если повторные попытки занимают слишком много времени.

Если вы действительно хотите избежать повторной обработки в этой ситуации, вам нужно сделать ваш код слушателя идемпотентным — например, сохранить тему / раздел / смещение где-нибудь, чтобы указать, что вы уже обработали эту запись.