Почему потребитель Apache Kafka не использует сообщения из тем?

#java #spring-boot #apache-kafka

Вопрос:

Я потратил четыре дня и не могу решить эту проблему. Потребитель не получает сообщений из тем кафки, размещенных там производителем. Ниже приведена конфигурация потребителя.

 @EnableKafka @Configuration public class KafkaConfig {   @Value("${fetch.max.bytes:1048576}")  private String maxReceivedMessageSize;   . . . . . . . . . . . . . .  @Bean  public ConsumerFactorylt;String, JsonNodegt; consumerFactory() {  JsonDeserializerlt;JsonNodegt; jsonDeserializer = new JsonDeserializerlt;gt;(JsonNode.class);  jsonDeserializer.addTrustedPackages(ClassUtils.getPackageName(ObjectNode.class));   Maplt;String, Objectgt; props = new HashMaplt;gt;();  props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId);  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, jsonDeserializer);  props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxReceivedMessageSize);  props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, maxReceivedMessageSize);  return new DefaultKafkaConsumerFactorylt;gt;(props, new StringDeserializer(), jsonDeserializer);  }    @Bean  public ConcurrentKafkaListenerContainerFactorylt;String, JsonNodegt; kafkaListenerContainerFactory() {  ConcurrentKafkaListenerContainerFactorylt;String, JsonNodegt; factory = new ConcurrentKafkaListenerContainerFactorylt;gt;();  factory.setConsumerFactory(consumerFactory());  return factory;  } }  

Ниже приведено значение для fetch.max.байт, определенное в файле application.properties:

 fetch.max.bytes=20048588  

Ниже приведено определение класса KafkaConsumer:

 @Service @RequiredArgsConstructor public class KafkaConsumer {  private final TopicService topicService;   @KafkaListener(topics = {  "configurationMessages",  "metaDevicesMessages",  "accidentsMessages",  "sessionsMessages",  "ticketsMessages"  }, groupId = groupId)  public void consume(JsonNode jsonNode) {  topicService.consume(jsonNode);  } }  

И ниже приведено определение метода topicService.consume(JsonNode) :

 @Service @RequiredArgsConstructor public class TopicService { . . . . . . . . . . . . . . .  public void consume(JsonNode incomingMessage) {  String code = Utils.safeGetOrDefault(incomingMessage.get("code"), JsonNode::textValue, EMPTY);  switch (code) {  case "configurationMessage":  consumeConfigurationMessage(incomingMessage);  break;  case "accidentChangedMessage":  consumeAccidentChangedMessage(incomingMessage);  break;  case "allMetaDevicesMessage":  consumeAllMetaDevicesMessage(incomingMessage);  break;  case "accidentsMessage":  consumeAccidentsMessage(incomingMessage);  break;  case "allTicketsMessage":  consumeAllTicketsMessage(incomingMessage);  break;  case "ticketChangedMessage":  consumeTicketChangedMessage(incomingMessage);  break;  case "sessionMessage":  consumeSessionMessage(incomingMessage);  break;  case "allSessionsMessage":  consumeAllSessionsMessage(incomingMessage);  break;  case "sessionElementMessage":  consumeSessionElementMessage(incomingMessage);  break;  case "allAccidentsMessage":  consumeAllAccidentsMessage(incomingMessage);  break;  default: {  String errorMessage = "Error in TopicService.consume() method: "Wrong message code:" "   code;  IncomingMessageProcessingError errorInfo = new IncomingMessageProcessingError(errorMessage, incomingMessage);  JsonNode failed = new ObjectMapper().convertValue(errorInfo, JsonNode.class);  kafkaTemplate.send("messageErrors", failed);  }  }  }   // Below are methods for handling messages received from kafka, as well as helper methods.  . . . . . . . . . . . . . . . . . . . . . . . }  

Where the variable code is the string that uniquely identifies the message read by the consumer from the topic. And EMPTY

 org.apache.commons.lang3.StringUtils.EMPTY;  

is the default for code.

TopicService-это потребительский сервис, который должен обрабатывать сообщения, полученные из тем кафки. Таким образом, в приложении запущены две основные службы: служба производителя, которая помещает сообщения в kafka, и вторая служба — TopicService, потребитель, который должен считывать сообщения из тем и обрабатывать их соответствующим образом. Почему потребитель TopicService либо вообще не принимает сообщения из тем, либо принимает их очень, очень редко, хотя производитель услуг постоянно отправляет сообщения в темы кафки через определенный промежуток времени (я отслеживаю это). Что я делаю не так? Ваша помощь будет очень признательна.