#json #spring-boot #serialization #apache-kafka #configuration
#json #весенняя загрузка #сериализация #apache-кафка #конфигурация
Вопрос:
Я работаю с kafka и spring boot, и мне нужно отправить объект JSON в kafka, дело в том, что я могу отправить объект в формате JSON, настраивая KafkaTemplate, но только для этого объекта.
package com.bankia.apimanager.config;
import com.bankia.apimanager.model.RequestDTO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, RequestDTO> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, RequestDTO> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
package com.bankia.apimanager.controller;
import com.bankia.apimanager.model.RequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/infrastructure")
public class InfraStructureRequestController {
private final static Logger LOG = LoggerFactory.getLogger( InfraStructureRequestController.class );
private static final String TOPIC = "test";
@Autowired
private KafkaTemplate<String, RequestDTO> sender;
@RequestMapping(value = "/test", method = RequestMethod.GET)
public String postMessage(){
ListenableFuture<SendResult<String, RequestDTO>> future = sender.send(TOPIC, new RequestDTO("Hola","Paco"));
future.addCallback(new ListenableFutureCallback<SendResult<String, RequestDTO>>() {
@Override
public void onSuccess(SendResult<String, RequestDTO> result) {
LOG.info("Sent message with offset=[" result.getRecordMetadata().offset() "]");
}
@Override
public void onFailure(Throwable ex) {
LOG.error("Unable to send message due to : " ex.getMessage());
}
});
return "OK";
}
}
но что, если теперь я хочу отправить новый объект DTO? должен ли я объявлять новый KafkaTemplate<String,NEWOBJECT>
и автоматически подключать каждый шаблон kafka, объявленный в конфигурации для каждого объекта? есть другой способ иметь возможность просто объявить один KafkaTemplate, в котором я могу отправлять объекты любого типа и автоматически сериализовываться в JSON?
Ответ №1:
Я думаю, вы можете указать универсальный KafkaTemplate<String, Object>
и установить для сериализатора значений производителя значение JsonSerializer
следующим образом:
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Комментарии:
1. Не могли бы вы уточнить, что именно не работает? Спасибо!
Ответ №2:
Ссылка на ваш код:
- Сериализатор значений правильно определен как JsonSerializer, который преобразует объекты любого типа в JSON.
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
- Измените <String, RequestDto> на <String, Object> в каждом месте в KafkaConfig amp; Controller.
Имейте в виду, что дженерики сохраняются только до времени компиляции (удаление типа).
Ответ №3:
Существует два сценария:
Сценарий #1
Если вы хотите использовать KafkaTemplate
для отправки любого типа (как указано в вашем вопросе) в kafka, поэтому нет необходимости объявлять свой собственный KafkaTemplate
компонент, потому что Spring boot сделал это для вас в KafkaAutoConfiguration
.
package org.springframework.boot.autoconfigure.kafka;
...
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
public KafkaAutoConfiguration(KafkaProperties properties) {
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
}
**Some Note**
:
-
Этот класс конфигурации был помечен
@ConditionalOnClass(KafkaTemplate.class)
это означает: (из spring docs—>) @Conditional, который соответствует только тогда, когда указанные классы находятся в пути к классу. -
Метод компонента KafkaTemplate снабжен аннотацией
@ConditionalOnMissingBean(KafkaTemplate.class)
, что означает: (из spring docs —->) @Conditional, который соответствует только тогда, когда в BeanFactory уже не содержатся компоненты, соответствующие указанным требованиям. -
Важно! В чистом мире Java,
KafkaTemplate<?, ?>
не является подтипом, например:KafkaTemplate<String, RequestDTO>
поэтому вы не можете этого сделать:
KafkaTemplate<?, ?> kf1 = ...;
KafkaTemplate<String, RequestDTO> kf2 = kf1; // Compile time error
поскольку параметризованные типы Java инвариантны, как указано в действующем пункте 31 третьего издания Java. Но является ли spring world в порядке и будет ли он внедрен в ваш собственный сервис. Вам нужно только указать свой собственный универсальный тип в ваших свойствах KafkaTemplate.
Например:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate1;
@Autowired
private KafkaTemplate<Integer, RequestDTO> KafkaTemplate2;
}
Сценарий #2
Если вам нужно ограничить тип значения записи kafka, тогда вам нужно указать свой собственный компонент kafka примерно так:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(CorridorTracingConfiguration.class)
public class CorridorKafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, AbstractMessage> kafkaTemplate(ProducerFactory<Object, AbstractMessage> kafkaProducerFactory,
ProducerListener<Object, AbstractMessage> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, AbstractMessage> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
Теперь это может быть введено только в
KafkaTemplate<String, AbstractMessage> kafkaTemplate
, тип ключа может быть любым другим вместо String
. Но вы можете отправить любой подтип AbstractMessage
в kafka через него.
Пример использования:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<String, AbstractMessage> kafkaTemplate;
public void makeTrx(TrxRequest trxRequest) {
kafkaTemplate.send("fraud-request", trxRequest.fromAccountNumber(), new FraudRequest(trxRequest));
}
}
@Accessors(chain = true)
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class FraudRequest extends AbstractMessage {
private float amount;
private String fromAccountNumber;
private String toAccountNumber;
...
}
Чтобы ограничить ключ сообщения kafka, следуйте тому же (описанному выше) способу