#java #spring #spring-boot #apache-kafka #spring-kafka
Вопрос:
У меня есть приложение для загрузки Spring с прослушивателем Кафки, реализующим интерфейс BatchAcknowledgingMessageListener<Строка, строка>. Когда я получаю то, что должно быть одним сообщением из темы, на самом деле это одно сообщение для каждой строки в исходном сообщении, и я не могу передать сообщение в запись пользователя<Строка, строка>.
Код, создающий запись, выглядит следующим образом:
this.kafkaTemplate.send("myTopic", "12345", "{"OrderID": "12345"}, "OrderDate": "2021-06-01T12:13:16Z"");
И конфигурация Кафки выглядит так (она все еще находится на стадии интеграционного тестирования с использованием Testcontainers, поэтому производитель выпускает продукцию на ту же тему, которую слушает потребитель):
spring:
kafka:
listener:
ack-mode: manual-immediate
concurrency: 1
consumer:
bootstrap-servers: localhost:9093
enable-auto-commit: false
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 10
topic: myTopic
producer:
bootstrap-servers: localhost:9093
client-id: my-client
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic: myTopic
И, наконец, потребительская логика:
@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void consumeMessages(final List<ConsumerRecord<String, String>> records, final Acknowledgment ack) throws IOException {
// This line fails with ClassCastException: "Can't cast String to ConsumerRecord"
// for (final ConsumerRecord<String, String> record : records) {
for (final Object record : records) {
log.debug("Record: {}", record);
}
...
}
Вывод отладки из этого примера является:
[LOG HEADER]: Record: {"OrderID": "12345"
[LOG HEADER]: Record: "OrderDate": "2021-06-01T12:13:16Z"}
Как вы можете видеть, сообщение разделяется запятыми, и я получаю несколько сообщений для одного сообщения, которое было создано. Это, очевидно, не удается, но я не могу понять, почему я просто не получаю один объект ConsumerRecord<Строка, строка>.
Ответ №1:
Вам не хватает конфигурации типа прослушивателя, поэтому служба преобразования по умолчанию видит, что вам нужен список, и разделяет строку запятыми.
spring:
kafka:
listener:
ack-mode: manual-immediate
concurrency: 1
type: batch
consumer:
...
Добавление type: batch
сообщает платформе, что вам нужен полный пакет записей.
Ответ №2:
Оказывается, проблема была довольно простой. Мне не хватало параарметра конфигурации spring.kafka.listener.type. Нашел ответ здесь: