Как работает ktor websocket flow API?

#websocket #server #flow #ktor

#websocket #сервер #ktor

Вопрос:

Я использую ktor для разработки на стороне сервера с помощью websockets.

Документация показывает нам этот пример использования входящего канала:

 for (frame in incoming.mapNotNull { it as? Frame.Text }) {
    // some
}
  

Но mapNotNull помечен как устаревший в пользу Flow . Как я должен использовать этот API и какие проблемы могут возникнуть? Например, Flow это холодный поток. Это означает, что функция производителя будет вызываться на каждом collect . Как это работает в контексте websocket. Будет ли он повторно открыт при втором collect вызове или, возможно, старые сообщения будут доставлены один раз после следующего collect ? Как я могу собирать N сообщения, затем прекращать сбор, а затем собирать снова?

Заранее спасибо 🙂

Ответ №1:

Как я должен использовать этот API и какие проблемы могут возникнуть?

То, что я использую и что я видел в одном из примеров где-то в документах, — это consumeAsFlow() метод, вызываемый на ReceiveChannel . Вот весь фрагмент:

 webSocket("/websocket") { //this: DefaultWebSocketServerSession
    incoming
        .consumeAsFlow()
        .map { receive(it) }
        .collect()
}
  

Не видел серьезных проблем с этим подходом. Одна вещь, о которой вы должны знать (но это относится и к подходу без потока), заключается в том, что если вы создадите свой поток, это приведет к разрыву соединения с WebSocket, что обычно не то, что вы хотели бы делать. Возможно, стоит подумать о том, чтобы обернуть все это в try-catch .

Будет ли он повторно открыт при втором вызове сбора, или, возможно, старые сообщения будут доставлены один раз после следующего сбора?

Вы открываете websocket еще до того, как начнете получать сообщения из потока. Вы можете видеть, что внутри webSocket() {} вы находитесь в контексте DefaultWebSocketServerSession . Это ваше управление подключением. Внутри вашего потока вы просто получаете сообщения одно за другим по мере их поступления (после установления соединения). Если соединение прерывается, вы выходите из потока. Его необходимо восстановить, прежде чем вы сможете обрабатывать свои сообщения. Этот установочный бит выполняется Route.webSocket() методом. Я рекомендую взглянуть на его Javadoc.

Если вы хотите добавить некоторую очистку после закрытия соединения, вы можете добавить finally блок следующим образом:

 webSocket("/chat") {
    try {
        incoming
            .consumeAsFlow()
            .map { receive(it, client) }
            .collect()
    } finally {
        // cleanup
    }
}
  

Короче говоря: collect вызывается один раз для каждого полученного сообщения. Если соединение отсутствует (или оно было разорвано), то collect вызов вызываться не будет.

Как я могу собрать N сообщений, затем прекратить сбор, а затем снова собрать?

Каков вариант использования для этого? Я не думаю, что вы должны делать это с любым потоком. Вы, конечно, можете take(n) использовать элементы из потока, но вы больше не сможете извлекать из него что-либо еще.