Модульное тестирование RX-последовательности, в которой один из потоков задерживается, а другой может включать медленную работу

#c# #unit-testing #system.reactive

#c# #модульное тестирование #system.reactive

Вопрос:

Представьте простой стек обработки, в котором мы хотим гарантировать, что каждое входящее сообщение должно обрабатываться в рамках определенного SLA. Если это не так, мы хотим отправить отправителю ответ об ошибке. Должен быть отправлен только один ответ — успех или неудача, в зависимости от того, что наступит раньше. Моделирование этого кажется простым.

 public class RxTests : ReactiveTest
{
    [Test]
    public void TestSlowProcessing()
    {
        var scheduler = new TestScheduler();
        var obs = scheduler.CreateHotObservable(OnNext(100, 1), OnNext(200, 2));
        var processed = obs
        // this is the main processing which may in some cases take a long time
/*
        This code works as expected
            .SelectMany(
        val => val % 2 == 0 ? Observable.Never<int>() : Observable.Return(val)); 
*/
        // this fails under test I guess because Sleep does not "yield" to the delayed observable and the main path processing ticks first
        .Select(val => 
        {
            if (val % 2 == 0) 
            {
                // how can I waste time here and let delayed observable tick?
                scheduler.Sleep(10000); 
            }
            return val;
        });
        var delayed = obs.Delay(TimeSpan.FromTicks(1000), scheduler);
        var merged = processed.Merge(delayed).Distinct(_ => _);
        var observer = scheduler.Start(() => merged, 0, 1, 1000000);
        // We expect the second output message to come from the delayed feed.
        Assert.IsTrue(observer.Messages[1].Time == 1200);
    }
}
  

В реальном коде используется Select, поскольку нам просто нужно преобразовать входящее сообщение, используя некоторые потенциально медленные вычисления. Но я не могу найти способ эмулировать это замедление в модульном тестировании. Ждать, await и т.д. все приведет к взаимоблокировке. Есть идеи?