Можно ли контролировать время обработки входных данных с помощью scio JobTest?

#spotify-scio

#spotify-scio

Вопрос:

Мы используем com.spotify.scio.testing.JobTest для сквозного тестирования нашего конвейера scio. Конвейер включает в себя DoFn, чувствительный к последовательности данных, в потоке конфигурационных данных, которые поступают нечасто.

Мы передаем упорядоченный список значений конфигурации combinedSampleConfig в качестве входных данных в JobTest Builder. Есть ли способ заставить JobTest сохранить порядок этого пользовательского входного потока при выполнении сквозного теста?

Я вижу, что платформа тестирования позволяет точно контролировать время прибытия источника (использования advanceProcessingTime ) при тестировании отдельных компонентов, но не вижу, как применить это для сквозного тестирования с помощью JobTest.

     JobTest[MyApp.type]
      .args(commonArgs    Seq(
        "--numWorkers=1",
        "--maxNumWorkers=1",
      ): _*
      )
      .input(CustomIO[PubsubMessage](CONFIG_ID), combinedSampleConfig)
      .input(CustomIO[IndicatorEntry](INPUT_ID), sampleInput)
      .output(CustomIO[EnrichedIndicatorEntry](AGG_ID)) {
        _ should containInAnyOrder (expectedAggs)
      }
      .output(CustomIO[EnrichedIndicatorEntry](EVENT_ID)) {
        _ should containInAnyOrder (expectedEvents)
      }
      .run()
 

Комментарии:

1. Сейчас такого способа нет, но, вероятно, он выполним, если разрешить Beam TestStream (который поддерживает синхронизацию событий) в качестве входных данных. Я подал github.com/spotify/scio/issues/1891 .

Ответ №1:

https://github.com/spotify/scio/pull/1905

Этот PR был недавно объединен и должен допускать такой вариант использования. Можете ли вы попробовать?