#apache-kafka #kafka-consumer-api
#apache-kafka #kafka-consumer-api
Вопрос:
Я использую семантику, предоставленную kafka, ровно один раз. Следовательно, мой продюсер записывает сообщение в транзакции. Пока мой продюсер отправлял 100-е сообщение, прямо между send()
и commitTransaction()
; Я убил процесс producer.
Я прочитал последние несколько незафиксированных сообщений в моей теме.
Consumer record offset - message number
0 - 1
2 - 2
196 - 99 <- Last committed message by producer
198 - 100 <- 100th message was sent but not committed
Теперь, когда я запускаю потребитель с read_committed
уровнем изоляции. Он точно считывает 1-99 сообщений. Но для этого я прочитал всю тему. В конечном итоге я собираюсь хранить миллионы сообщений в теме. Итак, чтение всей темы не является предпочтительным.
Кроме того, предположим, что потребитель опрашивает сообщения от брокера и есть какая-то проблема со связью между загрузчиком kafka и потребителем. Последнее сообщение, прочитанное потребителем, имеет, скажем, смещение #50. Это означает, что я не смог надежно идентифицировать последнее зафиксированное сообщение в теме.
Я использовал другие методы, т.е.
seekToEnd() - took me to offset#200
endOffsets() - took me to offset#200
Есть ли способ надежно получить сообщение, которое было зафиксировано производителем Kafka? (В моем случае, Offset#196
)
Комментарии:
1. Можете ли вы предоставить общий доступ к коду вашего потребителя? Кроме того, вы уверены, что последнее зафиксированное смещение равно 198?