Как извлечь сведения о заголовке сообщения в методе KafkaListener, если сообщение является GenericRecord

#spring-kafka

#spring-kafka

Вопрос:

Я хочу иметь возможность получить запись.Ключ из сообщения типа avro schema.

У меня есть мой класс kafkaConsumerConfig, который задает свойства и содержит kafkalistener contrainerfactory

     public ConsumerFactory<String, GenericRecord> kafkaConsumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,KafkaAvroDeserializer.class);  
        ...;
        
        return new DefaultKafkaConsumerFactory<>(properties);
    }
    
    //Used to poll for messages
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory() 
    { 
        
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(kafkaConsumerFactory());
      return factory;
    }
 

Это мой класс слушателя:

 @KafkaListener(topics = "topicName", groupId = "groupIDName")
public class KafkaListenerclass {
    
    Logger logger = LoggerFactory.getLogger(KafkaListenerclass.class);
    
    @KafkaHandler
      void listenerGenericRecord(GenericRecord record) {
        logger.info("KafkaHandler[Default] {}", record);
        System.out.println("Im here generic listener..."  record.toString());
      }
    
    @KafkaHandler
    public void listenPayload(@Payload String message,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey) {
         System.out.println("Im here listenPayload..."  messageKey);
    }
 

Он всегда использует listenerGEnericRecord(), из которого я, похоже, не могу извлечь другие сведения о сообщении.
Когда я удаляю этот метод, чтобы посмотреть, подходит ли он для использования listenPayload(), я получаю :

Сбой прослушивателя; вложенным исключением является org.springframework.kafka.Исключение KafkaException: метод не найден для класса org.apache.avro.generic.GenericData $Запись

Кажется, я не могу найти никакой помощи в этом, кроме как переписать код, который не использует эту структуру bean

Ответ №1:

Как насчет этого:

 void listenerGenericRecord(GenericRecord record,
           @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey) {
 

Это GenericRecord POJO для данных Avro, которые хранятся только в записи Kafka body . Заголовки ConsumerRecord полностью не связаны с Avro. Итак, если в заголовках записей Kafka хранятся какие-то метаданные, вы должны отличать их от данных, хранящихся только в теле.

Комментарии:

1. Это работает отлично, спасибо. Также спасибо за хорошее объяснение