Spring boot Kafka не работает — потребитель не получает сообщений

#java #spring-boot #apache-kafka

#java #spring-boot #apache-kafka

Вопрос:

Я пытаюсь запустить простое приложение Spring Boot Kafka, но я не могу заставить его работать. Я следовал различным руководствам, теперь я внедряю это, но когда я запускаю приложение, происходит вот что:

введите описание изображения здесь

Я могу писать в консоли, но потребитель не получает никакого сообщения.
Это мой класс SpringApplication:

 @SpringBootApplication(scanBasePackages = "com.springmiddleware")
@ComponentScan("com.springmiddleware")
@EnableAutoConfiguration
@EntityScan("com.springmiddleware")
public class SpringMiddlewareApplication implements CommandLineRunner{



    public static void main(String[] args) throws Exception {

        SpringApplication.run(SpringMiddlewareApplication.class, args);

    }

    @Autowired
    private Producer sender;

    @Override 
    public void run (String... strings) {
        sender.send("Hello world");
    }

}
  

application.yml:

 spring:
  kafka:
    bootstrap-servers: localhost:8080

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG
  

Класс потребителя, класс производителя и класс их конфигураций такие же, как написано в руководстве.
В моем файле server.properties у меня есть:

 zookeeper.connect=localhost:8080
  

и в zookeeper.properties:

 clientPort=8080
  

Тот же порт, что указан в application.yml. Перед запуском приложения я запускаю

 .binwindowszookeeper-server-start.bat configzookeeper.properties
  

и

 .binwindowskafka-server-start.bat configserver.properties
  

Обновить

Это ReceiverConfig класс:

 @EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}
  

This is the SenderConfig class:

     @Configuration
public class SenderConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  

And this is the method listen that is in the Consumer class

 @KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        System.out.println("Received "   message);
    }
  

Producer class:

 @Service
public class Producer {

     @Autowired
     private KafkaTemplate<String, String> kafkaTemplate;

     @Value("${app.topic.foo}")
        private String topic;

     public void send(String message){
            kafkaTemplate.send(topic, message);
        }
}
  

UPDATE 2

[2019-04-01 17:23:52,492] ИНФОРМАЦИЯ Об установленном сеансе 0x100435950880000 с согласованным таймаутом 6000 для клиента /0:0:0:0:0:0:0:1:60079 ( org.apache.zookeeper.server.ZooKeeperServer) [2019-04-01 17:23:52,539] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid:0x100435950880000 тип: создать cxid: 0x1 zxid:0xef txntype:-1 reqpath: n / a Путь ошибки:/Ошибка потребителей:KeeperErrorCode = NodeExists для /потребителей (org.apache.zookeeper.server.prerequestprocessor) [2019-04-01 17:23:52,555] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid:0x100435950880000 тип: создать cxid: 0x2 zxid: 0xf0 txntype:-1 reqpath: n / a Путь ошибки:/brokers/ids Ошибка:KeeperErrorCode = NodeExists для /brokers/ids (org.apache.zookeeper.server.prerequestprocessor) [2019-04-01 17:23:52,555] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid:0x100435950880000 тип: создать cxid: 0x3 zxid: 0xf1 txntype:-1 reqpath: n / a Путь к ошибке:/brokers/topics Ошибка:KeeperErrorCode = NodeExists для /brokers/topics (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:52,555] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x4 zxid: 0xf2 txntype:-1 reqpath: нет Пути к ошибке:/config/changes Ошибка:KeeperErrorCode = NodeExists для /config /changes (org.apache.zookeeper. сервер.Prerequestprocessor) [2019-04-01 17:23:52,570] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x5 zxid: 0xf3 txntype:-1 reqpath: нет Пути к ошибке:/admin/Ошибка delete_topics:KeeperErrorCode = NodeExists для /admin/ удаление_топики (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:52,570] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x6 zxid: 0xf4 txntype:-1 reqpath: нет Пути к ошибке:/brokers/Ошибка seqid:KeeperErrorCode = NodeExists для /brokers/ seqid (org.apache. смотритель зоопарка.сервер.Prerequestprocessor) [2019-04-01 17:23:52,586] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x7 zxid: 0xf5 txntype:-1 reqpath: нет пути к ошибке:Ошибка /isr_change_notification: KeeperErrorCode = nodeexception для /isr_change_notification (орг.apache.zookeeper.сервер.Prerequestprocessor) [2019-04-01 17:23:52,586] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x8 zxid: 0xf6 txntype:-1 reqpath: нет Пути к ошибке:/latest_producer_id_block Ошибка:KeeperErrorCode = NodeExists для / latest_producer_id_block (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:52,586] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x9 zxid: 0xf7 txntype:-1 reqpath: нет пути к ошибке:/log_dir_event_notification Ошибка:KeeperErrorCode = NodeExists для / log_dir_event_notification (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:52,602] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0xa zxid: 0xf8 txntype:-1 reqpath: нет Пути к ошибке:/config/topics Ошибка:KeeperErrorCode = NodeExists для /config/topics (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:52,602] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип:create cxid: 0xb zxid: 0xf9 txntype:-1 reqpath: n / a Путь ошибки:/config/ Ошибка клиентов:KeeperErrorCode = NodeExists для /config /clients (org.apache.zookeeper.server .prerequestprocessor) [2019-04-01 17:23:52,617] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid:0x100435950880000 тип: создать cxid: 0xc zxid: 0xfa txntype:-1 reqpath: n / a Путь к ошибке:/config/ Ошибка пользователей:KeeperErrorCode = NodeExists для /config/users (org.apache.zookeeper.server.prerequestprocessor) [2019-04-01 17:23:52,617] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid:0x100435950880000 тип: создать cxid: 0xd zxid: 0xfb txntype:-1 reqpath: n / a Путь ошибки:/config/Ошибка брокеров:KeeperErrorCode = NodeExists для /config/brokers (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:53,564] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: multi cxid: 0x3a zxid: 0xff txntype:-1 reqpath: n / a прерывание оставшихся нескольких операций. Путь к ошибке:/admin/preferred_replica_election Ошибка:KeeperErrorCode = NoNode для /admin/preferred_replica_election (org.apache.zookeeper.server.Prerequestprocessor)

