Как я могу предоставить «закрытую причину» для потока без использования исключений?

#kotlin #kotlin-coroutines #kotlin-flow

#kotlin #kotlin-сопрограммы #kotlin-flow

Вопрос:

Допустим, я создаю конечный поток сообщений как Flow<Message> . Я перебираю некоторый источник сообщений и, когда сообщений больше нет, вырываюсь из цикла, чтобы нормально завершить поток.

 val messages = flow {
    while (source.hasNext()) {
        emit(source.next())
    }
    println("Finished sending all the messages")
}
  

Потребитель просто вызывает collect для получения каждого сообщения по очереди, пока поток не завершится.

 messages.collect { doSomethingWith(it) }
println("All the messages have been processed")
  

Но что происходит, когда мне нужно сообщить вызывающему абоненту, почему поток прекратился? Например, предположим, что создателю сообщения требуется аутентификация, и мне приходится иметь дело с истечением срока действия токена. Теперь есть две причины, по которым поток может быть завершен, и поэтому мой продюсер выглядит так:

 val messages = flow {
    while(source.hasNext()) {
        if (authentication.isExpired) {
            break
        }
        emit(source.next())
    }
    println("Finished sending all the messages")
}
  

Проблема в том, что потребитель не знает, почему поток завершен. Сообщения просто закончились, или им нужно повторно пройти аутентификацию и продолжить выборку сообщений? Теперь я могу придумать два возможных способа справиться с этим, ни один из которых мне не нравится.

Исключения

Простым решением было бы сигнализировать о «ненормальном» завершении потока с помощью исключения, подобного этому:

 val messages = flow {
    while (source.hasNext()) {
        if (authentication.isExpired) {
            throw AuthenticationExpiredException()
        }
        emit(source.next())
    }
    println("Finished sending all the messages")
}
  

Это здорово, потому что теперь потребитель может использовать try / catch , чтобы определить, почему поток был прекращен:

 try {
    messages.collect { doSomethingWith(it) }
    println("There are no more messages")
} catch (e: AuthenticationExpiredException) {
    println("The auth token expired")
}
  

Но использование подобных исключений — это абсолютно не то, что я хочу делать. В своей статье о Kotlin и исключениях руководитель Kotlin libraries Роман Елизаров прямо говорит нам:

не используйте исключения для возврата результирующего значения, [и] избегайте try / catch в общем коде приложения.

Это отличная статья, и есть много причин, по которым я согласен с ней, но одна из важнейших причин отказа от использования исключений заключается в том, что она вводит поведение, которое не проверяется по типу. Если я использую my AuthenticationExpiredException , то, не читая код, вызывающий поток не знает, что он должен перехватывать и обрабатывать его. Код будет отлично компилироваться без try / catch и завершится сбоем во время выполнения. Это преднамеренный выбор языкового дизайна в Kotlin: исключения не проверяются во время компиляции, потому что они просто не предназначены для использования таким образом.

Sealed classes

A great way to return different types of results elsewhere in Kotlin is to use sealed classes. So I might write something like this:

 sealed class FlowResult<out T> {
    data class Value<T>(val value: T): FlowResult<T>
    object: NoMoreMessages: FlowResult<Nothing>()
    object: AuthenticationExpired: FlowResult<Nothing>()
}

val messages = flow {
    while(source.hasNext()) {
        if (authentication.isExpired) {
            emit(AuthenticationExpired)
            break
        }
        emit(Value(source.next()))
    }
    emit(NoMoreMessages)
}
  

Это намного приятнее, потому что теперь две возможные близкие причины рассматриваются одинаково. Потребитель может использовать when блок для обработки разных результатов и получит ошибку компиляции, если они не включают ветвь, которая обрабатывает AuthenticationExpired .

Проблема, с которой я сталкиваюсь при таком подходе, заключается в том, что я не могу обеспечить соблюдение правила о том, что результаты «сбоя» всегда должны быть конечным элементом в потоке. Например, допустим, я допустил ошибку и случайно пропустил break из своего цикла.

 val messages = flow {
    while(source.hasNext()) {
        if (authentication.isExpired) {
            emit(AuthenticationExpired)
        }
        emit(Value(source.next()))
    }
    emit(NoMoreMessages)
}
  

Теперь я в конечном итоге выдам AuthenticationExpired значение, фактически не завершая поток. Он просто продолжит выдавать сообщения. Это может привести к всевозможным ошибкам и проблемам безопасности, но он будет компилироваться просто отлично.


Похоже, что я действительно ищу механизм, с помощью которого collect можно возвращать значение, подобное этому:

 val closeReason = messages.collect { doSomethingWith(it) }
when (closeReason) {
    ...
}
  

Есть ли способ, которым я могу добиться чего-то подобного? Или есть другой подход, который позволяет обойти проблемы, которые я описал?

Ответ №1:

Не существует встроенного метода достижения того, чего вы хотите, но его можно создать поверх существующих API и объединить его с закрытыми классами:

 // Essentially a functional Either type
sealed class Signal<out V, out T> {
    data class Value<V>(val value: V): Signal<V, Nothing>
    data class Terminal<T>(val value: T): Signal<Nothing, T>
}
  

Затем создайте набор методов расширения, которые выполняют нужную вам обработку

 inline suspend fun <reified V, T> Flow<Signal<V,T>>.collect(
    valueCollector: suspend (V) -> Unit
): T = onEach {
    if (it is Signal.Value) {
        valueCollector(it.value)
    }
}.filterIsInstance<Signal.Terminal<T>>().first().value
  

Это обработало value элементы с использованием предоставленного сборщика и возвращает значение первого Terminal экземпляра, который он видит как возвращаемое значение.