Как получить полное сообщение с помощью KafkaItemReader в задаче SCDF?

#spring-batch #spring-kafka #spring-cloud-dataflow

#spring-batch #spring-kafka #spring-cloud-поток данных

Вопрос:

Я пытаюсь создать SCDF Task для обработки ошибок, но не могу понять, как получить полное сообщение kafka с полезной нагрузкой и заголовками.

Идея состоит в том, чтобы направлять сообщения в DLQ в моих потоках, когда служба не отвечает. Например, какая-то служба HTTP не работает, и приложение httclient не работает.

Когда служба HTTP будет восстановлена, я хотел бы запустить задачу, которая принимает сообщения в DLQ и повторно отправляет их в соответствующую тему Kafka, независимо от того, что это за сообщение.

Я пытаюсь создать общую задачу, чтобы DLQ и целевая тема были свойствами потребителя и производителя Kafka. И я бы тоже хотел использовать generic org.springframework.messaging.Message .

Когда я использую KafkaItemReader<String, String> and KafkaItemWriter<String, String> , и он отлично работает только с полезной нагрузкой в виде строки, но все заголовки теряются. Когда я использую KafkaItemReader<String, Message<?>> и KafkaItemWriter<String, Message<?>> для получения заголовков, у меня есть ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message

 2020-11-13T14:27:03.472446462 01:00 stdout F java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
2020-11-13T14:27:03.472450493 01:00 stdout F    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.47245463 01:00 stdout F     at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472457814 01:00 stdout F    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472460712 01:00 stdout F    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472463956 01:00 stdout F    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472468765 01:00 stdout F    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
  

Есть ли способ сделать это?

Комментарии:

1. Что вы имеете в виду all headers are lost ? Когда вы используете Message полезную нагрузку as, настроили Message ли вы сериализатор / десериализатор в свойствах чтения / записи элементов?

Ответ №1:

На самом деле кажется, что нет способа получить заголовки сообщений с KafkaItemReader помощью and KafkaItemWriter . Сериализатор / десериализатор используются для ключа и полезной нагрузки, но я не могу найти способ получить заголовки.

Я решил эту проблему, используя a Tasklet вместо KafkaItemReader and KafkaItemWriter . В моем тасклете я использую KafkaConsumer и KafkaProducer для обработки ConsumerRecord и ProducerRecord которые позволяют мне копировать заголовки.

Более того, я могу более правильно обрабатывать фиксацию (без автоматической фиксации): смещения потребителя фиксируются только в том случае, если сообщения отправляются производителем.