#kotlin-coroutines
#kotlin-сопрограммы
Вопрос:
Я пытаюсь эмулировать BehaviourSubject
использование Flow
.
Код в следующем примере работает некорректно, поскольку collect {}
не позволяет продолжить сопрограмму:
@FlowPreview
@ExperimentalCoroutinesApi
fun main() = runBlocking {
val scope = CoroutineScope(Dispatchers.IO)
val channel = MutableStateFlow(0)
channel.value
val flow = channel.broadcastIn(scope).asFlow()
channel.value
flow.collect { println("First: $it") } // Code will stop executing here ...
flow.collect { println("Second: $it") } // Will not execute
channel.value // Wil not execute
return@runBlocking
}
Вывод:
First: 2
// process not exited
Я не думаю, что обратный вызов также должен удерживать поток.
В чем проблема?
Такая проблема не возникает с холодным Flow
:
fun main() = runBlocking {
val data = listOf(1, 2, 3, 4, 5).asFlow()
data.collect { println(it) }
println("Done")
}
Вывод:
1
2
3
4
5
Done
Я что-то упускаю?
Kotlin:
1.4.10
Сопрограммы:1.3.9
Ответ №1:
В обоих случаях Flow
он ведет себя одинаково, вы пропустили то, что Flow.collect
приостанавливается до завершения потока.
В случае холодного потока он явно заканчивается после того, как будут выданы 5 значений, поэтому он завершает выполнение. Между тем, в горячем потоке завершение потока зависит от кода, который выполняется ПОСЛЕ collect
возврата, классический тупик.
Если вы выполняете collect
асинхронно в launch
блоке, он будет вести себя одинаково в обоих случаях. Но поскольку a StateFlow
не может «завершиться» по замыслу, в конечном итоге он будет вечно ждать завершения, которое не может произойти.
Комментарии:
1. Так безопасно ли его использовать
StateFlow
, поскольку он просто удерживает поток? Если я создам 10 потоков состояний, я на самом деле потрачу впустую 10 сопрограмм? Я даже правильно эмулирую тему Rx? Я сомневаюсь в этом.2. Он не удерживает поток, но поскольку основная сопрограмма ожидает завершения потока, а поток ожидает значения от основной сопрограммы, нет никакой работы, которую можно было бы выполнить. Что касается
Subject
его, он не может быть реализован напрямую с использованиемStateFlow
как потому, что он не может быть закрыт, так и потому, что у него есть несколько гарантий пропуска значений.