Протестируйте шаблон потребителя и производителя реактивной Кафки, используя встроенную сериализацию kafka custom

#spring-kafka #reactive-kafka

Вопрос:

Нам нужен пример того, как тестировать ReactiveKafkaConsumerTemplate и ReactiveKafkaProducerTemplate с помощью an embedded-kafka-broker . Спасибо.

ПРАВИЛЬНЫЙ КОД НАХОДИТСЯ ЗДЕСЬ ПОСЛЕ ОБСУЖДЕНИЯ

Вы можете иметь свои de-serializer собственные настройки, чтобы использовать пользовательские ReactiveKafkaConsumerTemplate

Пользовательский Сериализатор:

 
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EmployeeSerializer implements Serializer<Employee> {

    @Override
    public byte[] serialize(String topic, Employee data) {
        
        byte[] rb = null;
        ObjectMapper mapper = new ObjectMapper();
        try {
            rb = mapper.writeValueAsString(data).getBytes();
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return rb;
    }

}
 

Используйте его как часть встроенного-kfka-реактивного теста:

 
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.json.JsonSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.test.StepVerifier;

@EmbeddedKafka(topics = EmbeddedKafkareactiveTest.REACTIVE_INT_KEY_TOPIC,
brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
public class EmbeddedKafkareactiveTest {

    public static final String REACTIVE_INT_KEY_TOPIC = "reactive_int_key_topic";

    private static final Integer DEFAULT_KEY = 1;

    private static final String DEFAULT_VERIFY_TIMEOUT = null;

    private ReactiveKafkaProducerTemplate<Integer, Employee> reactiveKafkaProducerTemplate;

    @BeforeEach
    public void setUp() {
        reactiveKafkaProducerTemplate = new ReactiveKafkaProducerTemplate<>(setupSenderOptionsWithDefaultTopic(),
                new MessagingMessageConverter());
    }

    private SenderOptions<Integer, Employee> setupSenderOptionsWithDefaultTopic() {
        Map<String, Object> senderProps = KafkaTestUtils
                .producerProps(EmbeddedKafkaCondition.getBroker().getBrokersAsString());
        SenderOptions<Integer, Employee> senderOptions = SenderOptions.create(senderProps);
        senderOptions = senderOptions.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "reactive.transaction")
                .producerProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
                .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName())
                ;
        return senderOptions;
    }

    @Test
    public void test_When_Publish() {
        
        
        Employee employee = new Employee();
        
        ProducerRecord<Integer, Employee> producerRecord = new ProducerRecord<Integer, Employee>(REACTIVE_INT_KEY_TOPIC, DEFAULT_KEY, employee);
                
        StepVerifier.create(reactiveKafkaProducerTemplate.send(producerRecord)
                .then())
                .expectComplete()
                .verify();
    }   

    @AfterEach
    public void tearDown() {
        reactiveKafkaProducerTemplate.close();
    }
}
 

Ответ №1:

Тесты в рамках используют встроенный брокер кафки.

https://github.com/spring-projects/spring-kafka/tree/main/spring-kafka/src/test/java/org/springframework/kafka/core/reactive

 @EmbeddedKafka(topics = ReactiveKafkaProducerTemplateIntegrationTests.REACTIVE_INT_KEY_TOPIC, partitions = 2)
public class ReactiveKafkaProducerTemplateIntegrationTests {
...
 

Комментарии:

1. Не помещайте код в комментарии; лучше отредактировать вопрос и прокомментировать, что вы это сделали. Мне нужно увидеть полный тест и фактическое исключение.

2. Спасибо. Я добавил.

3. Пожалуйста, ознакомьтесь с моими правками для правильной уценки для форматирования кода. setupSenderOptionsWithDefaultTopic() — если вы скопировали это из тестов фреймворка, то да, он устанавливает IntegerSerializer значение для ключа и SrtringSerializer значения — вам нужен сериализатор значений, который может обрабатывать ваш Employee объект, например JsonSerializer .

4. «Не работает» не передает никакой полезной информации. Если вы имеете в виду, что по-прежнему получаете ту же ошибку, это означает, что вы неправильно настраиваете свойство. Если он установлен правильно, он будет работать. Я не могу сказать из фрагмента кода, что вы делаете. Добавьте полный тест, а не только фрагменты.

5. Это не та же проблема, это другая ошибка. Есть две проблемы. 1. Вы используете неправильное JsonSerializer ; так и должно быть import org.springframework.kafka.support.serializer.JsonSerializer; . 2. Вы создаете транзакционного производителя и используете нетранзакционную отправку. Ваш тест проходит с нетранзакционным производителем и правильным сериализатором.

Ответ №2:

добавлена правильная сериализация с нетранзакционным производителем. пожалуйста, ознакомьтесь с кодом в верхней части этой страницы для получения ответа.