#apache-kafka #transactions #kafka-consumer-api #kafka-transactions-api
Вопрос:
Я использую @trasactional в своем клиенте Кафки. В том же методе (помеченном @transactional) я выполняю некоторую транзакцию с БД. В случае сбоя транзакции БД мой потребитель пытается использовать сообщение 10 раз и после этого выдает исключение «Возврат исправленного возврата {интервал=0, currentAttempts=10, maxAttempts=9} исчерпан для сведений о пользователе».
@Transactional
@KafkaListener(topics = "xyz")
public void consume(final ConsumerDetails consumerDetail) {
System.out.println("received key: [" consumerDetail.key() "] and value: [" consumerDetail.value() "]"); System.out.println(
"consumer processing done");
throw new RuntimeException();
}
Есть ли способ, чтобы после устранения проблемы с БД я мог заставить своего потребителя использовать то же сообщение из темы Кафки? или это исключение «Откат исправлен» означает, что я потерял это сообщение?
Комментарии:
1. После пересчета потребителя вы можете использовать незафиксированные сообщения.
2. это то, что я думал, но потребитель не получал никаких сообщений после перезапуска. нужно ли мне устанавливать «автоматическое смещение-сброс=самое раннее»?