#apache-kafka #spring-kafka
Question:
I can't reproduce the documentation or sample code in order to serialize a non-String key.
My goal is to use the key (a field) to carry control actions alongside the data.
The ControlChannel and SchedulerEntry classes are plain POJOs.
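(For context, a minimal sketch of what such POJOs might look like; the fields below are purely illustrative, since the question does not show the real classes.)
// Illustrative only: the actual ControlChannel / SchedulerEntry fields are not shown in the question.
public class ControlChannel {
    private String action; // e.g. a control verb the consumer should act on
    public String getAction() { return this.action; }
    public void setAction(String action) { this.action = action; }
}
public class SchedulerEntry {
    private String name;
    public String getName() { return this.name; }
    public void setName(String name) { this.name = name; }
}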
Environment:
- Java 11
- Spring Boot 2.4.1
- Kafka 2.6.0
Expected serialization/deserialization code:
Listener and template
@KafkaListener(topics = "Scheduler", groupId = "scheduler", containerFactory = "schedulerKafkaListenerContainerFactory")
public void listenForScheduler(
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ControlChannel control,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
        @Payload SchedulerEntry entry) {
    log.info("received data KEY ='{}'", control);
    log.info("received data PAYLOAD = '{}'", entry);
    /* ... */
}

@Bean
public KafkaTemplate<ControlChannel, SchedulerEntry> schedulerKafkaTemplate() {
    return new KafkaTemplate<>(schedulerProducerFactory());
}
**First attempt: consumer and producer (type mapping and trusted packages)**
@Bean
public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    props.put(JsonSerializer.TYPE_MAPPINGS, "key:io.infolayer.aida.ControlChannel, value:io.infolayer.aida.entity.SchedulerEntry");
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(props,
            new JsonSerializer<ControlChannel>(),
            new JsonSerializer<SchedulerEntry>());
}

public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(JsonDeserializer.TYPE_MAPPINGS, "key:io.infolayer.aida.ControlChannel, value:io.infolayer.aida.entity.SchedulerEntry");
    JsonDeserializer<ControlChannel> k = new JsonDeserializer<ControlChannel>();
    k.configure(props, true);
    JsonDeserializer<SchedulerEntry> v = new JsonDeserializer<SchedulerEntry>();
    k.configure(props, true);
    return new DefaultKafkaConsumerFactory<>(props, k, v);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory("scheduler"));
    return factory;
}
Exception:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Scheduler-0 at offset 25. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
**Second attempt: consumer and producer (just setting the key serializer/deserializer to Json)**
@Bean
public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}

public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props, new JsonDeserializer<>(ControlChannel.class), new JsonDeserializer<>(SchedulerEntry.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory("scheduler"));
    return factory;
}
Exception:
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException:
Listener method 'public void io.infolayer.aida.scheduler.KafkaSchedulerListener.listenForScheduler(io.infolayer.aida.ControlChannel,long,io.infolayer.aida.entity.SchedulerEntry)'
threw exception; nested exception is org.springframework.core.convert.ConverterNotFoundException:
No converter found capable of converting from type [io.infolayer.aida.entity.SchedulerEntry] to type [@org.springframework.messaging.handler.annotation.Header io.infolayer.aida.ControlChannel]; nested exception is org.springframework.core.convert.ConverterNotFoundException:
No converter found capable of converting from type [io.infolayer.aida.entity.SchedulerEntry] to type [@org.springframework.messaging.handler.annotation.Header io.infolayer.aida.ControlChannel]
Answer #1:
There are several problems with your first attempt:
- you need to call configure() on the serializers with add type info = true
- you call configure() on k twice and never configure v (the deserializers)
This works as expected...
@SpringBootApplication
public class So65501295Application {

    private static final Logger log = LoggerFactory.getLogger(So65501295Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So65501295Application.class, args);
    }

    @Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
        props.put(JsonSerializer.TYPE_MAPPINGS,
                "key:com.example.demo.So65501295Application.ControlChannel, "
                + "value:com.example.demo.So65501295Application.SchedulerEntry");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        JsonSerializer<ControlChannel> k = new JsonSerializer<ControlChannel>();
        k.configure(props, true);
        JsonSerializer<SchedulerEntry> v = new JsonSerializer<SchedulerEntry>();
        v.configure(props, false);
        return new DefaultKafkaProducerFactory<>(props, k, v);
    }

    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS,
                "key:com.example.demo.So65501295Application.ControlChannel, "
                + "value:com.example.demo.So65501295Application.SchedulerEntry");
        JsonDeserializer<ControlChannel> k = new JsonDeserializer<ControlChannel>();
        k.configure(props, true);
        JsonDeserializer<SchedulerEntry> v = new JsonDeserializer<SchedulerEntry>();
        v.configure(props, false);
        return new DefaultKafkaConsumerFactory<>(props, k, v);
    }

    @KafkaListener(topics = "Scheduler", groupId = "scheduler", containerFactory = "schedulerKafkaListenerContainerFactory")
    public void listenForScheduler(
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ControlChannel control,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            @Payload SchedulerEntry entry) {
        log.info("received data KEY ='{}'", control);
        log.info("received data PAYLOAD = '{}'", entry);
        /* ... */
    }

    @Bean
    public KafkaTemplate<ControlChannel, SchedulerEntry> schedulerKafkaTemplate() {
        return new KafkaTemplate<>(schedulerProducerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("scheduler"));
        return factory;
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<ControlChannel, SchedulerEntry> template) {
        return args -> {
            template.send("Scheduler", new ControlChannel(), new SchedulerEntry());
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("Scheduler").partitions(1).replicas(1).build();
    }

    public static class ControlChannel {

        String foo;

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

    }

    public static class SchedulerEntry {

        String foo;

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

    }

}
2021-01-04 11:42:25.026  INFO 23905 --- [ntainer#0-0-C-1] com.example.demo.So65501295Application   : received data KEY ='com.example.demo.So65501295Application$ControlChannel@44a72886'
2021-01-04 11:42:25.026  INFO 23905 --- [ntainer#0-0-C-1] com.example.demo.So65501295Application   : received data PAYLOAD = 'com.example.demo.So65501295Application$SchedulerEntry@74461c59'
Comments:
1. Thanks! How can I parse the key into a user-defined object when the Kafka message received by the listener has no type header?
2. Please don't ask new questions in comments on old answers. You can configure the default type with properties or programmatically; see docs.spring.io/spring-kafka/docs/current/reference/html/… and the spring.json.key.default.type property.
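For illustration, a minimal sketch of that approach (the method name is made up for this example; the constants are spring-kafka's JsonDeserializer property constants, and the class names are the ones from the question): setting default key/value types lets the JsonDeserializer work even when the record carries no type-info headers.
// Hedged sketch, not code from this thread: rely on default types instead of type headers.
public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactoryWithDefaultTypes(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    // These constants resolve to "spring.json.key.default.type" and "spring.json.value.default.type".
    props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "io.infolayer.aida.ControlChannel");
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "io.infolayer.aida.entity.SchedulerEntry");
    // Ignore any type headers and always deserialize into the defaults above.
    props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
    return new DefaultKafkaConsumerFactory<>(props);
}
With Spring Boot's auto-configured consumer, the same thing can be expressed in application properties, e.g. spring.kafka.consumer.properties.spring.json.key.default.type=io.infolayer.aida.ControlChannel (again an illustration of the property from the docs, not code from this thread).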