#java #spring-boot #apache-kafka
Вопрос:
Я потратил четыре дня и не могу решить эту проблему. Потребитель не получает сообщений из тем кафки, размещенных там производителем. Ниже приведена конфигурация потребителя.
@EnableKafka @Configuration public class KafkaConfig { @Value("${fetch.max.bytes:1048576}") private String maxReceivedMessageSize; . . . . . . . . . . . . . . @Bean public ConsumerFactorylt;String, JsonNodegt; consumerFactory() { JsonDeserializerlt;JsonNodegt; jsonDeserializer = new JsonDeserializerlt;gt;(JsonNode.class); jsonDeserializer.addTrustedPackages(ClassUtils.getPackageName(ObjectNode.class)); Maplt;String, Objectgt; props = new HashMaplt;gt;(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, jsonDeserializer); props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxReceivedMessageSize); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, maxReceivedMessageSize); return new DefaultKafkaConsumerFactorylt;gt;(props, new StringDeserializer(), jsonDeserializer); } @Bean public ConcurrentKafkaListenerContainerFactorylt;String, JsonNodegt; kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactorylt;String, JsonNodegt; factory = new ConcurrentKafkaListenerContainerFactorylt;gt;(); factory.setConsumerFactory(consumerFactory()); return factory; } }
Ниже приведено значение для fetch.max.байт, определенное в файле application.properties:
fetch.max.bytes=20048588
Ниже приведено определение класса KafkaConsumer:
@Service @RequiredArgsConstructor public class KafkaConsumer { private final TopicService topicService; @KafkaListener(topics = { "configurationMessages", "metaDevicesMessages", "accidentsMessages", "sessionsMessages", "ticketsMessages" }, groupId = groupId) public void consume(JsonNode jsonNode) { topicService.consume(jsonNode); } }
И ниже приведено определение метода topicService.consume(JsonNode) :
@Service @RequiredArgsConstructor public class TopicService { . . . . . . . . . . . . . . . public void consume(JsonNode incomingMessage) { String code = Utils.safeGetOrDefault(incomingMessage.get("code"), JsonNode::textValue, EMPTY); switch (code) { case "configurationMessage": consumeConfigurationMessage(incomingMessage); break; case "accidentChangedMessage": consumeAccidentChangedMessage(incomingMessage); break; case "allMetaDevicesMessage": consumeAllMetaDevicesMessage(incomingMessage); break; case "accidentsMessage": consumeAccidentsMessage(incomingMessage); break; case "allTicketsMessage": consumeAllTicketsMessage(incomingMessage); break; case "ticketChangedMessage": consumeTicketChangedMessage(incomingMessage); break; case "sessionMessage": consumeSessionMessage(incomingMessage); break; case "allSessionsMessage": consumeAllSessionsMessage(incomingMessage); break; case "sessionElementMessage": consumeSessionElementMessage(incomingMessage); break; case "allAccidentsMessage": consumeAllAccidentsMessage(incomingMessage); break; default: { String errorMessage = "Error in TopicService.consume() method: "Wrong message code:" " code; IncomingMessageProcessingError errorInfo = new IncomingMessageProcessingError(errorMessage, incomingMessage); JsonNode failed = new ObjectMapper().convertValue(errorInfo, JsonNode.class); kafkaTemplate.send("messageErrors", failed); } } } // Below are methods for handling messages received from kafka, as well as helper methods. . . . . . . . . . . . . . . . . . . . . . . . }
Where the variable code is the string that uniquely identifies the message read by the consumer from the topic. And EMPTY
org.apache.commons.lang3.StringUtils.EMPTY;
is the default for code.
TopicService-это потребительский сервис, который должен обрабатывать сообщения, полученные из тем кафки. Таким образом, в приложении запущены две основные службы: служба производителя, которая помещает сообщения в kafka, и вторая служба — TopicService, потребитель, который должен считывать сообщения из тем и обрабатывать их соответствующим образом. Почему потребитель TopicService либо вообще не принимает сообщения из тем, либо принимает их очень, очень редко, хотя производитель услуг постоянно отправляет сообщения в темы кафки через определенный промежуток времени (я отслеживаю это). Что я делаю не так? Ваша помощь будет очень признательна.