Комментарии:

1. покажите класс конфигурации потребителя и производителя configs с помощью метода прослушивателя

2. Обновлено классами

3. @Deadpool mate эта строка верна. KafkaTemplate используется в Producer методе класса.

4. Не могли бы вы добавить Producer класс?

5. @MadhuBhat добавил класс производителя, о котором идет речь

Ответ №1:

В вашем application.yml вы указали порт zookeeper вместо порта брокера kafka

 spring:
  kafka:
    bootstrap-servers: localhost:8080
  

В приведенном выше примере вы должны определить порт брокера kafka, то есть значение port= файла server.properties.

Приложение Spring boot по умолчанию запускается на порту 8080, поэтому, пожалуйста, не используйте его для порта Zookeeper, если вы не изменили порт по умолчанию приложения Spring boot.

Таким образом, в server.properties есть port=9092 и zookeeper.connect=localhost:2181 , а в application.yml есть, как показано ниже:

 spring:
  kafka:
    bootstrap-servers: localhost:9092
  

Затем в zookeeper.properties, есть clientPort=2181 . Затем перезапустите Zookeeper, сервер Kafka и приложение Spring boot в том же порядке.

Обновить:

Более новые версии Kafka используют listeners=PLAINTEXT://localhost:9092 вместо port=9092 в файле server.properties. Поэтому попробуйте заменить это.

Комментарии:

1. Привет, в файле server.properties у меня нет port =, должен ли я добавить его вручную? В какой позиции это должно быть?

2. @TodorokiM у server.properties должен быть port ключ, чтобы определить, на каком порту должен работать брокер kafka. Проверьте этот пример файла свойств github.com/bkimminich/apache-kafka-book-examples/blob/master /…

3. Добавьте port=9092 в server.properties , добавьте тот же порт в spring: kafka: bootstrap-servers: из application.yml , затем перезапустите zookeeper, сервер kafka и ваше приложение Spring boot.

4. @TodorokiM можете ли вы попробовать заменить port=9092 на listeners=PLAINTEXT://localhost:9092 в server.properties и перезапустить zookeeper, kafka и spring boot app?

5. @TodorokiM использовалась более старая версия Kafka port= , но более новая использует listeners= , вот в чем разница.