Тестирование потока Spring cloud с помощью kafka stream binder: используя TopologyTestDriver, я получаю сообщение об ошибке «Класс отсутствует в доверенных пакетах»

#unit-testing #kotlin #spring-cloud-stream

#модульное тестирование #kotlin #spring-cloud-stream

Вопрос:

У меня есть этот простой потоковый процессор (не потребитель / производитель), использующий kafka streams binder.

 @Bean
fun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {
    return Function { input-> input.map { key, value ->
        println("nPAYLOAD KEY: ${key.name}n");
        println("nPAYLOAD value: ${value.address}n");
        val output = FooAddressPlus()
        output.address = value.address
        output.name = value.name
        output.plus = "$value.name-$value.address"
        KeyValue(key, output)
    }}
}
  

Я пытаюсь протестировать его с помощью TopologyTestDriver:

 @SpringBootTest(
        webEnvironment = SpringBootTest.WebEnvironment.NONE,
        classes = [Application::class, FooProcessor::class]
)
class FooProcessorTests {
    var testDriver: TopologyTestDriver? = null
    val INPUT_TOPIC = "input"
    val OUTPUT_TOPIC = "output"

    val inputKeySerde: Serde<FooName> = JsonSerde<FooName>()
    val inputValueSerde: Serde<FooAddress> = JsonSerde<FooAddress>()
    val outputKeySerde: Serde<FooName> = JsonSerde<FooName>()
    val outputValueSerde: Serde<FooAddressPlus> = JsonSerde<FooAddressPlus>()

    fun getStreamsConfiguration(): Properties? {
        val streamsConfiguration = Properties()
        streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "TopologyTestDriver"
        streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
        streamsConfiguration[JsonDeserializer.TRUSTED_PACKAGES] = "*"
        streamsConfiguration["spring.kafka.consumer.properties.spring.json.trusted.packages"] = "*"
        return streamsConfiguration
    }

    @Before
    fun setup() {
        val builder = StreamsBuilder()
        val input: KStream<FooName, FooAddress> = builder.stream(INPUT_TOPIC, Consumed.with(inputKeySerde, inputValueSerde))
        val processor = FooProcessor()
        val output: KStream<FooName, FooAddressPlus> = processor.processFoo().apply(input)
        output.to(OUTPUT_TOPIC, Produced.with(outputKeySerde, outputValueSerde))
        testDriver = TopologyTestDriver(builder.build(), getStreamsConfiguration())
    }

    @After
    fun tearDown() {
        try {
            testDriver!!.close()
        } catch (e: RuntimeException) {
            // https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when executed in Windows, ignoring it
            // Logged stacktrace cannot be avoided
            println("Ignoring exception, test failing in Windows due this exception:"   e.localizedMessage)
        }
    }

    @org.junit.Test
    fun testOne() {
        val inputTopic: TestInputTopic<FooName, FooAddress> =
                testDriver!!.createInputTopic(INPUT_TOPIC, inputKeySerde.serializer(), inputValueSerde.serializer())
        val key = FooName()
        key.name = "sherlock"
        val value = FooAddress()
        value.name = "sherlock"
        value.address = "Baker street"
        inputTopic.pipeInput(key, value)
        val outputTopic: TestOutputTopic<FooName, FooAddressPlus> =
                testDriver!!.createOutputTopic(OUTPUT_TOPIC, outputKeySerde.deserializer(), outputValueSerde.deserializer())
        val message = outputTopic.readValue()

        assertThat(message.name).isEqualTo(key.name)
        assertThat(message.address).isEqualTo(value.address)
    }
}
  

При его запуске я получаю эту ошибку в строке inputTopic.pipeInput(key, value)

«.FooAddress» отсутствует в доверенных пакетах: [java.util, java.lang]. Если вы считаете, что этот класс безопасен для десериализации, пожалуйста, укажите его имя. Если сериализация выполняется только надежным источником, вы также можете включить trust all () .*

Есть идеи о том, как это решить? Установка этих свойств getStreamsConfiguration() не помогает. Пожалуйста, обратите внимание, что это потоковый процессор, а не потребитель / производитель.

Большое спасибо!

Комментарии:

1. Большое спасибо @GaryRussell. Я проверю из моего другого вопроса. Кстати, спасибо за потрясающую платформу Spring cloud stream

2. К сожалению, я не смог заставить его работать, возможно, я делаю что-то не так, поэтому я был бы очень признателен за ваше руководство @GaryRussell В моем getStreamsConfiguration методе, который я добавил: streamsConfiguration["spring.cloud.streams.kafka.streams.binder.configuration.spring.json.trusted.packages"] = "*" но все же тест не прошел из-за is not in the trusted packages: [java.util, java.lang] . Спасибо!

3. Я разберусь, почему он не работает с тестовым драйвером — он ничего не знает о Spring, чтобы добавление свойства spring там не помогло.

4. Большое спасибо @GaryRussell. Я могу предоставить репозиторий git с кодом, если это поможет.

Ответ №1:

Когда Kafka создает сам Serde, он применяет свойства путем вызова configure() .

Поскольку вы сами создаете экземпляр Serde, вам нужно вызвать configure() его, передав карту свойств.

Именно так свойство trusted packages передается в десериализатор.

Или вы можете вызвать setTrustedPackages() десериализатор.

Комментарии:

1. БОЖЕ, это работает! Не могу сказать, насколько я благодарен. Большое спасибо.

2. Просто из любопытства: как мне вызвать setTrustedPackages() десериализатор? У меня нет к нему доступа, не так ли? У меня есть только те serdes, которые я создаю. Большое спасибо!

3. Есть конструктор, в котором вы можете предоставить предварительно настроенные (де) сериализаторы public JsonSerde(JsonSerializer<T> jsonSerializer, JsonDeserializer<T> jsonDeserializer) . Или вы можете преобразовать значение, возвращаемое с помощью deserializer() , в a JsonDeserializer<?> .

4. Большое спасибо за советы @GaryRussell, выполнил процедуру с ответами.

Ответ №2:

Итак, для полноты картины, вот как выглядит код при настройке serde, как предлагает @GaryRussell:

 private fun getStreamsConfiguration(): Properties? {
    // Don't set the trusted packages here since topology test driver does not know about Spring
    val streamsConfiguration = Properties()
    streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "TopologyTestDriver"
    streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
}
  
 @Before
fun setup() {
    val builder = StreamsBuilder()
    // Set the trusted packages for all serdes
    val config = mapOf<String, String>(JsonDeserializer.TRUSTED_PACKAGES to "*")
    inputKeySerde.configure(config, true)
    inputValueSerde.configure(config, false)
    outputKeySerde.configure(config, true)
    outputValueSerde.configure(config, false)
}
  

А остальная часть кода остается такой, как описано в вопросе. Вся заслуга @GaryRusell.