Kotlin flatMapMerge непоследователен при многократном объединении одного и того же потока

#kotlin #concurrency #kotlin-flow #kotlin-coroutines

Вопрос:

У меня есть приложение, которое flatMapMerge использует общий поток A для создания потока B. Затем я объединяю поток A с потоком B, чтобы получить конечный результат, который я collectLatest . Иногда он выводит все результаты. Однако в других случаях он пропускает часть выходных данных. Вот упрощенный пример:

 val flowA = MutableSharedFlow<Int>(replay = Int.MAX_VALUE)
val flowB = flowA.flatMapMerge {
    // In the real app, this flow is created in a complex way
    flow {
        emit(it   2)
        emit(it   3)
        emit(it   4)
        emit(it   5)
        emit(it   6)
        emit(it   7)
        emit(it   8)
        emit(it   9)
    }
}
flowA.emit(1)
flowA.combine(flowB, ::Pair).collect { (a, b) ->
    // I need access to the value of flowA for calculations here
    Log.d(TAG, "result A: $a, B: $b")
}
 

1-й прогон:

 result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 9
result A: 1, B: 10
 

2-й прогон:

 result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 10
 

Ожидаемый результат:

 result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10
 

Я также попытался настроить повтор, но это не возымело никакого эффекта. Я тоже попробовал collectLatest вместо collect , но тоже без изменений.

Должен ли я использовать что-то другое, кроме flatMapMerge ? Я пытался flatMapLatest , но это приводит только к первому и последнему результату. Должно MutableSharedFlow ли это быть что-то другое?

Как мне каждый раз выводить все результаты?

Ответ №1:

Проблема здесь в функции объединения

согласно официальной документации, функция объединения:

Возвращает поток, значения которого генерируются с помощью функции преобразования путем объединения последних значений, выданных каждым потоком.

Если функция преобразования combine() имеет внутри себя некоторые сложные вычисления, возможно, что функция combine() выдает не все результаты.

Таким образом, для получения всех результатов приведенная ниже стратегия отлично работает.

 val flowA = MutableSharedFlow<Int>()
        .onStart { emit(1) }

    val flowB = flowA.flatMapMerge {
        flow {
            for (i in 2..9) {
                emit(Pair(it, i))
            }
        }
    }

    runBlocking {
        flowB.collect { (a, b) ->
            println("result A: $a, B: $b")

        }
    }
 

Выход:

 result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10