Потребитель Kafka не получает сообщение при загрузке Spring

#spring #spring-boot #apache-kafka #kafka-consumer-api

#весна #весенняя загрузка #apache-kafka #кафка-потребитель-api

Вопрос:

Мой потребитель spring / java не может получить доступ к сообщению, созданному производителем. Однако, когда я запускаю потребителя из консоли / терминала, он может получить сообщение, созданное spring / java producer.

Конфигурация потребителя :

 @Component
@ConfigurationProperties(prefix="kafka.consumer")
public class KafkaConsumerProperties {

    private String bootstrap;
    private String group;
    private String topic;

    public String getBootstrap() {
        return bootstrap;
    }

    public void setBootstrap(String bootstrap) {
        this.bootstrap = bootstrap;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }
}
  

Конфигурация прослушивателя :

 @Configuration
@EnableKafka
public class KafkaListenerConfig {

    @Autowired
    private KafkaConsumerProperties kafkaConsumerProperties;

    @Bean
    public Map<String, Object> getConsumerProperties() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return properties;
    }

    @Bean
    public Deserializer stringKeyDeserializer() {
        return new StringDeserializer();
    }

    @Bean
    public Deserializer transactionJsonValueDeserializer() {
        return new JsonDeserializer(Transaction.class);
    }

    @Bean
    public ConsumerFactory<String, Transaction> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), stringKeyDeserializer(), transactionJsonValueDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}
  

Слушатель Кафки :

 @Service
public class TransactionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Transaction.class);

    @KafkaListener(topics={"transactions"}, containerFactory = "kafkaListenerContainerFactory")
    public void onReceive(Transaction transaction) {
        LOGGER.info("transaction = {}",transaction);
    }
}
  

Потребительское приложение :

 @SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

  

ТЕСТОВЫЙ ПРИМЕР 1: ПРОЙТИ
Я запустил свой spring / java producer и запустил потребителя с консоли. Когда я создаю форму сообщения, мой консольный потребитель может получить доступ к сообщению.

ТЕСТОВЫЙ ПРИМЕР 2: СБОЙ Я запустил своего потребителя spring / java и запустил производителя с консоли. Когда я создаю консольный производитель формы сообщения, мой потребитель spring / java не может получить доступ к сообщению.

ТЕСТОВЫЙ ПРИМЕР 3: СБОЙ Я запустил своего потребителя spring / java и запустил spring / java producer. Когда я создаю форму сообщения spring / java producer, мой потребитель spring / java не может получить доступ к сообщению.

Вопрос

  1. Что-то не так в моем потребительском коде?

  2. Мне не хватает какой-либо конфигурации для моего kafka-слушателя?

  3. Нужно ли мне явно запускать прослушиватель? (Я так не думаю, поскольку я вижу в журнале терминала подключение к теме, но все же я не уверен)

Комментарии:

1. Может ли это быть связано с тем, как вы его тестируете? Вы нашли решение?

Ответ №1:

Хорошо AUTO_OFFSET_RESET_CONFIG , вы пропускаете Consumer Configs

 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  

auto.offset.reset

Что делать, если в Kafka нет начального смещения или если текущее смещение больше не существует на сервере (например, потому что эти данные были удалены):

самый ранний: автоматический сброс смещения до самого раннего смещения

последняя версия: автоматический сброс смещения до последнего смещения

нет: выбросить исключение для потребителя, если предыдущее смещение не найдено для группы потребителя

что-нибудь еще: выбросить исключение для потребителя

Примечание: auto.offset.reset to earliest будет работать, только если у kafka нет смещения для этой группы потребителей (поэтому в вашем случае необходимо добавить это свойство в новую группу потребителей и перезапустить приложение)

Комментарии:

1. Спасибо за ответ. Я попробовал предложенное вами решение, однако оно не сработало. Я пробовал использовать все варианты, но безуспешно. Знаете ли вы, как установить параметр ‘—from-beginning’ (который мы используем в консоли) в коде Java?

2. Do you know how to set '--from-beginning' option (that we use in console) in Java code ? это в моем ответе @Developer

3. У меня точно такой же сценарий проблемы. Мне тоже не повезло с этим решением. Вы нашли какие-либо возможные обходные пути?

4. Попробуйте spring.kafka.consumer.auto-offset-reset=earlisest (см. Документы )