Проблема, возникающая при использовании или создании данных в теме на конкретном сервере VPC

#apache-kafka #kafka-consumer-api #spring-kafka

Вопрос:

Я пытаюсь сделать продюсера, используя кафку и весеннюю загрузку.

Я попытался создать новое приложение для создания сообщения по теме и для использования другим приложением. При запуске m тема сервера не распознается только изначально. Ошибка, которая возникает, показана ниже:

 2021-09-03 15:33:20.024 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=lims-public-helper] Error while fetching metadata with correlation id 9 : { sms.requests=UNKNOWN_TOPIC_OR_PARTITION} 
2021-09-03 15:33:20.026 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=lims-public-helper] The following subscribed topics are not assigned to any members: [ sms.requests]
 

Я попробовал с другим сервером, он работает очень хорошо с той же конфигурацией, попробовал этот сервер, он дает исключение.

Темы Кафки

 $ kaf topics 
NAME PARTITIONS REPLICAS 
__consumer_offsets 50 3 
__trace 9 1 
sms.requests 3 1 
sms.status 1 3 
test 1 3 
 

Потребительский кодекс:

  public ConsumerFactory<String, OtpDTO> otpConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, limsGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, "otpDTO:com.lims.helper.dto.OtpDTO");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.lims.helper.dto.OtpDTO");
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(OtpDTO.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OtpDTO> otpKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OtpDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(otpConsumerFactory());
        return factory;
    }
 

сведения о списке потребителей:

   @KafkaListener(topics = "${spring.kafka.topic.lims.sms.otp}", containerFactory = "otpKafkaListenerContainerFactory")
    public void otpTopicMessage(@Payload OtpDTO otpDTO) {
        log.info(String.format("--------#####  otp topic consumer: %s", otpDTO));
    }
 

свойства подробная информация о теме:

 spring.kafka.topic.lims.sms.otp=sms.requests
spring.kafka.topic.lims.sms.status=sms.status
 

Комментарии:

1. 2021-09-03 15:33:20.024 ПРЕДУПРЕДИТЬ 1 — [ntainer#0-0-C-1] org.apache. кафка,клиенты. Сетевой клиент : [Идентификатор клиента потребителя=потребитель-1, идентификатор группы=lims-общедоступный помощник] Ошибка при извлечении метаданных с идентификатором корреляции 9 : { sms.запросы=НЕИЗВЕСТНЫЕ_ТОПИК_ОР_ЧАСТИ} 2021-09-03 15:33:20.026 ПРЕДУПРЕЖДЕНИЕ 1 — [владелец#0-0-C-1] внутренние устройства. Координатор потребителей : [Потребительский идентификатор клиента=потребитель-1, идентификатор группы=lims-общественный помощник] Следующие подписанные темы не назначаются ни одному участнику: [ sms.запросы]

2. но в случае, если тема отсутствует на сервере, она должна создать этот ритуал темы, потому что я включил его

3. Потребители никогда не создают темы, если они не существуют

4. хорошо, но в чем может быть причина этой ошибки? Как вы думаете, какая-то проблема с сетью или проблема с ssl?

5. Соединение, похоже, установлено нормально, так как вы не получаете ошибку TLS или тайм-аут сети.