Котлин: изменяемое поведение буферного потока при первом запуске

#kotlin #kotlin-coroutines #kotlin-flow

Вопрос:

У меня есть следующий фрагмент кода:

 val flow = MutableSharedFlow<Int>()

launch {
    repeat(10) {
        delay(100)
        println("emitting$it")
        flow.emit(it)
    }
}
    
launch {
    flow.collect {
        delay(1000)
        println("a$it")
    }
}

launch {
    flow.collect {
        println("b$it")
    }
}
 

Я бы ожидал, что результат будет таким:

излучение0 в0 а0 излучение1 в1 а1 …

Из BufferOverflow.SUSPEND — за чего эмиттер вынужден ждать, пока события не будут израсходованы.

Фактический вывод следует этому шаблону, за исключением первых двух элементов, поэтому фактический вывод: излучение 0 b0 излучение 1 b1 a0 (теперь оно начинает следовать поведению, которое я ожидал) излучение 2 b2 a1 излучение 3 b3 a2 …

Что вызывает выброс второго элемента? Не следует ли подождать, пока будет потреблен первый товар?

Ответ №1:

Это действительно подождет. Что вас смущает, так это delay(1000) перед печатью a# . К тому времени, когда он печатает a# , прошла 1 секунда с тех пор, как он начал его потреблять. Поменяйте их местами, и вы это ясно увидите.

 flow.collect {
    println("a$it")
    delay(1000)
}
 

Вы также можете распечатать что-то еще после отправки элемента, чтобы сделать его еще более четким:

 repeat(10) {
    delay(100)
    println("emitting$it")
    flow.emit(it)
    println("emited$it")
}
 

Вот что он выводит:

 emitting0
a0
b0
emited0
emitting1
b1
a1
emited1
emitting2
b2
a2
emited2
...
 

Комментарии:

1.Спасибо, это немного прояснило ситуацию, но я думаю, что все еще неправильно понимаю, когда событие рассматривается как потребленное и когда может начаться новое излучение. Я сделал вывод немного чище: я добавил println("emited$it") , как вы упомянули, и я также добавил println("starts waiting for a$it") , прежде чем в моем коде произойдет задержка. Теперь он печатает: emitting0 starts waiting for a0 b0 emitted0 emitting1 b1 a0 starts waiting for a1 и так далее. Таким образом, событие рассматривается как потребляемое, когда collect { } начинается выполнение, а не когда оно завершено?

2. @jakubledwon При collect запуске, независимо от того, что он делает, производитель/отправитель «понимает», что товар был потреблен этим потребителем, поэтому он может выпустить больше товаров, если все потребители потребили последний выпущенный товар.