Не вызван пользовательский конвертер сообщений Kafka

#listener #message #kafka-consumer-api #spring-kafka #converters

#прослушиватель #Сообщение #kafka-consumer-api #spring-kafka #конвертеры

Вопрос:

Я создал прослушиватель Kafka с помощью моей фабрики контейнеров со следующим кодом. Я указываю пользовательский конвертер сообщений (HeaderMessageConverter в приведенном ниже коде) при настройке фабрики контейнеров. Когда мой слушатель создан, я даже могу проверить, что ему назначен пользовательский конвертер сообщений.

         String prefix = topic   propConstant;
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(getConsumerFactory(topic));
        factory.setConcurrency(config.getInt(prefix    "concurrency"));
        factory.setBatchListener(config.getBoolean(prefix   "batch.listener"));
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        factory.setMessageConverter(new HeaderMessageConverter());
        return factory;


public class HeaderMessageConverter implements BatchMessageConverter {
    
    /**
     * Convert a list of {@link ConsumerRecord} to a {@link Message}.
     *
     * @param records        the records.
     * @param acknowledgment the acknowledgment.
     * @param payloadType    the required payload type.
     * @return the message.
     */
    @Override
    public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment acknowledgment, Type payloadType) {

        Map<String, Object> rawHeaders = new HashMap<>();
        List<Object> payloads = new ArrayList<>();
        List<Object> keys = new ArrayList<>();
        List<String> topics = new ArrayList<>();
        List<Map<String , Object>> customHeaders = new ArrayList<>();
        List<Integer> partitions = new ArrayList<>();
        List<Long> offsets = new ArrayList<>();
        rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, keys);
        rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topics);
        rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partitions);
        rawHeaders.put(KafkaHeaders.OFFSET, offsets);
        rawHeaders.put("CUSTOM_HEADERS", customHeaders);

        if (acknowledgment != null) {
            rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
        }
        
        for (ConsumerRecord<?, ?> record : records) {
            payloads.add(extractAndConvertValue(record, payloadType));
            keys.add(record.key());
            Map<String, Object> headerMap = new HashMap<>();
            for (Header header : record.headers()) {
                headerMap.put(header.key(), new String(header.value()));
            }
            topics.add(record.topic());
            partitions.add(record.partition());
            offsets.add(record.offset());
            customHeaders.add(headerMap);
        }
        MessageHeaders kafkaMessageHeaders = new MessageHeaders(rawHeaders);
        return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
    }
 

Однако, как только слушатель получает сообщение, вместо перехода к переопределенному методу toMessage HeaderMessageConverter он переходит к методу toMessage org.springframework.kafka.support.converter .MessagingMessageConverter.java предоставлено spring-kafka.

Я использую Spring Kafka 1.1.8. Есть идеи, почему мой пользовательский конвертер сообщений toMessage не вызывается?

Комментарии:

1. Spring Kafka 1.1.x не поддерживается уже несколько лет; даже 1.3.11 не будет поддерживаться после этого года. Текущая версия 2.6.4; если у вас все еще есть проблемы с поддерживаемой версией , опубликуйте полный проверяемый простой пример, который демонстрирует поведение, которое вы видите.

2. @GaryRussell У меня пока нет возможности обновить версию spring Kafka. Я также понял, почему мой пользовательский конвертер сообщений не вызывается и почему он использует MessagingMessageConverter. Есть ли способ предоставить свою собственную версию BatchMessagingMessageListenerAdapter ?

3. 1.1.x имеет очень сложную потоковую модель; по крайней мере, вы должны обновиться до версии 1.3.11, которая намного проще, благодаря KIP-62. Чтобы переопределить адаптер , вам нужно будет создать подкласс MethodKafkaListenerEndpoint и переопределить createMessageListenerInstance() . Чтобы использовать пользовательскую конечную точку, вам нужно было бы KafkaListenerAnnotationBeanPostProcessor подклассировать и переопределить processKafkaListener() (это основано на текущем состоянии кода; я не просматривал древние версии, чтобы узнать, было ли это возможно тогда).