#spring-batch #spring-kafka #spring-cloud-dataflow
#spring-batch #spring-kafka #spring-cloud-поток данных
Вопрос:
Я пытаюсь создать SCDF Task
для обработки ошибок, но не могу понять, как получить полное сообщение kafka с полезной нагрузкой и заголовками.
Идея состоит в том, чтобы направлять сообщения в DLQ в моих потоках, когда служба не отвечает. Например, какая-то служба HTTP не работает, и приложение httclient не работает.
Когда служба HTTP будет восстановлена, я хотел бы запустить задачу, которая принимает сообщения в DLQ и повторно отправляет их в соответствующую тему Kafka, независимо от того, что это за сообщение.
Я пытаюсь создать общую задачу, чтобы DLQ и целевая тема были свойствами потребителя и производителя Kafka. И я бы тоже хотел использовать generic org.springframework.messaging.Message
.
Когда я использую KafkaItemReader<String, String>
and KafkaItemWriter<String, String>
, и он отлично работает только с полезной нагрузкой в виде строки, но все заголовки теряются. Когда я использую KafkaItemReader<String, Message<?>>
и KafkaItemWriter<String, Message<?>>
для получения заголовков, у меня есть ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
2020-11-13T14:27:03.472446462 01:00 stdout F java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
2020-11-13T14:27:03.472450493 01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.47245463 01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472457814 01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472460712 01:00 stdout F at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472463956 01:00 stdout F at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472468765 01:00 stdout F at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
Есть ли способ сделать это?
Комментарии:
1. Что вы имеете в виду
all headers are lost
? Когда вы используетеMessage
полезную нагрузку as, настроилиMessage
ли вы сериализатор / десериализатор в свойствах чтения / записи элементов?
Ответ №1:
На самом деле кажется, что нет способа получить заголовки сообщений с KafkaItemReader
помощью and KafkaItemWriter
. Сериализатор / десериализатор используются для ключа и полезной нагрузки, но я не могу найти способ получить заголовки.
Я решил эту проблему, используя a Tasklet
вместо KafkaItemReader
and KafkaItemWriter
. В моем тасклете я использую KafkaConsumer
и KafkaProducer
для обработки ConsumerRecord
и ProducerRecord
которые позволяют мне копировать заголовки.
Более того, я могу более правильно обрабатывать фиксацию (без автоматической фиксации): смещения потребителя фиксируются только в том случае, если сообщения отправляются производителем.