#spotify-scio
#spotify-scio
Вопрос:
Мы используем com.spotify.scio.testing.JobTest для сквозного тестирования нашего конвейера scio. Конвейер включает в себя DoFn, чувствительный к последовательности данных, в потоке конфигурационных данных, которые поступают нечасто.
Мы передаем упорядоченный список значений конфигурации combinedSampleConfig
в качестве входных данных в JobTest Builder. Есть ли способ заставить JobTest сохранить порядок этого пользовательского входного потока при выполнении сквозного теста?
Я вижу, что платформа тестирования позволяет точно контролировать время прибытия источника (использования advanceProcessingTime
) при тестировании отдельных компонентов, но не вижу, как применить это для сквозного тестирования с помощью JobTest.
JobTest[MyApp.type]
.args(commonArgs Seq(
"--numWorkers=1",
"--maxNumWorkers=1",
): _*
)
.input(CustomIO[PubsubMessage](CONFIG_ID), combinedSampleConfig)
.input(CustomIO[IndicatorEntry](INPUT_ID), sampleInput)
.output(CustomIO[EnrichedIndicatorEntry](AGG_ID)) {
_ should containInAnyOrder (expectedAggs)
}
.output(CustomIO[EnrichedIndicatorEntry](EVENT_ID)) {
_ should containInAnyOrder (expectedEvents)
}
.run()
Комментарии:
1. Сейчас такого способа нет, но, вероятно, он выполним, если разрешить Beam
TestStream
(который поддерживает синхронизацию событий) в качестве входных данных. Я подал github.com/spotify/scio/issues/1891 .
Ответ №1:
https://github.com/spotify/scio/pull/1905
Этот PR был недавно объединен и должен допускать такой вариант использования. Можете ли вы попробовать?