Получение заголовков сообщений в пакетном режиме

#header #spring-batch #spring-kafka #spring-cloud-stream

#заголовок #пружинная партия #весна-кафка #весна-облако-поток

Вопрос:

В приложении spring-cloud-stream Kafka, когда я потребляю сообщения в обычном режиме, я правильно получаю сообщение с его полезной нагрузкой и пользовательскими заголовками. тем не менее, когда я устанавливаю для «headerMode» потребителя значение true, а для типа ввода Функции Listlt;?gt; (в соответствии с документацией), я получаю список полезных нагрузок. Как я могу получить список сообщений, каждое из которых содержит заголовок и полезную нагрузку?

уточнение — событие при настройке типа коллекции (например Listlt;Messagelt;MyTypegt;gt; , она всегда возвращает только полезную нагрузку ( Listlt;MyTypegt; ) во время выполнения.

приложение.yaml:

 spring: cloud:  function:  definition: function  stream:  default-binder: my-avro-binder  bindings:  function-in-0:  binder: my-avro-binder  destination: function-output  group: constant-name  contentType: application/* avro  consumer:  useNativeEncoding: true  batchMode: true  headerMode: headers  

определение связующего:

 kafka-string-avro:  type: kafka  environment.spring.cloud.stream.kafka.binder.consumerProperties:  key.serializer: org.apache.kafka.common.serialization.StringSerializer  key.deserializer: org.apache.kafka.common.serialization.StringDeserializer  value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer  value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer  schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}  specific.avro.reader: true  

определение функции:

 @Bean public Functionlt;Listlt;?gt;, Listlt;Messagelt;MyTypegt;gt;gt; function() {  Functionlt;Listlt;?gt;, Listlt;Messagelt;MyTypegt;gt;gt; func =  list -gt; {  // logic...  return list;  };  return func; }  

Ответ №1:

Спасибо @GaryRussell, после еще нескольких испытаний вот что я сделал:

  1. добавлено contentType: application/* avro свойство в мою потребительскую привязку (иначе это не сработало бы).
  2. предоставленные заголовки находятся в ключе Kafka_batchConvertedHeaders заголовка, представленном в виде списка карт, каждая из которых представляет все карты заголовков одного сообщения.

Комментарии:

1. Если вы хотите поблагодарить пользователя, подумайте о том, чтобы проголосовать за его ответ, а не просто упомянуть его здесь. Это не то, что следует включать в ответ.

Ответ №2:

Используйте Messagelt;Listlt;Typegt;gt; заголовки, содержащие списки заголовков (в том же порядке, что и полезные нагрузки).

например getHeader().get(KafkaHeaders.OFFSET, List.class) , возвращает a Listlt;Longgt; .