#spring-kafka
#spring-kafka
Вопрос:
Я хочу иметь возможность получить запись.Ключ из сообщения типа avro schema.
У меня есть мой класс kafkaConsumerConfig, который задает свойства и содержит kafkalistener contrainerfactory
public ConsumerFactory<String, GenericRecord> kafkaConsumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,KafkaAvroDeserializer.class);
...;
return new DefaultKafkaConsumerFactory<>(properties);
}
//Used to poll for messages
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}
Это мой класс слушателя:
@KafkaListener(topics = "topicName", groupId = "groupIDName")
public class KafkaListenerclass {
Logger logger = LoggerFactory.getLogger(KafkaListenerclass.class);
@KafkaHandler
void listenerGenericRecord(GenericRecord record) {
logger.info("KafkaHandler[Default] {}", record);
System.out.println("Im here generic listener..." record.toString());
}
@KafkaHandler
public void listenPayload(@Payload String message,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey) {
System.out.println("Im here listenPayload..." messageKey);
}
Он всегда использует listenerGEnericRecord(), из которого я, похоже, не могу извлечь другие сведения о сообщении.
Когда я удаляю этот метод, чтобы посмотреть, подходит ли он для использования listenPayload(), я получаю :
Сбой прослушивателя; вложенным исключением является org.springframework.kafka.Исключение KafkaException: метод не найден для класса org.apache.avro.generic.GenericData $Запись
Кажется, я не могу найти никакой помощи в этом, кроме как переписать код, который не использует эту структуру bean
Ответ №1:
Как насчет этого:
void listenerGenericRecord(GenericRecord record,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey) {
Это GenericRecord
POJO для данных Avro, которые хранятся только в записи Kafka body
. Заголовки ConsumerRecord
полностью не связаны с Avro. Итак, если в заголовках записей Kafka хранятся какие-то метаданные, вы должны отличать их от данных, хранящихся только в теле.
Комментарии:
1. Это работает отлично, спасибо. Также спасибо за хорошее объяснение