#apache-kafka #integration-testing #apache-kafka-streams
#apache-kafka #интеграция-тестирование #apache-kafka-streams
Вопрос:
Эта запись в блоге https://www.confluent.io/blog/stream-processing-part-2-testing-your-streaming-application / ссылается на класс EmbeddedKafkaCluster, который должен находиться в библиотеке kafka-streams-test-utils.
Однако этот класс отсутствует в библиотеке, например org.apache.кафка / kafka-streams-test-utils/2.5.1.
Я подумал, что могу использовать исходный код с github https://github.com/a0x8o/kafka/blob/master/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
Но этот исходный код ссылается на некоторые классы, например, kafka.zk.EmbeddedZookeeper и kafka.utils.MockTime, который, как я предположил, должен быть в библиотеке, подобной org.apache.kafka / kafka_2.13/2.5.1. К сожалению, они также отсутствуют.
Каков наилучший способ настроить проект для использования EmbeddedKafkaCluster в этом случае?
Спасибо
Борис
Комментарии:
1. Этот репозиторий не является фактическим исходным кодом Kafka. Кроме того, используете ли вы Maven? Если это так, существуют классификаторы, которые добавляют тестовые классы
2. Действительно, я пробовал тестовые классификаторы — но это не помогло
3. Я знаю, что была JIRA об упрощении интеграции встроенных серверов. В противном случае широко используется spring-kafka. Или вы можете использовать docker? testcontainers.org/modules/kafka
4. Часть нашей разработки происходит в Windows, где docker не установлен (не может быть установлен). На данный момент мы рассмотрим github.com/salesforce/kafka-junit . Мы также будем использовать часть исходного кода, доступного для EmbeddedSingleNodeCluster, для интеграции реестра схемы.
5. Там это medium.com/bakdata/… Хотя, мне интересно, будут ли макетные объекты работать нормально для вас, а не для фактического запуска серверов
Ответ №1:
Добавьте следующие зависимости:
//build.gradle
testCompile group: 'junit', name: 'junit', version: '4.13'
testCompile group: 'org.hamcrest', name: 'hamcrest-junit', version: '2.0.0.0'
compile group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.7.0'
testCompile group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.7.0', classifier:'test'
compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.7.0'
testCompile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.7.0', classifier: 'test'
compile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: '2.7.0'
testCompile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0', classifier: 'test'
Если вы используете Maven, преобразуйте все зависимости на основе этого кода:
//pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.7.0</version>
<scope>test</scope>
<classifier>test</classifier>
</dependency>
И создайте встроенный кластер kafkacluster следующим образом (пример Kotlin):
@Test
fun createEmbeddedKafkaClusterTest() {
val NUM_BROKERS = 1
val embeddedKafkaCluster = EmbeddedKafkaCluster(NUM_BROKERS)
Assert.assertNotNull(embeddedKafkaCluster)
embeddedKafkaCluster.start()
embeddedKafkaCluster.createTopic("TestTopic")
}