#spring-boot #spring-kafka
#пружинный ботинок #весна-кафка #весенняя загрузка
Вопрос:
Я пытаюсь написать потребительское приложение kafka на spring-kafka. Как потребитель, я должен убедиться, что я не пропустил ни одной записи, и все записи должны быть обработаны.
Мой дизайн приложения выглядит так :
Topics --> Read records from topic --> dump it into a table A in oracle database --> pick records from a table A --> call rest api to update records in system table B --> update response of API in table a --> commit records
Механизм повторной попытки на уровне API :
Теперь, если какая-либо из записей завершается ошибкой, это означает, что код ответа не соответствует желаемому (400,500 и т. Д.). Мы бы повторили эти записи 2 раза.
Механизм повторной попытки на уровне темы :
Но что, если я получил ошибку при фиксации смещений? значит, мне также нужен какой-то механизм повторной попытки на уровне темы. Я просмотрел документы и нашел вариант : SeekToCurrentErrorHandler
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
Теперь, насколько я понимаю, предположим, что если я не смогу зафиксировать какие-либо смещения, то после добавления приведенного выше кода это повторит попытку доставки до 2 раз (3 попытки доставки) с задержкой в 1 секунду. Итак, означает ли это, что весь мой поток будет воспроизведен дважды? если это так, то нужно ли мне отдельно добавлять механизм повторной попытки на уровне API?
Я просто пытаюсь понять, как я могу сделать свое потребительское приложение более устойчивым, чтобы я не пропускал ни одной записи при обработке и должен иметь механизм ошибок для обработки любых ошибок / пропущенных записей. Пожалуйста, предложите.
Ответ №1:
Лучше избегать ситуаций, когда смещения не могут быть зафиксированы (убедитесь max.poll.interval.ms
, что этого достаточно).
Но, да, если фиксация смещений завершается неудачно (и commitSync
является true), тогда запись будет повторно передана приложению. Если используется false
commitSync , сбой будет просто зарегистрирован (или отправлен вашему слушателю), а смещение «next» для этого раздела будет зафиксировано позже (предположительно).
Добавление повторной попытки на уровне приложения (например, с использованием a RetryTemplate
в адаптере слушателя — через фабрику контейнеров) по-прежнему будет страдать от той же проблемы; это также может привести к перебалансировке, если повторные попытки занимают слишком много времени.
Если вы действительно хотите избежать повторной обработки в этой ситуации, вам нужно сделать ваш код слушателя идемпотентным — например, сохранить тему / раздел / смещение где-нибудь, чтобы указать, что вы уже обработали эту запись.