#java #spring-boot #apache-kafka
#java #spring-boot #apache-kafka
Вопрос:
Я пытаюсь запустить простое приложение Spring Boot Kafka, но я не могу заставить его работать. Я следовал различным руководствам, теперь я внедряю это, но когда я запускаю приложение, происходит вот что:
Я могу писать в консоли, но потребитель не получает никакого сообщения.
Это мой класс SpringApplication:
@SpringBootApplication(scanBasePackages = "com.springmiddleware")
@ComponentScan("com.springmiddleware")
@EnableAutoConfiguration
@EntityScan("com.springmiddleware")
public class SpringMiddlewareApplication implements CommandLineRunner{
public static void main(String[] args) throws Exception {
SpringApplication.run(SpringMiddlewareApplication.class, args);
}
@Autowired
private Producer sender;
@Override
public void run (String... strings) {
sender.send("Hello world");
}
}
application.yml:
spring:
kafka:
bootstrap-servers: localhost:8080
app:
topic:
foo: foo.t
logging:
level:
root: ERROR
org.springframework.web: ERROR
com.memorynotfound: DEBUG
Класс потребителя, класс производителя и класс их конфигураций такие же, как написано в руководстве.
В моем файле server.properties у меня есть:
zookeeper.connect=localhost:8080
и в zookeeper.properties:
clientPort=8080
Тот же порт, что указан в application.yml. Перед запуском приложения я запускаю
.binwindowszookeeper-server-start.bat configzookeeper.properties
и
.binwindowskafka-server-start.bat configserver.properties
Обновить
Это ReceiverConfig
класс:
@EnableKafka
@Configuration
public class ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
This is the SenderConfig class:
@Configuration
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
And this is the method listen
that is in the Consumer class
@KafkaListener(topics = "${app.topic.foo}")
public void listen(@Payload String message) {
System.out.println("Received " message);
}
Producer class:
@Service
public class Producer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.topic.foo}")
private String topic;
public void send(String message){
kafkaTemplate.send(topic, message);
}
}
UPDATE 2
[2019-04-01 17:23:52,492] ИНФОРМАЦИЯ Об установленном сеансе 0x100435950880000 с согласованным таймаутом 6000 для клиента /0:0:0:0:0:0:0:1:60079 ( org.apache.zookeeper.server.ZooKeeperServer) [2019-04-01 17:23:52,539] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid:0x100435950880000 тип: создать cxid: 0x1 zxid:0xef txntype:-1 reqpath: n / a Путь ошибки:/Ошибка потребителей:KeeperErrorCode = NodeExists для /потребителей (org.apache.zookeeper.server.prerequestprocessor) [2019-04-01 17:23:52,555] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid:0x100435950880000 тип: создать cxid: 0x2 zxid: 0xf0 txntype:-1 reqpath: n / a Путь ошибки:/brokers/ids Ошибка:KeeperErrorCode = NodeExists для /brokers/ids (org.apache.zookeeper.server.prerequestprocessor) [2019-04-01 17:23:52,555] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid:0x100435950880000 тип: создать cxid: 0x3 zxid: 0xf1 txntype:-1 reqpath: n / a Путь к ошибке:/brokers/topics Ошибка:KeeperErrorCode = NodeExists для /brokers/topics (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:52,555] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x4 zxid: 0xf2 txntype:-1 reqpath: нет Пути к ошибке:/config/changes Ошибка:KeeperErrorCode = NodeExists для /config /changes (org.apache.zookeeper. сервер.Prerequestprocessor) [2019-04-01 17:23:52,570] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x5 zxid: 0xf3 txntype:-1 reqpath: нет Пути к ошибке:/admin/Ошибка delete_topics:KeeperErrorCode = NodeExists для /admin/ удаление_топики (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:52,570] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x6 zxid: 0xf4 txntype:-1 reqpath: нет Пути к ошибке:/brokers/Ошибка seqid:KeeperErrorCode = NodeExists для /brokers/ seqid (org.apache. смотритель зоопарка.сервер.Prerequestprocessor) [2019-04-01 17:23:52,586] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x7 zxid: 0xf5 txntype:-1 reqpath: нет пути к ошибке:Ошибка /isr_change_notification: KeeperErrorCode = nodeexception для /isr_change_notification (орг.apache.zookeeper.сервер.Prerequestprocessor) [2019-04-01 17:23:52,586] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x8 zxid: 0xf6 txntype:-1 reqpath: нет Пути к ошибке:/latest_producer_id_block Ошибка:KeeperErrorCode = NodeExists для / latest_producer_id_block (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:52,586] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0x9 zxid: 0xf7 txntype:-1 reqpath: нет пути к ошибке:/log_dir_event_notification Ошибка:KeeperErrorCode = NodeExists для / log_dir_event_notification (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:52,602] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: создать cxid: 0xa zxid: 0xf8 txntype:-1 reqpath: нет Пути к ошибке:/config/topics Ошибка:KeeperErrorCode = NodeExists для /config/topics (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:52,602] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип:create cxid: 0xb zxid: 0xf9 txntype:-1 reqpath: n / a Путь ошибки:/config/ Ошибка клиентов:KeeperErrorCode = NodeExists для /config /clients (org.apache.zookeeper.server .prerequestprocessor) [2019-04-01 17:23:52,617] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid:0x100435950880000 тип: создать cxid: 0xc zxid: 0xfa txntype:-1 reqpath: n / a Путь к ошибке:/config/ Ошибка пользователей:KeeperErrorCode = NodeExists для /config/users (org.apache.zookeeper.server.prerequestprocessor) [2019-04-01 17:23:52,617] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid:0x100435950880000 тип: создать cxid: 0xd zxid: 0xfb txntype:-1 reqpath: n / a Путь ошибки:/config/Ошибка брокеров:KeeperErrorCode = NodeExists для /config/brokers (org.apache.zookeeper.server.Prerequestprocessor) [2019-04-01 17:23:53,564] ИНФОРМАЦИЯ Получено исключение KeeperException на уровне пользователя при обработке sessionid: 0x100435950880000 тип: multi cxid: 0x3a zxid: 0xff txntype:-1 reqpath: n / a прерывание оставшихся нескольких операций. Путь к ошибке:/admin/preferred_replica_election Ошибка:KeeperErrorCode = NoNode для /admin/preferred_replica_election (org.apache.zookeeper.server.Prerequestprocessor)
Комментарии:
1. покажите класс конфигурации потребителя и производителя configs с помощью метода прослушивателя
2. Обновлено классами
3. @Deadpool mate эта строка верна. KafkaTemplate используется в
Producer
методе класса.4. Не могли бы вы добавить
Producer
класс?5. @MadhuBhat добавил класс производителя, о котором идет речь
Ответ №1:
В вашем application.yml вы указали порт zookeeper вместо порта брокера kafka
spring:
kafka:
bootstrap-servers: localhost:8080
В приведенном выше примере вы должны определить порт брокера kafka, то есть значение port=
файла server.properties.
Приложение Spring boot по умолчанию запускается на порту 8080, поэтому, пожалуйста, не используйте его для порта Zookeeper, если вы не изменили порт по умолчанию приложения Spring boot.
Таким образом, в server.properties есть port=9092
и zookeeper.connect=localhost:2181
, а в application.yml есть, как показано ниже:
spring:
kafka:
bootstrap-servers: localhost:9092
Затем в zookeeper.properties, есть clientPort=2181
. Затем перезапустите Zookeeper, сервер Kafka и приложение Spring boot в том же порядке.
Обновить:
Более новые версии Kafka используют listeners=PLAINTEXT://localhost:9092
вместо port=9092
в файле server.properties. Поэтому попробуйте заменить это.
Комментарии:
1. Привет, в файле server.properties у меня нет port =, должен ли я добавить его вручную? В какой позиции это должно быть?
2. @TodorokiM у
server.properties
должен бытьport
ключ, чтобы определить, на каком порту должен работать брокер kafka. Проверьте этот пример файла свойств github.com/bkimminich/apache-kafka-book-examples/blob/master /…3. Добавьте
port=9092
вserver.properties
, добавьте тот же порт вspring: kafka: bootstrap-servers:
изapplication.yml
, затем перезапустите zookeeper, сервер kafka и ваше приложение Spring boot.4. @TodorokiM можете ли вы попробовать заменить
port=9092
наlisteners=PLAINTEXT://localhost:9092
вserver.properties
и перезапустить zookeeper, kafka и spring boot app?5. @TodorokiM использовалась более старая версия Kafka
port=
, но более новая используетlisteners=
, вот в чем разница.