Как сохранить соединение открытым с сервером websocket?

#scala #akka #akka-stream

#scala #akka #akka-stream

Вопрос:

У меня есть следующий код, который не сохраняет соединение открытым с сервером websocket:

 
import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent._
import scala.util.{Failure, Success}

object WsActor {
  def props: Props = Props(new WsActor)
}

final class WsActor extends Actor with ActorLogging {

  import com.sweetsoft.WsConnector._

  implicit val materializer: Materializer = ActorMaterializer()
  implicit val ec: ExecutionContextExecutor = context.system.dispatcher
  implicit val actor = context.system

  // Future[Done] is the materialized value of Sink.foreach,
  // emitted when the stream completes
  private val incoming: Sink[Message, Future[Done]] =
  Sink.foreach[Message] {
    case message: TextMessage.Strict =>
      println(message.text)
    case _ =>
      println("Unknown messages.")
  }

  //private val outgoing: Source[Message, Promise[Option[Message]]] =
  //  Source.maybe[Message]

  //  val flow: Flow[Message, Message, Promise[Option[Message]]] =
  //    Flow.fromSinkAndSourceMat(incoming, Source.maybe[Message])(Keep.right)


  log.info("Websocket actor started.")

  override def receive: Receive = {
    case Initialized =>
      log.info("Initialization to receive messages via stream.")
      sender() ! Ack
    case Completed =>
      log.info("Streams completed.")
      sender() ! Ack
    case Msg(value) =>

      val replyTo = sender()
      val flow: Flow[Message, Message, Promise[Option[Message]]] =
        Flow.fromSinkAndSourceMat(incoming, Source.single(TextMessage(value)).concatMat(Source.maybe[Message])(Keep.right))(Keep.right)

      val (upgradeResponse, _) =
        Http().singleWebSocketRequest(WebSocketRequest("ws://127.0.0.1:7000/ws"), flow.mapAsync(4)(msg => Future(msg)))

      upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
          Future.successful(Done)
        } else {
          throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
        }
      }.onComplete {
        case Success(_) =>
          replyTo ! Ack
          log.info("Done")
        case Failure(ex) => log.error(ex.getMessage)
      }

    case Failed(ex) =>
      log.info(s"Stream failed with ${ex.getMessage}.")
  }

}
  

Поэтому каждый раз, когда получено сообщение, оно закрывает соединение и открывает новое соединение для следующего запроса.
Вопрос в том, как я могу сохранить соединение открытым?

Ответ №1:

Http().webSocketClientFlow вместо Http().singleWebSocketRequest

Http().webSocketClientFlow даст вам поток Flow[Message, Message, Future[WebSocketUpgradeResponse]]

Это не будет создавать новое соединение каждый раз.

Вы должны объявить его в сопутствующем объекте, чтобы каждый экземпляр класса мог использовать одно и то же соединение.

Сначала объявите свою систему актеров для всего приложения в отдельном пакете.

 object ActorEssentials {
        implicit val actorSystem = ActorSystem("test")
}
  

Затем вы можете объявить следующее

 object WsActor {
  import ActorEssentials._
  def props: Props = Props[WsActor]
  val flow = Http()(actorSystem).webSocketClientFlow(...) 
}
  

Комментарии:

1. You should declare it in the companion object, so every instance of the class can use the same connection. Я все еще изучаю Scala, не могли бы вы привести пример этого?

2. Как вы объявили def props=new Props(new WsActor) в object WsActor {} . Вы должны включить Http().webSocketClientFlow сопутствующий объект in. Таким образом, он будет создан только один раз.

3. Другое дело, что вы должны изменить def props = new Props(new WsActot) на def props=Props[WsActor]

4. Я не могу поделиться Http().webSocketClientFlow , потому def apply()(implicit system: ActorSystem): HttpExt = super.apply(system) что, как вы можете видеть, он ожидает ActorSystem

5. Потрясающе, большое спасибо за вашу помощь. Я очень признателен.