#unit-testing #kotlin #spring-cloud-stream
#модульное тестирование #kotlin #spring-cloud-stream
Вопрос:
У меня есть этот простой потоковый процессор (не потребитель / производитель), использующий kafka streams binder.
@Bean
fun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {
return Function { input-> input.map { key, value ->
println("nPAYLOAD KEY: ${key.name}n");
println("nPAYLOAD value: ${value.address}n");
val output = FooAddressPlus()
output.address = value.address
output.name = value.name
output.plus = "$value.name-$value.address"
KeyValue(key, output)
}}
}
Я пытаюсь протестировать его с помощью TopologyTestDriver:
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.NONE,
classes = [Application::class, FooProcessor::class]
)
class FooProcessorTests {
var testDriver: TopologyTestDriver? = null
val INPUT_TOPIC = "input"
val OUTPUT_TOPIC = "output"
val inputKeySerde: Serde<FooName> = JsonSerde<FooName>()
val inputValueSerde: Serde<FooAddress> = JsonSerde<FooAddress>()
val outputKeySerde: Serde<FooName> = JsonSerde<FooName>()
val outputValueSerde: Serde<FooAddressPlus> = JsonSerde<FooAddressPlus>()
fun getStreamsConfiguration(): Properties? {
val streamsConfiguration = Properties()
streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "TopologyTestDriver"
streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
streamsConfiguration[JsonDeserializer.TRUSTED_PACKAGES] = "*"
streamsConfiguration["spring.kafka.consumer.properties.spring.json.trusted.packages"] = "*"
return streamsConfiguration
}
@Before
fun setup() {
val builder = StreamsBuilder()
val input: KStream<FooName, FooAddress> = builder.stream(INPUT_TOPIC, Consumed.with(inputKeySerde, inputValueSerde))
val processor = FooProcessor()
val output: KStream<FooName, FooAddressPlus> = processor.processFoo().apply(input)
output.to(OUTPUT_TOPIC, Produced.with(outputKeySerde, outputValueSerde))
testDriver = TopologyTestDriver(builder.build(), getStreamsConfiguration())
}
@After
fun tearDown() {
try {
testDriver!!.close()
} catch (e: RuntimeException) {
// https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when executed in Windows, ignoring it
// Logged stacktrace cannot be avoided
println("Ignoring exception, test failing in Windows due this exception:" e.localizedMessage)
}
}
@org.junit.Test
fun testOne() {
val inputTopic: TestInputTopic<FooName, FooAddress> =
testDriver!!.createInputTopic(INPUT_TOPIC, inputKeySerde.serializer(), inputValueSerde.serializer())
val key = FooName()
key.name = "sherlock"
val value = FooAddress()
value.name = "sherlock"
value.address = "Baker street"
inputTopic.pipeInput(key, value)
val outputTopic: TestOutputTopic<FooName, FooAddressPlus> =
testDriver!!.createOutputTopic(OUTPUT_TOPIC, outputKeySerde.deserializer(), outputValueSerde.deserializer())
val message = outputTopic.readValue()
assertThat(message.name).isEqualTo(key.name)
assertThat(message.address).isEqualTo(value.address)
}
}
При его запуске я получаю эту ошибку в строке inputTopic.pipeInput(key, value)
«.FooAddress» отсутствует в доверенных пакетах: [java.util, java.lang]. Если вы считаете, что этот класс безопасен для десериализации, пожалуйста, укажите его имя. Если сериализация выполняется только надежным источником, вы также можете включить trust all () .*
Есть идеи о том, как это решить? Установка этих свойств getStreamsConfiguration()
не помогает. Пожалуйста, обратите внимание, что это потоковый процессор, а не потребитель / производитель.
Большое спасибо!
Комментарии:
1. Большое спасибо @GaryRussell. Я проверю из моего другого вопроса. Кстати, спасибо за потрясающую платформу Spring cloud stream
2. К сожалению, я не смог заставить его работать, возможно, я делаю что-то не так, поэтому я был бы очень признателен за ваше руководство @GaryRussell В моем
getStreamsConfiguration
методе, который я добавил:streamsConfiguration["spring.cloud.streams.kafka.streams.binder.configuration.spring.json.trusted.packages"] = "*"
но все же тест не прошел из-заis not in the trusted packages: [java.util, java.lang]
. Спасибо!3. Я разберусь, почему он не работает с тестовым драйвером — он ничего не знает о Spring, чтобы добавление свойства spring там не помогло.
4. Большое спасибо @GaryRussell. Я могу предоставить репозиторий git с кодом, если это поможет.
Ответ №1:
Когда Kafka создает сам Serde, он применяет свойства путем вызова configure()
.
Поскольку вы сами создаете экземпляр Serde, вам нужно вызвать configure()
его, передав карту свойств.
Именно так свойство trusted packages передается в десериализатор.
Или вы можете вызвать setTrustedPackages()
десериализатор.
Комментарии:
1. БОЖЕ, это работает! Не могу сказать, насколько я благодарен. Большое спасибо.
2. Просто из любопытства: как мне вызвать
setTrustedPackages()
десериализатор? У меня нет к нему доступа, не так ли? У меня есть только те serdes, которые я создаю. Большое спасибо!3. Есть конструктор, в котором вы можете предоставить предварительно настроенные (де) сериализаторы
public JsonSerde(JsonSerializer<T> jsonSerializer, JsonDeserializer<T> jsonDeserializer)
. Или вы можете преобразовать значение, возвращаемое с помощьюdeserializer()
, в aJsonDeserializer<?>
.4. Большое спасибо за советы @GaryRussell, выполнил процедуру с ответами.
Ответ №2:
Итак, для полноты картины, вот как выглядит код при настройке serde, как предлагает @GaryRussell:
private fun getStreamsConfiguration(): Properties? {
// Don't set the trusted packages here since topology test driver does not know about Spring
val streamsConfiguration = Properties()
streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "TopologyTestDriver"
streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
}
@Before
fun setup() {
val builder = StreamsBuilder()
// Set the trusted packages for all serdes
val config = mapOf<String, String>(JsonDeserializer.TRUSTED_PACKAGES to "*")
inputKeySerde.configure(config, true)
inputValueSerde.configure(config, false)
outputKeySerde.configure(config, true)
outputValueSerde.configure(config, false)
}
А остальная часть кода остается такой, как описано в вопросе. Вся заслуга @GaryRusell.