Поток.distinctUntilChanged() не работает

#kotlin #kotlin-coroutines #kotlin-flow

#kotlin #kotlin-сопрограммы #kotlin-flow

Вопрос:

Я пытаюсь запустить сопрограмму, пока SharedFlow активен ( subscriptionCount больше нуля) и отменяется при уменьшении количества. Но почему-то даже что-то столь простое, как distinctUntilChanged() это, работает не так, как должно, и я сбит с толку.

Для этого я создаю «onActive» расширение, подобное этому:

 fun <T : Any> MutableSharedFlow<T>.onActive(
    block: suspend CoroutineScope.() -> Unit
): Flow<T> {

    val original = this

    val isActiveFlow: Flow<Boolean> = subscriptionCount
        .map {
            println("Class: Count is $it")
            it > 0
        }
        .distinctUntilChanged()

    return isActiveFlow.flatMapLatest { isActive ->
        println("Class: isActive is $isActive")
        // here would be the code that calls `block`
        // but just this exactly as is, already triggers the error

        original // still emits the original flow, 
                 // that is needed or else subscriptionCount never changes
    }
}
 

Это изначально кажется, что это работает, но запуск теста на нем, который добавляет несколько подписчиков, приведет к печати «isActive is true» несколько раз подряд. Почему distinctUntilChanged() не работает? Этот повторный вызов приводит к путанице с остальной логикой в отредактированной области.

Тест выглядит следующим образом:

     @Test
    fun `onActive is called only once with multiple subscribers`() = runBlocking {

        val flow = MutableSharedFlow<Int>(
            replay = 2,
            onBufferOverflow = BufferOverflow.DROP_OLDEST
        ).apply {
            repeat(5) { tryEmit(it) }
        }.onActive {

        }

        val jobs = mutableListOf<Job>()
        repeat(3) { count ->
            jobs.add(flow.onEach {
                println("Test:  Listener $count received $it")
            }.launchIn(this))
        }
        delay(100)
        jobs.forEach { it.cancel() }
        jobs.forEach { it.join() }
    }
 

запустив это, результат будет следующим:

 Class: Count is 0
Class: isActive is false
Class: Count is 1
Class: Count is 1
Class: isActive is true
Class: Count is 2
Class: Count is 2
Class: isActive is true
Class: Count is 3
Test:  Listener 0 received 3
Test:  Listener 0 received 4
Test:  Listener 1 received 3
Test:  Listener 1 received 4
Test:  Listener 2 received 3
Test:  Listener 2 received 4
Class: Count is 2
Class: isActive is true
Class: Count is 3
Class: Count is 3
Class: Count is 3
Test:  Listener 0 received 3
Test:  Listener 0 received 4

 

Итак, вопрос, почему distinctUntilChanged() не работает и как я могу это исправить?

Комментарии:

1. Я не вижу ни distinctUntilChanged одного или foobar вызова в тесте, ожидается ли это? РЕДАКТИРОВАТЬ: хорошо, вы назвали его foobar в объявлении, но фактическое имя метода onActive

2. На самом деле было бы неплохо поделиться отредактированной частью, потому что это может повлиять на то, как она работает

3. Вы SharingStarted.WhileSubscribed , кстати, смотрели? Похоже, то, что вы хотите, можно было бы проще написать, просто используя обычный поток, а затем поделиться им с shareIn(started = SharingStarted.WhileSubscribed)

4. @Джоффри привет. Спасибо, что проверили мою проблему. Да, у меня было несколько версий метода, и в какой-то момент был просто foobar . Я отредактировал сообщение, чтобы все было правильно вызвано onActive . Вызов distinctUntilChanged() находится в методе, который я пытаюсь написать, а не в тесте. Я полагаю, что отредактированная часть может что-то изменить, НО я фактически удалил эту версию метода, чтобы изолировать проблему. В настоящее время flatMapLatest это просто println и возвращает исходный поток.

5. привет, @Joffrey, из того, что я вижу SharingStarted.WhileSubscribed , используется при вызове shareIn , но мне нужно, чтобы a block: suspend CoroutineScope.() -> Unit выполнялся, когда значение isActive равно true, и чтобы этот сопрограммный параметр был отменен, когда значение isActive равно false . Я не понимаю, как я мог бы связать это с SharingStarted.WhileSubscribed

Ответ №1:

Кажется, поведение, которое вы видите, на самом деле правильное, насколько distinctUntilChanged это касается:

  • первый зарегистрированный подписчик собирает исходные 2 воспроизведенных элемента с начальным isActive=false значением
  • затем isActive становится true из-за этого первого susbcription, так что первый подписчик вспоминает исходный поток из-за flatMapLatest и, таким образом, снова получает воспроизводимые элементы
  • остальные 2 подписчика поступают, когда значение subscriptionCount уже не равно 0, поэтому isActive оно остается для них верным до тех пор, пока они не будут отменены

Если сопрограмма, которую вы запускаете «пока есть подписчики», предназначена для создания элементов в SharedFlow , я бы предпочел сначала определить поток как channelFlow / callbackFlow , а затем использовать shareIn с SharingStarted.WhileSubscribed , чтобы это поведение «запускалось при наличии подписчиков».

Если это «просто на стороне», вам, вероятно, нужна внешняя область и просто запустите сопрограмму отдельно, чтобы прослушать sharedFlow.subscribersCount и запустить / остановить сопрограмму «коляски».

Комментарии:

1. Из вашего объяснения map/distinctUntilChanged/flatMapLatest это происходит. для каждой подписки, а не только один раз при onActive вызове метода. Я уверен, что этому есть хорошее объяснение, но, безусловно, несколько удивительное поведение.

2. К вашему сведению, конечной конечной целью здесь было иметь while(true) { emit(refresh()); delay(duration) } возможность постоянно обновлять значение, пока есть подписчик. Путь, который я пробовал (и здесь потерпел неудачу), заключался в том, чтобы иметь isActiveFlow для вызова block() с новым сопрограммированием, которое я мог бы отменить, когда isActive==false . Вероятно, можно было бы что-то сделать, используя стандартную фабрику потоков flow { , если бы был способ проверить hasSubscribers цикл while. Я вернусь к чертежной доске. Спасибо.

3. @Budius что касается вашего первого комментария, это действительно так, как Flow работает в целом. Единственная «общая» часть — это SharedFlow сама (или все, вплоть до shareIn оператора, когда вы создаете SharedFlow его таким образом). В противном случае все операторы применяются независимо для всех коллекторов.

4. @Budius если, «обновляя значение», вы ссылаетесь на значение в общем потоке, то почему бы не создать его как channelFlow или callbackFlow с помощью этого встроенного цикла обновления, а затем использовать shareIn ? Я думал, что побочная сопрограмма не предназначена для отправки значений в поток

5. Вероятно, можно было бы что-то сделать, используя стандартный поток Flow factory flow { если бы был способ проверить hasSubscribers в цикле while — это именно то, что shareIn(started = SharingStarted.WhileSubscribed) нужно было бы сделать. Вы не беспокоитесь о проверке hasSubscribers вручную, но начальное flow { } содержимое отменяется и перезапускается, когда оно падает до 0