Сериализатор ключей Spring Kafka не работает для объекта

#apache-kafka #spring-kafka

#apache-kafka #spring-kafka

Вопрос:

Я не могу воспроизвести документацию или образец кода, чтобы сериализовать не String ключ.

Моя цель — использовать ключ (поле) для передачи управляющих действий вместе с данными.

Классы ControlChannel и SchedulerEntry являются обычным Pojo.

Среда:

  • Java 11
  • Загрузка Spring 2.4.1
  • Kafka 2.6.0

Ожидаемый код для сериализации / десериализации:

Прослушиватель и шаблон

 
    @KafkaListener(topics = "Scheduler", groupId = "scheduler", containerFactory = "schedulerKafkaListenerContainerFactory")
    public void listenForScheduler(
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ControlChannel control, 
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
        @Payload SchedulerEntry entry) {

        log.info("received data KEY ='{}'", control);
        log.info("received data PAYLOAD = '{}'", entry);

        /* ... */

    }

    @Bean
    public KafkaTemplate<ControlChannel, SchedulerEntry> schedulerKafkaTemplate() {
        return new KafkaTemplate<>(schedulerProducerFactory());
    }

 

** Первая попытка — потребитель и производитель (сопоставление типов и доверенный пакет) **

 
    @Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        props.put(JsonSerializer.TYPE_MAPPINGS, "key:io.infolayer.aida.ControlChannel, value:io.infolayer.aida.entity.SchedulerEntry");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props, 
            new JsonSerializer<ControlChannel>(),
            new JsonSerializer<SchedulerEntry>());
    }
   
    
    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "key:io.infolayer.aida.ControlChannel, value:io.infolayer.aida.entity.SchedulerEntry");
    
        JsonDeserializer<ControlChannel> k = new JsonDeserializer<ControlChannel>();
        k.configure(props, true);
    
        JsonDeserializer<SchedulerEntry> v = new JsonDeserializer<SchedulerEntry>();
        k.configure(props, true);
    
        return new DefaultKafkaConsumerFactory<>(props, k, v);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("scheduler"));
        return factory;
    }

 

Исключение:

 Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Scheduler-0 at offset 25. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
 

** Вторая попытка — потребитель и производитель (просто установите сериализатор / десериализатор ключей как Json) **

 @Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new JsonDeserializer<>(ControlChannel.class), new JsonDeserializer<>(SchedulerEntry.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("scheduler"));
        return factory;
    }


 

Исключение

 org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: 
Listener method 'public void  io.infolayer.aida.scheduler.KafkaSchedulerListener.listenForScheduler(io.infolayer.aida.ControlChannel,long,io.infolayer.aida.entity.SchedulerEntry)' 
threw exception; nested exception is org.springframework.core.convert.ConverterNotFoundException: 
No converter found capable of converting from type [io.infolayer.aida.entity.SchedulerEntry] to type [@org.springframework.messaging.handler.annotation.Header io.infolayer.aida.ControlChannel]; nested exception is org.springframework.core.convert.ConverterNotFoundException: 
No converter found capable of converting from type [io.infolayer.aida.entity.SchedulerEntry] to type [@org.springframework.messaging.handler.annotation.Header io.infolayer.aida.ControlChannel]
 

Ответ №1:

При первой попытке возникает несколько проблем.

  • вам нужно вызвать configure() сериализаторы с добавлением типа info=true
  • вы вызываете configure() k дважды и не настраиваете v (десериализаторы)

Это работает, как и ожидалось…

 @SpringBootApplication
public class So65501295Application {

    private static final Logger log = LoggerFactory.getLogger(So65501295Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So65501295Application.class, args);
    }

    @Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
        props.put(JsonSerializer.TYPE_MAPPINGS,
                "key:com.example.demo.So65501295Application.ControlChannel, "
                          "value:com.example.demo.So65501295Application.SchedulerEntry");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        JsonSerializer<ControlChannel> k = new JsonSerializer<ControlChannel>();
        k.configure(props, true);
        JsonSerializer<SchedulerEntry> v = new JsonSerializer<SchedulerEntry>();
        v.configure(props, false);
        return new DefaultKafkaProducerFactory<>(props, k, v);
    }

    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS,
                "key:com.example.demo.So65501295Application.ControlChannel, "
                          "value:com.example.demo.So65501295Application.SchedulerEntry");

        JsonDeserializer<ControlChannel> k = new JsonDeserializer<ControlChannel>();
        k.configure(props, true);

        JsonDeserializer<SchedulerEntry> v = new JsonDeserializer<SchedulerEntry>();
        v.configure(props, false);

        return new DefaultKafkaConsumerFactory<>(props, k, v);
    }

    @KafkaListener(topics = "Scheduler", groupId = "scheduler", containerFactory = "schedulerKafkaListenerContainerFactory")
    public void listenForScheduler(
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ControlChannel control,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            @Payload SchedulerEntry entry) {

        log.info("received data KEY ='{}'", control);
        log.info("received data PAYLOAD = '{}'", entry);

        /* ... */

    }

    @Bean
    public KafkaTemplate<ControlChannel, SchedulerEntry> schedulerKafkaTemplate() {
        return new KafkaTemplate<>(schedulerProducerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("scheduler"));
        return factory;
    }


    @Bean
    public ApplicationRunner runner(KafkaTemplate<ControlChannel, SchedulerEntry> template) {
        return args -> {
            template.send("Scheduler", new ControlChannel(), new SchedulerEntry());
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("Scheduler").partitions(1).replicas(1).build();
    }

    public static class ControlChannel {

        String foo;

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

    }

    public static class SchedulerEntry {

        String foo;

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

    }

}
 
 2021-01-04 11:42:25.026  INFO 23905 --- [ntainer#0-0-C-1] com.example.demo.So65501295Application   
    : received data KEY ='com.example.demo.So65501295Application$ControlChannel@44a72886'
2021-01-04 11:42:25.026  INFO 23905 --- [ntainer#0-0-C-1] com.example.demo.So65501295Application   
    : received data PAYLOAD = 'com.example.demo.So65501295Application$SchedulerEntry@74461c59'
 

Комментарии:

1. Спасибо — Как мне проанализировать ключ в определяемый пользователем объект, если в прослушивателе message inn kafka нет заголовка type?

2.Не задавайте новые вопросы в комментариях к старым ответам. Вы можете настроить тип с помощью свойств или программно. docs.spring.io/spring-kafka/docs/current/reference/html/… spring.json.key.default.type .