#listener #message #kafka-consumer-api #spring-kafka #converters
#прослушиватель #Сообщение #kafka-consumer-api #spring-kafka #конвертеры
Вопрос:
Я создал прослушиватель Kafka с помощью моей фабрики контейнеров со следующим кодом. Я указываю пользовательский конвертер сообщений (HeaderMessageConverter в приведенном ниже коде) при настройке фабрики контейнеров. Когда мой слушатель создан, я даже могу проверить, что ему назначен пользовательский конвертер сообщений.
String prefix = topic propConstant;
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(getConsumerFactory(topic));
factory.setConcurrency(config.getInt(prefix "concurrency"));
factory.setBatchListener(config.getBoolean(prefix "batch.listener"));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
factory.setMessageConverter(new HeaderMessageConverter());
return factory;
public class HeaderMessageConverter implements BatchMessageConverter {
/**
* Convert a list of {@link ConsumerRecord} to a {@link Message}.
*
* @param records the records.
* @param acknowledgment the acknowledgment.
* @param payloadType the required payload type.
* @return the message.
*/
@Override
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment acknowledgment, Type payloadType) {
Map<String, Object> rawHeaders = new HashMap<>();
List<Object> payloads = new ArrayList<>();
List<Object> keys = new ArrayList<>();
List<String> topics = new ArrayList<>();
List<Map<String , Object>> customHeaders = new ArrayList<>();
List<Integer> partitions = new ArrayList<>();
List<Long> offsets = new ArrayList<>();
rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, keys);
rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topics);
rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partitions);
rawHeaders.put(KafkaHeaders.OFFSET, offsets);
rawHeaders.put("CUSTOM_HEADERS", customHeaders);
if (acknowledgment != null) {
rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
}
for (ConsumerRecord<?, ?> record : records) {
payloads.add(extractAndConvertValue(record, payloadType));
keys.add(record.key());
Map<String, Object> headerMap = new HashMap<>();
for (Header header : record.headers()) {
headerMap.put(header.key(), new String(header.value()));
}
topics.add(record.topic());
partitions.add(record.partition());
offsets.add(record.offset());
customHeaders.add(headerMap);
}
MessageHeaders kafkaMessageHeaders = new MessageHeaders(rawHeaders);
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
}
Однако, как только слушатель получает сообщение, вместо перехода к переопределенному методу toMessage HeaderMessageConverter он переходит к методу toMessage org.springframework.kafka.support.converter .MessagingMessageConverter.java предоставлено spring-kafka.
Я использую Spring Kafka 1.1.8. Есть идеи, почему мой пользовательский конвертер сообщений toMessage не вызывается?
Комментарии:
1. Spring Kafka 1.1.x не поддерживается уже несколько лет; даже 1.3.11 не будет поддерживаться после этого года. Текущая версия 2.6.4; если у вас все еще есть проблемы с поддерживаемой версией , опубликуйте полный проверяемый простой пример, который демонстрирует поведение, которое вы видите.
2. @GaryRussell У меня пока нет возможности обновить версию spring Kafka. Я также понял, почему мой пользовательский конвертер сообщений не вызывается и почему он использует MessagingMessageConverter. Есть ли способ предоставить свою собственную версию BatchMessagingMessageListenerAdapter ?
3. 1.1.x имеет очень сложную потоковую модель; по крайней мере, вы должны обновиться до версии 1.3.11, которая намного проще, благодаря KIP-62. Чтобы переопределить адаптер , вам нужно будет создать подкласс
MethodKafkaListenerEndpoint
и переопределитьcreateMessageListenerInstance()
. Чтобы использовать пользовательскую конечную точку, вам нужно было быKafkaListenerAnnotationBeanPostProcessor
подклассировать и переопределитьprocessKafkaListener()
(это основано на текущем состоянии кода; я не просматривал древние версии, чтобы узнать, было ли это возможно тогда).