#header #spring-batch #spring-kafka #spring-cloud-stream
#заголовок #пружинная партия #весна-кафка #весна-облако-поток
Вопрос:
В приложении spring-cloud-stream Kafka, когда я потребляю сообщения в обычном режиме, я правильно получаю сообщение с его полезной нагрузкой и пользовательскими заголовками. тем не менее, когда я устанавливаю для «headerMode» потребителя значение true, а для типа ввода Функции Listlt;?gt; (в соответствии с документацией), я получаю список полезных нагрузок. Как я могу получить список сообщений, каждое из которых содержит заголовок и полезную нагрузку?
уточнение — событие при настройке типа коллекции (например Listlt;Messagelt;MyTypegt;gt;
, она всегда возвращает только полезную нагрузку ( Listlt;MyTypegt;
) во время выполнения.
приложение.yaml:
spring: cloud: function: definition: function stream: default-binder: my-avro-binder bindings: function-in-0: binder: my-avro-binder destination: function-output group: constant-name contentType: application/* avro consumer: useNativeEncoding: true batchMode: true headerMode: headers
определение связующего:
kafka-string-avro: type: kafka environment.spring.cloud.stream.kafka.binder.consumerProperties: key.serializer: org.apache.kafka.common.serialization.StringSerializer key.deserializer: org.apache.kafka.common.serialization.StringDeserializer value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081} specific.avro.reader: true
определение функции:
@Bean public Functionlt;Listlt;?gt;, Listlt;Messagelt;MyTypegt;gt;gt; function() { Functionlt;Listlt;?gt;, Listlt;Messagelt;MyTypegt;gt;gt; func = list -gt; { // logic... return list; }; return func; }
Ответ №1:
Спасибо @GaryRussell, после еще нескольких испытаний вот что я сделал:
- добавлено
contentType: application/* avro
свойство в мою потребительскую привязку (иначе это не сработало бы). - предоставленные заголовки находятся в ключе
Kafka_batchConvertedHeaders
заголовка, представленном в виде списка карт, каждая из которых представляет все карты заголовков одного сообщения.
Комментарии:
1. Если вы хотите поблагодарить пользователя, подумайте о том, чтобы проголосовать за его ответ, а не просто упомянуть его здесь. Это не то, что следует включать в ответ.
Ответ №2:
Используйте Messagelt;Listlt;Typegt;gt;
заголовки, содержащие списки заголовков (в том же порядке, что и полезные нагрузки).
например getHeader().get(KafkaHeaders.OFFSET, List.class)
, возвращает a Listlt;Longgt;
.