Пакетное подтверждение загрузки Springmessagelistener Разбиение сообщения на запятые

#java #spring #spring-boot #apache-kafka #spring-kafka

Вопрос:

У меня есть приложение для загрузки Spring с прослушивателем Кафки, реализующим интерфейс BatchAcknowledgingMessageListener<Строка, строка>. Когда я получаю то, что должно быть одним сообщением из темы, на самом деле это одно сообщение для каждой строки в исходном сообщении, и я не могу передать сообщение в запись пользователя<Строка, строка>.

Код, создающий запись, выглядит следующим образом:

 this.kafkaTemplate.send("myTopic", "12345", "{"OrderID": "12345"}, "OrderDate": "2021-06-01T12:13:16Z"");
 

И конфигурация Кафки выглядит так (она все еще находится на стадии интеграционного тестирования с использованием Testcontainers, поэтому производитель выпускает продукцию на ту же тему, которую слушает потребитель):

 spring:
  kafka:
    listener:
      ack-mode: manual-immediate
      concurrency: 1
    consumer:
      bootstrap-servers: localhost:9093
      enable-auto-commit: false
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 10
      topic: myTopic
    producer:
      bootstrap-servers: localhost:9093
      client-id: my-client
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      topic: myTopic
 

И, наконец, потребительская логика:

 @KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void consumeMessages(final List<ConsumerRecord<String, String>> records, final Acknowledgment ack) throws IOException {
  // This line fails with ClassCastException: "Can't cast String to ConsumerRecord"
  // for (final ConsumerRecord<String, String> record : records) {

  for (final Object record : records) {
    log.debug("Record: {}", record);
  }
  ...
}
 

Вывод отладки из этого примера является:

 [LOG HEADER]: Record: {"OrderID": "12345"
[LOG HEADER]: Record: "OrderDate": "2021-06-01T12:13:16Z"}
 

Как вы можете видеть, сообщение разделяется запятыми, и я получаю несколько сообщений для одного сообщения, которое было создано. Это, очевидно, не удается, но я не могу понять, почему я просто не получаю один объект ConsumerRecord<Строка, строка>.

Ответ №1:

Вам не хватает конфигурации типа прослушивателя, поэтому служба преобразования по умолчанию видит, что вам нужен список, и разделяет строку запятыми.

 spring:
  kafka:
    listener:
      ack-mode: manual-immediate
      concurrency: 1
      type: batch
    consumer:
...
 

Добавление type: batch сообщает платформе, что вам нужен полный пакет записей.

Ответ №2:

Оказывается, проблема была довольно простой. Мне не хватало параарметра конфигурации spring.kafka.listener.type. Нашел ответ здесь: