#spring-boot #spring-kafka #spring-test
#spring-boot #spring-kafka #spring-тест
Вопрос:
У меня есть простой класс производителя, определенный следующим образом:
@Configuration
public class MyKafkaProducer {
private final static Logger log = LoggerFactory.getLogger(MyKafkaProducer.class);
@Value("${my.kafka.producer.topic}")
private String topic;
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
public void sendDataToKafka(@RequestParam String data) {
ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, data);
listenableFuture.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("Sent data {}", result.getProducerRecord().value());
}
@Override
public void onFailure(Throwable ex) {
log.error("Unable to send data {} due to: {}", data, ex.getMessage());
}
});
}
}
И вот тестовый класс в процессе выполнения:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyKafkaProducerTest {
private static final String TOPIC = "device";
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
private MyKafkaProducer producer;
BlockingQueue<ConsumerRecord<String, String>> records;
KafkaMessageListenerContainer<String, String> container;
@BeforeAll
void setUp() {
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(TOPIC);
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) records::add);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
@AfterAll
void tearDown() {
container.stop();
}
@Test
public void testIfWorks() throws InterruptedException {
// Arrange
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
// Act
producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{"event":"Test Event"}"));
producer.flush();
// Assert
ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
assertThat(singleRecord).isNotNull();
assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
assertThat(singleRecord.value()).isEqualTo("{"event":"Test Event"}");
}
Проблема в том, что тест создает производителя по умолчанию:
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
Как я могу использовать свой собственный producer, MyKafkaProducer
и вызвать его sendDataToKafka
метод? Как и что мы можем протестировать в этом случае?
Исходный код можно найти здесь. Здесь находится ветка с незавершенным тестированием. Спасибо.
Комментарии:
1. Вам нужно показать, как вы настраиваете
KafkaTemplate
и его фабрику-производителя в вашем producer. Суть в том, что вы должны принудить его использовать встроенные адреса брокера kafka, но мы не можем помочь с этим, если вы не покажете, как вы его настраиваете.2. Привет, Гэри, спасибо за ответ. Класс
MyKafkaProducer
такой, как он опубликован выше, не более того.3. Верно, но как насчет
KafkaTemplate
того, что подключено к нему; где это настроено?4. Я только что внес последние изменения в репозиторий GitHub и обновил сообщение с его ссылками.
5. @GaryRussell, что касается
KafkaTemplate
, нет, он просто вводится в класс producer с@Autowired
аннотацией. Или я чего-то не понимаю?
Ответ №1:
Итак, это приложение Spring Boot, и вы используете автоматическую настройку KafkaTemplate
.
Чтобы переопределить bootstrap-servers
использование встроенного брокера kafka, см. https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#kafka-testing-embeddedkafka-annotation
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
Затем вы можете вызвать своего производителя из тестового примера.
Комментарии:
1. Я добавил вышеупомянутую аннотацию, а также объявил
private MyKafkaProducer myKafkaProducer;
аннотированный с помощью@Autowired
. Затем создал простой тестовый метод, который проверяет это:assertTrue(myKafkaProducer != null);
. Это не удалось сCaused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.codeaches.kafka.basics.MyKafkaProducer' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}
2. Аннотация в следующем виде:
@EmbeddedKafka(topics = "device", bootstrapServersProperty = "spring.kafka.producer.bootstrap-servers")
. Тест завершается неудачей, даже если я удаляю новый метод тестирования и сохраняю автоматическоеprivate MyKafkaProducer myKafkaProducer;
объявление.3. Вам нужно добавить @SpringBootTest в свой тестовый пример. Или добавьте
@Bean
объявление для вашего производителя в тестовом контексте.4. да, большое спасибо, это сработало, и при вызове производителя
myKafkaProducer.sendDataToKafka("Hello D-1")
это отобразилосьSent data Hello D-1
в журнале консоли, ура!5. Вы загружаете из темы
device
—consumer: partitions assigned: [device-0, device-1]
, но похоже, что вы публикуете в теме с именемMy-Test-Topic
—Auto creation of topic My-Test-Topic with 2 partitions and replication factor 1 is successful
.