#kotlin #kotlin-coroutines #kotlin-flow
#kotlin #kotlin-сопрограммы #kotlin-flow
Вопрос:
Я пытаюсь запустить сопрограмму, пока SharedFlow активен ( subscriptionCount
больше нуля) и отменяется при уменьшении количества. Но почему-то даже что-то столь простое, как distinctUntilChanged()
это, работает не так, как должно, и я сбит с толку.
Для этого я создаю «onActive» расширение, подобное этому:
fun <T : Any> MutableSharedFlow<T>.onActive(
block: suspend CoroutineScope.() -> Unit
): Flow<T> {
val original = this
val isActiveFlow: Flow<Boolean> = subscriptionCount
.map {
println("Class: Count is $it")
it > 0
}
.distinctUntilChanged()
return isActiveFlow.flatMapLatest { isActive ->
println("Class: isActive is $isActive")
// here would be the code that calls `block`
// but just this exactly as is, already triggers the error
original // still emits the original flow,
// that is needed or else subscriptionCount never changes
}
}
Это изначально кажется, что это работает, но запуск теста на нем, который добавляет несколько подписчиков, приведет к печати «isActive is true» несколько раз подряд. Почему distinctUntilChanged()
не работает? Этот повторный вызов приводит к путанице с остальной логикой в отредактированной области.
Тест выглядит следующим образом:
@Test
fun `onActive is called only once with multiple subscribers`() = runBlocking {
val flow = MutableSharedFlow<Int>(
replay = 2,
onBufferOverflow = BufferOverflow.DROP_OLDEST
).apply {
repeat(5) { tryEmit(it) }
}.onActive {
}
val jobs = mutableListOf<Job>()
repeat(3) { count ->
jobs.add(flow.onEach {
println("Test: Listener $count received $it")
}.launchIn(this))
}
delay(100)
jobs.forEach { it.cancel() }
jobs.forEach { it.join() }
}
запустив это, результат будет следующим:
Class: Count is 0
Class: isActive is false
Class: Count is 1
Class: Count is 1
Class: isActive is true
Class: Count is 2
Class: Count is 2
Class: isActive is true
Class: Count is 3
Test: Listener 0 received 3
Test: Listener 0 received 4
Test: Listener 1 received 3
Test: Listener 1 received 4
Test: Listener 2 received 3
Test: Listener 2 received 4
Class: Count is 2
Class: isActive is true
Class: Count is 3
Class: Count is 3
Class: Count is 3
Test: Listener 0 received 3
Test: Listener 0 received 4
Итак, вопрос, почему distinctUntilChanged()
не работает и как я могу это исправить?
Комментарии:
1. Я не вижу ни
distinctUntilChanged
одного илиfoobar
вызова в тесте, ожидается ли это? РЕДАКТИРОВАТЬ: хорошо, вы назвали егоfoobar
в объявлении, но фактическое имя методаonActive
2. На самом деле было бы неплохо поделиться отредактированной частью, потому что это может повлиять на то, как она работает
3. Вы
SharingStarted.WhileSubscribed
, кстати, смотрели? Похоже, то, что вы хотите, можно было бы проще написать, просто используя обычный поток, а затем поделиться им сshareIn(started = SharingStarted.WhileSubscribed)
4. @Джоффри привет. Спасибо, что проверили мою проблему. Да, у меня было несколько версий метода, и в какой-то момент был просто foobar . Я отредактировал сообщение, чтобы все было правильно вызвано
onActive
. ВызовdistinctUntilChanged()
находится в методе, который я пытаюсь написать, а не в тесте. Я полагаю, что отредактированная часть может что-то изменить, НО я фактически удалил эту версию метода, чтобы изолировать проблему. В настоящее времяflatMapLatest
это просто println и возвращает исходный поток.5. привет, @Joffrey, из того, что я вижу
SharingStarted.WhileSubscribed
, используется при вызовеshareIn
, но мне нужно, чтобы ablock: suspend CoroutineScope.() -> Unit
выполнялся, когда значение isActive равно true, и чтобы этот сопрограммный параметр был отменен, когда значение isActive равно false . Я не понимаю, как я мог бы связать это сSharingStarted.WhileSubscribed
Ответ №1:
Кажется, поведение, которое вы видите, на самом деле правильное, насколько distinctUntilChanged
это касается:
- первый зарегистрированный подписчик собирает исходные 2 воспроизведенных элемента с начальным
isActive=false
значением - затем
isActive
становится true из-за этого первого susbcription, так что первый подписчик вспоминает исходный поток из-заflatMapLatest
и, таким образом, снова получает воспроизводимые элементы - остальные 2 подписчика поступают, когда значение
subscriptionCount
уже не равно 0, поэтомуisActive
оно остается для них верным до тех пор, пока они не будут отменены
Если сопрограмма, которую вы запускаете «пока есть подписчики», предназначена для создания элементов в SharedFlow
, я бы предпочел сначала определить поток как channelFlow
/ callbackFlow
, а затем использовать shareIn с SharingStarted.WhileSubscribed
, чтобы это поведение «запускалось при наличии подписчиков».
Если это «просто на стороне», вам, вероятно, нужна внешняя область и просто запустите сопрограмму отдельно, чтобы прослушать sharedFlow.subscribersCount
и запустить / остановить сопрограмму «коляски».
Комментарии:
1. Из вашего объяснения
map/distinctUntilChanged/flatMapLatest
это происходит. для каждой подписки, а не только один раз приonActive
вызове метода. Я уверен, что этому есть хорошее объяснение, но, безусловно, несколько удивительное поведение.2. К вашему сведению, конечной конечной целью здесь было иметь
while(true) { emit(refresh()); delay(duration) }
возможность постоянно обновлять значение, пока есть подписчик. Путь, который я пробовал (и здесь потерпел неудачу), заключался в том, чтобы иметь isActiveFlow для вызова block() с новым сопрограммированием, которое я мог бы отменить, когдаisActive==false
. Вероятно, можно было бы что-то сделать, используя стандартную фабрику потоковflow {
, если бы был способ проверитьhasSubscribers
цикл while. Я вернусь к чертежной доске. Спасибо.3. @Budius что касается вашего первого комментария, это действительно так, как
Flow
работает в целом. Единственная «общая» часть — этоSharedFlow
сама (или все, вплоть доshareIn
оператора, когда вы создаетеSharedFlow
его таким образом). В противном случае все операторы применяются независимо для всех коллекторов.4. @Budius если, «обновляя значение», вы ссылаетесь на значение в общем потоке, то почему бы не создать его как
channelFlow
илиcallbackFlow
с помощью этого встроенного цикла обновления, а затем использоватьshareIn
? Я думал, что побочная сопрограмма не предназначена для отправки значений в поток5. Вероятно, можно было бы что-то сделать, используя стандартный поток Flow factory flow { если бы был способ проверить hasSubscribers в цикле while — это именно то, что
shareIn(started = SharingStarted.WhileSubscribed)
нужно было бы сделать. Вы не беспокоитесь о проверкеhasSubscribers
вручную, но начальноеflow { }
содержимое отменяется и перезапускается, когда оно падает до 0