Отсутствует встроенный кластер kafkacluster?

#apache-kafka #integration-testing #apache-kafka-streams

#apache-kafka #интеграция-тестирование #apache-kafka-streams

Вопрос:

Эта запись в блоге https://www.confluent.io/blog/stream-processing-part-2-testing-your-streaming-application / ссылается на класс EmbeddedKafkaCluster, который должен находиться в библиотеке kafka-streams-test-utils.

Однако этот класс отсутствует в библиотеке, например org.apache.кафка / kafka-streams-test-utils/2.5.1.

Я подумал, что могу использовать исходный код с github https://github.com/a0x8o/kafka/blob/master/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java

Но этот исходный код ссылается на некоторые классы, например, kafka.zk.EmbeddedZookeeper и kafka.utils.MockTime, который, как я предположил, должен быть в библиотеке, подобной org.apache.kafka / kafka_2.13/2.5.1. К сожалению, они также отсутствуют.

Каков наилучший способ настроить проект для использования EmbeddedKafkaCluster в этом случае?

Спасибо

Борис

Комментарии:

1. Этот репозиторий не является фактическим исходным кодом Kafka. Кроме того, используете ли вы Maven? Если это так, существуют классификаторы, которые добавляют тестовые классы

2. Действительно, я пробовал тестовые классификаторы — но это не помогло

3. Я знаю, что была JIRA об упрощении интеграции встроенных серверов. В противном случае широко используется spring-kafka. Или вы можете использовать docker? testcontainers.org/modules/kafka

4. Часть нашей разработки происходит в Windows, где docker не установлен (не может быть установлен). На данный момент мы рассмотрим github.com/salesforce/kafka-junit . Мы также будем использовать часть исходного кода, доступного для EmbeddedSingleNodeCluster, для интеграции реестра схемы.

5. Там это medium.com/bakdata/… Хотя, мне интересно, будут ли макетные объекты работать нормально для вас, а не для фактического запуска серверов

Ответ №1:

Добавьте следующие зависимости:

     //build.gradle

    testCompile group: 'junit', name: 'junit', version: '4.13'
    testCompile group: 'org.hamcrest', name: 'hamcrest-junit', version: '2.0.0.0'

    compile group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.7.0'
    testCompile group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.7.0', classifier:'test'
    compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.7.0'
    testCompile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.7.0', classifier: 'test'
    compile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version:  '2.7.0'
    testCompile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0', classifier: 'test'
  

Если вы используете Maven, преобразуйте все зависимости на основе этого кода:

 //pom.xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.7.0</version>
    <scope>test</scope>
    <classifier>test</classifier>
</dependency>
  

И создайте встроенный кластер kafkacluster следующим образом (пример Kotlin):

     @Test
    fun createEmbeddedKafkaClusterTest() {

        val NUM_BROKERS = 1
        val embeddedKafkaCluster = EmbeddedKafkaCluster(NUM_BROKERS)
        Assert.assertNotNull(embeddedKafkaCluster)

        embeddedKafkaCluster.start()

        embeddedKafkaCluster.createTopic("TestTopic")
    }