#kotlin #kotlin-coroutines #kotlin-flow
#kotlin #kotlin-сопрограммы #kotlin-flow
Вопрос:
Допустим, я создаю конечный поток сообщений как Flow<Message>
. Я перебираю некоторый источник сообщений и, когда сообщений больше нет, вырываюсь из цикла, чтобы нормально завершить поток.
val messages = flow {
while (source.hasNext()) {
emit(source.next())
}
println("Finished sending all the messages")
}
Потребитель просто вызывает collect
для получения каждого сообщения по очереди, пока поток не завершится.
messages.collect { doSomethingWith(it) }
println("All the messages have been processed")
Но что происходит, когда мне нужно сообщить вызывающему абоненту, почему поток прекратился? Например, предположим, что создателю сообщения требуется аутентификация, и мне приходится иметь дело с истечением срока действия токена. Теперь есть две причины, по которым поток может быть завершен, и поэтому мой продюсер выглядит так:
val messages = flow {
while(source.hasNext()) {
if (authentication.isExpired) {
break
}
emit(source.next())
}
println("Finished sending all the messages")
}
Проблема в том, что потребитель не знает, почему поток завершен. Сообщения просто закончились, или им нужно повторно пройти аутентификацию и продолжить выборку сообщений? Теперь я могу придумать два возможных способа справиться с этим, ни один из которых мне не нравится.
Исключения
Простым решением было бы сигнализировать о «ненормальном» завершении потока с помощью исключения, подобного этому:
val messages = flow {
while (source.hasNext()) {
if (authentication.isExpired) {
throw AuthenticationExpiredException()
}
emit(source.next())
}
println("Finished sending all the messages")
}
Это здорово, потому что теперь потребитель может использовать try
/ catch
, чтобы определить, почему поток был прекращен:
try {
messages.collect { doSomethingWith(it) }
println("There are no more messages")
} catch (e: AuthenticationExpiredException) {
println("The auth token expired")
}
Но использование подобных исключений — это абсолютно не то, что я хочу делать. В своей статье о Kotlin и исключениях руководитель Kotlin libraries Роман Елизаров прямо говорит нам:
не используйте исключения для возврата результирующего значения, [и] избегайте
try
/catch
в общем коде приложения.
Это отличная статья, и есть много причин, по которым я согласен с ней, но одна из важнейших причин отказа от использования исключений заключается в том, что она вводит поведение, которое не проверяется по типу. Если я использую my AuthenticationExpiredException
, то, не читая код, вызывающий поток не знает, что он должен перехватывать и обрабатывать его. Код будет отлично компилироваться без try
/ catch
и завершится сбоем во время выполнения. Это преднамеренный выбор языкового дизайна в Kotlin: исключения не проверяются во время компиляции, потому что они просто не предназначены для использования таким образом.
Sealed classes
A great way to return different types of results elsewhere in Kotlin is to use sealed classes. So I might write something like this:
sealed class FlowResult<out T> {
data class Value<T>(val value: T): FlowResult<T>
object: NoMoreMessages: FlowResult<Nothing>()
object: AuthenticationExpired: FlowResult<Nothing>()
}
val messages = flow {
while(source.hasNext()) {
if (authentication.isExpired) {
emit(AuthenticationExpired)
break
}
emit(Value(source.next()))
}
emit(NoMoreMessages)
}
Это намного приятнее, потому что теперь две возможные близкие причины рассматриваются одинаково. Потребитель может использовать when
блок для обработки разных результатов и получит ошибку компиляции, если они не включают ветвь, которая обрабатывает AuthenticationExpired
.
Проблема, с которой я сталкиваюсь при таком подходе, заключается в том, что я не могу обеспечить соблюдение правила о том, что результаты «сбоя» всегда должны быть конечным элементом в потоке. Например, допустим, я допустил ошибку и случайно пропустил break
из своего цикла.
val messages = flow {
while(source.hasNext()) {
if (authentication.isExpired) {
emit(AuthenticationExpired)
}
emit(Value(source.next()))
}
emit(NoMoreMessages)
}
Теперь я в конечном итоге выдам AuthenticationExpired
значение, фактически не завершая поток. Он просто продолжит выдавать сообщения. Это может привести к всевозможным ошибкам и проблемам безопасности, но он будет компилироваться просто отлично.
Похоже, что я действительно ищу механизм, с помощью которого collect
можно возвращать значение, подобное этому:
val closeReason = messages.collect { doSomethingWith(it) }
when (closeReason) {
...
}
Есть ли способ, которым я могу добиться чего-то подобного? Или есть другой подход, который позволяет обойти проблемы, которые я описал?
Ответ №1:
Не существует встроенного метода достижения того, чего вы хотите, но его можно создать поверх существующих API и объединить его с закрытыми классами:
// Essentially a functional Either type
sealed class Signal<out V, out T> {
data class Value<V>(val value: V): Signal<V, Nothing>
data class Terminal<T>(val value: T): Signal<Nothing, T>
}
Затем создайте набор методов расширения, которые выполняют нужную вам обработку
inline suspend fun <reified V, T> Flow<Signal<V,T>>.collect(
valueCollector: suspend (V) -> Unit
): T = onEach {
if (it is Signal.Value) {
valueCollector(it.value)
}
}.filterIsInstance<Signal.Terminal<T>>().first().value
Это обработало value
элементы с использованием предоставленного сборщика и возвращает значение первого Terminal
экземпляра, который он видит как возвращаемое значение.