Как реализовать оператор ScanAsync с асинхронным аккумулятором в Rx.Net ?

#c# #asynchronous #system.reactive #state-machine #rx.net

#c# #асинхронный #system.reactive #конечный автомат #rx.net

Вопрос:

Scan Оператор в Rx.Net имеет подпись:

 public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator);
  

Аккумулятор является

 Func<TAccumulate, TSource, TAccumulate> accumulator
  

При попытке реализовать модель конечного автомата с асинхронным переходом состояния я обнаружил, что ScanAsync оператор со следующей подписью был бы полезен.

 public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator);
  

Аккумулятор имеет подпись

 Func<TAccumulate, TSource, Task<TAccumulate>> accumulator
  

Идеальным кодом приложения было бы что-то вроде этого (аналогично обычному Scan оператору, с разницей в использовании асинхронного аккумулятора).

 IObservable<TEvent> events;
IObservable<State> states = events.ScanAsync(
    initialState, 
    async (previousState, evt) => {
        var newState = await transitionAsync(previousState, evt);
        return newState;
    });
  

Кажется, MS развиваетсяAsyncRx.NET , однако он еще не выпущен (без расписания).


Связанные вещи:

Если моделировать асинхронный конечный автомат с помощью BehaviourSubject для состояний и подписаться на наблюдаемые события, как в следующем коде

 IObservable<TEvent> events;
BehaviourSubject<State> states = new BehaviourSubject<State>(initialState);
events.Subscribe(async e => {
    var newState = await transition(states.Value, e);
    states.OnNext(newState);
})
  

Я предполагаю, что в некоторых случаях могут быть условия гонки.

Я попытался реализовать это с

 IObservable<TS> ScanAsync<TS, TE>(
IObservable<TE> source,
Func<TS, TE, Task<TS>> reducer,
TS initialState)
{
    var states = from m in source.Take(1)
                    from nextState in reducer(initialState, m).ToObservable()
                    from s in ScanAsync(source.Skip(1), reducer, nextState)
                    select s;
    return Observable.Return(initialState).Concat(states);
}
  

Однако иногда это работает, иногда это просто блокируется, и я понятия не имею, что является причиной этого.

Ответ №1:

Вы могли бы использовать Scan оператор для создания промежуточного IObservable<Task<TAccumulate>> , который затем может быть сглажен с помощью Concat оператора:

 public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(
    this IObservable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, Task<TAccumulate>> accumulator)
{
    return source.Scan(Task.FromResult(seed), async (previousTask, item) =>
    {
        return await accumulator(await previousTask, item);
    }).Concat();
}
  

В приведенной выше реализации используется Concat перегрузка, которая принимает наблюдаемый набор задач вместо вложенного наблюдаемого:

 // Concatenates all task results, as long as the previous task terminated successfully.
public static IObservable<TSource> Concat<TSource>(
    this IObservable<Task<TSource>> sources);