Как @транзакционная работа в потребителе Кафки

#apache-kafka #transactions #kafka-consumer-api #kafka-transactions-api

Вопрос:

Я использую @trasactional в своем клиенте Кафки. В том же методе (помеченном @transactional) я выполняю некоторую транзакцию с БД. В случае сбоя транзакции БД мой потребитель пытается использовать сообщение 10 раз и после этого выдает исключение «Возврат исправленного возврата {интервал=0, currentAttempts=10, maxAttempts=9} исчерпан для сведений о пользователе».

  @Transactional
      @KafkaListener(topics = "xyz") 
      public void  consume(final ConsumerDetails consumerDetail) {
     System.out.println("received key: ["   consumerDetail.key()   "] and value: ["   consumerDetail.value()   "]"); System.out.println(
     "consumer processing done");
     throw new RuntimeException();
      }
 

Есть ли способ, чтобы после устранения проблемы с БД я мог заставить своего потребителя использовать то же сообщение из темы Кафки? или это исключение «Откат исправлен» означает, что я потерял это сообщение?

Комментарии:

1. После пересчета потребителя вы можете использовать незафиксированные сообщения.

2. это то, что я думал, но потребитель не получал никаких сообщений после перезапуска. нужно ли мне устанавливать «автоматическое смещение-сброс=самое раннее»?