#spring-boot #spring-kafka
#spring-boot #spring-kafka
Вопрос:
Как я могу начинать чтение темы с начала при каждом запуске моего приложения, если у меня есть аннотация @KafkaListener ?
Ответ №1:
Один из способов — использовать анонимное управление группами: нет id()
и нет groupId()
для этого @KafkaListener
. Если idIsGroup() == true
(по умолчанию), конечно.
И вам также нужно использовать @TopicPartition
подобное объяснение здесь: https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#manual-assignment
Начиная с версии 2.5.5, вы можете применить начальное смещение ко всем назначенным разделам:
topicPartitions = { @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
Другой способ — реализовать an AbstractConsumerSeekAware
для вашего @KafkaListener
класса и вызвать его, ConsumerSeekCallback.seekToBeginning()
когда вы получите разделы, назначенные в onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback)
.