Получить последнее зафиксированное сообщение в разделе kafka

#apache-kafka #kafka-consumer-api

#apache-kafka #kafka-consumer-api

Вопрос:

Я использую семантику, предоставленную kafka, ровно один раз. Следовательно, мой продюсер записывает сообщение в транзакции. Пока мой продюсер отправлял 100-е сообщение, прямо между send() и commitTransaction() ; Я убил процесс producer.

Я прочитал последние несколько незафиксированных сообщений в моей теме.

 Consumer record offset - message number

0                     - 1
2                     - 2
196                   - 99  <- Last committed message by producer
198                   - 100 <- 100th message was sent but not committed
  

Теперь, когда я запускаю потребитель с read_committed уровнем изоляции. Он точно считывает 1-99 сообщений. Но для этого я прочитал всю тему. В конечном итоге я собираюсь хранить миллионы сообщений в теме. Итак, чтение всей темы не является предпочтительным.

Кроме того, предположим, что потребитель опрашивает сообщения от брокера и есть какая-то проблема со связью между загрузчиком kafka и потребителем. Последнее сообщение, прочитанное потребителем, имеет, скажем, смещение #50. Это означает, что я не смог надежно идентифицировать последнее зафиксированное сообщение в теме.

Я использовал другие методы, т.е.

 seekToEnd() - took me to offset#200
endOffsets() - took me to offset#200
  

Есть ли способ надежно получить сообщение, которое было зафиксировано производителем Kafka? (В моем случае, Offset#196 )

Комментарии:

1. Можете ли вы предоставить общий доступ к коду вашего потребителя? Кроме того, вы уверены, что последнее зафиксированное смещение равно 198?