Тестирование интеграции Apache Kafka в приложении Spring Boot с JUnit 5 и встроенным Kafkabroker

#spring-boot #spring-kafka #spring-test

#spring-boot #spring-kafka #spring-тест

Вопрос:

У меня есть простой класс производителя, определенный следующим образом:

 @Configuration
public class MyKafkaProducer {

    private final static Logger log = LoggerFactory.getLogger(MyKafkaProducer.class);

    @Value("${my.kafka.producer.topic}")
    private String topic;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    public void sendDataToKafka(@RequestParam String data) {

        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, data);

        listenableFuture.addCallback(new ListenableFutureCallback<>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Sent data {}", result.getProducerRecord().value());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send data {} due to: {}", data, ex.getMessage());
            }
        });
    }
}
  

И вот тестовый класс в процессе выполнения:

 @EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyKafkaProducerTest {

    private static final String TOPIC = "device";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private MyKafkaProducer producer;

    BlockingQueue<ConsumerRecord<String, String>> records;

    KafkaMessageListenerContainer<String, String> container;

    @BeforeAll
    void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }

    @Test
    public void testIfWorks() throws InterruptedException {
        // Arrange
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

        // Act
        producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{"event":"Test Event"}"));
        producer.flush();

        // Assert
        ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
        assertThat(singleRecord.value()).isEqualTo("{"event":"Test Event"}");
    }
  

Проблема в том, что тест создает производителя по умолчанию:

 Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
  

Как я могу использовать свой собственный producer, MyKafkaProducer и вызвать его sendDataToKafka метод? Как и что мы можем протестировать в этом случае?

Исходный код можно найти здесь. Здесь находится ветка с незавершенным тестированием. Спасибо.

Комментарии:

1. Вам нужно показать, как вы настраиваете KafkaTemplate и его фабрику-производителя в вашем producer. Суть в том, что вы должны принудить его использовать встроенные адреса брокера kafka, но мы не можем помочь с этим, если вы не покажете, как вы его настраиваете.

2. Привет, Гэри, спасибо за ответ. Класс MyKafkaProducer такой, как он опубликован выше, не более того.

3. Верно, но как насчет KafkaTemplate того, что подключено к нему; где это настроено?

4. Я только что внес последние изменения в репозиторий GitHub и обновил сообщение с его ссылками.

5. @GaryRussell, что касается KafkaTemplate , нет, он просто вводится в класс producer с @Autowired аннотацией. Или я чего-то не понимаю?

Ответ №1:

Итак, это приложение Spring Boot, и вы используете автоматическую настройку KafkaTemplate .

Чтобы переопределить bootstrap-servers использование встроенного брокера kafka, см. https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#kafka-testing-embeddedkafka-annotation

 @EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  

Затем вы можете вызвать своего производителя из тестового примера.

Комментарии:

1. Я добавил вышеупомянутую аннотацию, а также объявил private MyKafkaProducer myKafkaProducer; аннотированный с помощью @Autowired . Затем создал простой тестовый метод, который проверяет это: assertTrue(myKafkaProducer != null); . Это не удалось с Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.codeaches.kafka.basics.MyKafkaProducer' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}

2. Аннотация в следующем виде: @EmbeddedKafka(topics = "device", bootstrapServersProperty = "spring.kafka.producer.bootstrap-servers") . Тест завершается неудачей, даже если я удаляю новый метод тестирования и сохраняю автоматическое private MyKafkaProducer myKafkaProducer; объявление.

3. Вам нужно добавить @SpringBootTest в свой тестовый пример. Или добавьте @Bean объявление для вашего производителя в тестовом контексте.

4. да, большое спасибо, это сработало, и при вызове производителя myKafkaProducer.sendDataToKafka("Hello D-1") это отобразилось Sent data Hello D-1 в журнале консоли, ура!

5. Вы загружаете из темы device consumer: partitions assigned: [device-0, device-1] , но похоже, что вы публикуете в теме с именем My-Test-Topic Auto creation of topic My-Test-Topic with 2 partitions and replication factor 1 is successful .