весенняя загрузка kafka generic JSON templateSender

#json #spring-boot #serialization #apache-kafka #configuration

#json #весенняя загрузка #сериализация #apache-кафка #конфигурация

Вопрос:

Я работаю с kafka и spring boot, и мне нужно отправить объект JSON в kafka, дело в том, что я могу отправить объект в формате JSON, настраивая KafkaTemplate, но только для этого объекта.

 package com.bankia.apimanager.config;

import com.bankia.apimanager.model.RequestDTO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, RequestDTO> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, RequestDTO> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
  
 package com.bankia.apimanager.controller;

import com.bankia.apimanager.model.RequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/infrastructure")
public class InfraStructureRequestController {

    private final static Logger LOG = LoggerFactory.getLogger( InfraStructureRequestController.class );

    private static final String TOPIC = "test";

    @Autowired
    private KafkaTemplate<String, RequestDTO> sender;

    @RequestMapping(value = "/test", method = RequestMethod.GET)
    public String postMessage(){

        ListenableFuture<SendResult<String, RequestDTO>> future = sender.send(TOPIC, new RequestDTO("Hola","Paco"));
        future.addCallback(new ListenableFutureCallback<SendResult<String, RequestDTO>>() {
            @Override
            public void onSuccess(SendResult<String, RequestDTO> result) {
                LOG.info("Sent message with offset=["   result.getRecordMetadata().offset()   "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                LOG.error("Unable to send message due to : "   ex.getMessage());
            }
        });
        return "OK";
    }
}
  

но что, если теперь я хочу отправить новый объект DTO? должен ли я объявлять новый KafkaTemplate<String,NEWOBJECT> и автоматически подключать каждый шаблон kafka, объявленный в конфигурации для каждого объекта? есть другой способ иметь возможность просто объявить один KafkaTemplate, в котором я могу отправлять объекты любого типа и автоматически сериализовываться в JSON?

Ответ №1:

Я думаю, вы можете указать универсальный KafkaTemplate<String, Object> и установить для сериализатора значений производителя значение JsonSerializer следующим образом:

 @Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
  

Комментарии:

1. Не могли бы вы уточнить, что именно не работает? Спасибо!

Ответ №2:

Ссылка на ваш код:

  • Сериализатор значений правильно определен как JsonSerializer, который преобразует объекты любого типа в JSON.
 @Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}
  
  • Измените <String, RequestDto> на <String, Object> в каждом месте в KafkaConfig amp; Controller.

Имейте в виду, что дженерики сохраняются только до времени компиляции (удаление типа).

Ответ №3:

Существует два сценария:

Сценарий #1

Если вы хотите использовать KafkaTemplate для отправки любого типа (как указано в вашем вопросе) в kafka, поэтому нет необходимости объявлять свой собственный KafkaTemplate компонент, потому что Spring boot сделал это для вас в KafkaAutoConfiguration .

 package org.springframework.boot.autoconfigure.kafka;

...

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }
}


  

**Some Note** :

  1. Этот класс конфигурации был помечен @ConditionalOnClass(KafkaTemplate.class) это означает: (из spring docs—>) @Conditional, который соответствует только тогда, когда указанные классы находятся в пути к классу.

  2. Метод компонента KafkaTemplate снабжен аннотацией @ConditionalOnMissingBean(KafkaTemplate.class) , что означает: (из spring docs —->) @Conditional, который соответствует только тогда, когда в BeanFactory уже не содержатся компоненты, соответствующие указанным требованиям.

  3. Важно! В чистом мире Java, KafkaTemplate<?, ?> не является подтипом, например: KafkaTemplate<String, RequestDTO> поэтому вы не можете этого сделать:

 KafkaTemplate<?, ?> kf1 = ...;
KafkaTemplate<String, RequestDTO> kf2 = kf1; // Compile time error
  

поскольку параметризованные типы Java инвариантны, как указано в действующем пункте 31 третьего издания Java. Но является ли spring world в порядке и будет ли он внедрен в ваш собственный сервис. Вам нужно только указать свой собственный универсальный тип в ваших свойствах KafkaTemplate.
Например:

 import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate1;
    @Autowired
    private KafkaTemplate<Integer, RequestDTO> KafkaTemplate2;
}

  

Сценарий #2

Если вам нужно ограничить тип значения записи kafka, тогда вам нужно указать свой собственный компонент kafka примерно так:

 
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(CorridorTracingConfiguration.class)
public class CorridorKafkaAutoConfiguration {
    
    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, AbstractMessage> kafkaTemplate(ProducerFactory<Object, AbstractMessage> kafkaProducerFactory,
                                                              ProducerListener<Object, AbstractMessage> kafkaProducerListener,
                                                              ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, AbstractMessage> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

  

Теперь это может быть введено только в
KafkaTemplate<String, AbstractMessage> kafkaTemplate , тип ключа может быть любым другим вместо String . Но вы можете отправить любой подтип AbstractMessage в kafka через него.

Пример использования:

 import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    @Autowired
    private KafkaTemplate<String, AbstractMessage> kafkaTemplate;

    public void makeTrx(TrxRequest trxRequest) {
        kafkaTemplate.send("fraud-request", trxRequest.fromAccountNumber(), new FraudRequest(trxRequest));
    }


}

@Accessors(chain = true)
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class FraudRequest extends AbstractMessage {
    private float amount;
    private String fromAccountNumber;
    private String toAccountNumber;

...

}

  

Чтобы ограничить ключ сообщения kafka, следуйте тому же (описанному выше) способу