Сопрограммы Kotlin: Горячий поток удерживает поток

#kotlin-coroutines

#kotlin-сопрограммы

Вопрос:

Я пытаюсь эмулировать BehaviourSubject использование Flow .

Код в следующем примере работает некорректно, поскольку collect {} не позволяет продолжить сопрограмму:

 @FlowPreview
@ExperimentalCoroutinesApi
fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.IO)
    val channel = MutableStateFlow(0)
    channel.value  
    val flow = channel.broadcastIn(scope).asFlow()
    channel.value  
    flow.collect { println("First: $it") } // Code will stop executing here ...

    flow.collect { println("Second: $it") } // Will not execute
    channel.value    // Wil not execute

    return@runBlocking

}
  

Вывод:

 First: 2
// process not exited
  

Я не думаю, что обратный вызов также должен удерживать поток.
В чем проблема?

Такая проблема не возникает с холодным Flow :

 fun main() = runBlocking {
    val data = listOf(1, 2, 3, 4, 5).asFlow()
    data.collect { println(it) }
    println("Done")
}
  

Вывод:

 1
2
3
4
5
Done
  

Я что-то упускаю?

Kotlin: 1.4.10
Сопрограммы: 1.3.9

Ответ №1:

В обоих случаях Flow он ведет себя одинаково, вы пропустили то, что Flow.collect приостанавливается до завершения потока.

В случае холодного потока он явно заканчивается после того, как будут выданы 5 значений, поэтому он завершает выполнение. Между тем, в горячем потоке завершение потока зависит от кода, который выполняется ПОСЛЕ collect возврата, классический тупик.

Если вы выполняете collect асинхронно в launch блоке, он будет вести себя одинаково в обоих случаях. Но поскольку a StateFlow не может «завершиться» по замыслу, в конечном итоге он будет вечно ждать завершения, которое не может произойти.

Комментарии:

1. Так безопасно ли его использовать StateFlow , поскольку он просто удерживает поток? Если я создам 10 потоков состояний, я на самом деле потрачу впустую 10 сопрограмм? Я даже правильно эмулирую тему Rx? Я сомневаюсь в этом.

2. Он не удерживает поток, но поскольку основная сопрограмма ожидает завершения потока, а поток ожидает значения от основной сопрограммы, нет никакой работы, которую можно было бы выполнить. Что касается Subject его, он не может быть реализован напрямую с использованием StateFlow как потому, что он не может быть закрыт, так и потому, что у него есть несколько гарантий пропуска значений.