#apache-kafka #kafka-consumer-api #spring-kafka
Вопрос:
Я пытаюсь сделать продюсера, используя кафку и весеннюю загрузку.
Я попытался создать новое приложение для создания сообщения по теме и для использования другим приложением. При запуске m тема сервера не распознается только изначально. Ошибка, которая возникает, показана ниже:
2021-09-03 15:33:20.024 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=lims-public-helper] Error while fetching metadata with correlation id 9 : { sms.requests=UNKNOWN_TOPIC_OR_PARTITION}
2021-09-03 15:33:20.026 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=lims-public-helper] The following subscribed topics are not assigned to any members: [ sms.requests]
Я попробовал с другим сервером, он работает очень хорошо с той же конфигурацией, попробовал этот сервер, он дает исключение.
Темы Кафки
$ kaf topics
NAME PARTITIONS REPLICAS
__consumer_offsets 50 3
__trace 9 1
sms.requests 3 1
sms.status 1 3
test 1 3
Потребительский кодекс:
public ConsumerFactory<String, OtpDTO> otpConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, limsGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonSerializer.TYPE_MAPPINGS, "otpDTO:com.lims.helper.dto.OtpDTO");
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.lims.helper.dto.OtpDTO");
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(OtpDTO.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OtpDTO> otpKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OtpDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(otpConsumerFactory());
return factory;
}
сведения о списке потребителей:
@KafkaListener(topics = "${spring.kafka.topic.lims.sms.otp}", containerFactory = "otpKafkaListenerContainerFactory")
public void otpTopicMessage(@Payload OtpDTO otpDTO) {
log.info(String.format("--------##### otp topic consumer: %s", otpDTO));
}
свойства подробная информация о теме:
spring.kafka.topic.lims.sms.otp=sms.requests
spring.kafka.topic.lims.sms.status=sms.status
Комментарии:
1. 2021-09-03 15:33:20.024 ПРЕДУПРЕДИТЬ 1 — [ntainer#0-0-C-1] org.apache. кафка,клиенты. Сетевой клиент : [Идентификатор клиента потребителя=потребитель-1, идентификатор группы=lims-общедоступный помощник] Ошибка при извлечении метаданных с идентификатором корреляции 9 : { sms.запросы=НЕИЗВЕСТНЫЕ_ТОПИК_ОР_ЧАСТИ} 2021-09-03 15:33:20.026 ПРЕДУПРЕЖДЕНИЕ 1 — [владелец#0-0-C-1] внутренние устройства. Координатор потребителей : [Потребительский идентификатор клиента=потребитель-1, идентификатор группы=lims-общественный помощник] Следующие подписанные темы не назначаются ни одному участнику: [ sms.запросы]
2. но в случае, если тема отсутствует на сервере, она должна создать этот ритуал темы, потому что я включил его
3. Потребители никогда не создают темы, если они не существуют
4. хорошо, но в чем может быть причина этой ошибки? Как вы думаете, какая-то проблема с сетью или проблема с ssl?
5. Соединение, похоже, установлено нормально, так как вы не получаете ошибку TLS или тайм-аут сети.