#scala #akka #akka-stream
#scala #akka #akka-stream
Вопрос:
У меня есть следующий код, который не сохраняет соединение открытым с сервером websocket:
import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent._
import scala.util.{Failure, Success}
object WsActor {
def props: Props = Props(new WsActor)
}
final class WsActor extends Actor with ActorLogging {
import com.sweetsoft.WsConnector._
implicit val materializer: Materializer = ActorMaterializer()
implicit val ec: ExecutionContextExecutor = context.system.dispatcher
implicit val actor = context.system
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
private val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
println("Unknown messages.")
}
//private val outgoing: Source[Message, Promise[Option[Message]]] =
// Source.maybe[Message]
// val flow: Flow[Message, Message, Promise[Option[Message]]] =
// Flow.fromSinkAndSourceMat(incoming, Source.maybe[Message])(Keep.right)
log.info("Websocket actor started.")
override def receive: Receive = {
case Initialized =>
log.info("Initialization to receive messages via stream.")
sender() ! Ack
case Completed =>
log.info("Streams completed.")
sender() ! Ack
case Msg(value) =>
val replyTo = sender()
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(incoming, Source.single(TextMessage(value)).concatMat(Source.maybe[Message])(Keep.right))(Keep.right)
val (upgradeResponse, _) =
Http().singleWebSocketRequest(WebSocketRequest("ws://127.0.0.1:7000/ws"), flow.mapAsync(4)(msg => Future(msg)))
upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}.onComplete {
case Success(_) =>
replyTo ! Ack
log.info("Done")
case Failure(ex) => log.error(ex.getMessage)
}
case Failed(ex) =>
log.info(s"Stream failed with ${ex.getMessage}.")
}
}
Поэтому каждый раз, когда получено сообщение, оно закрывает соединение и открывает новое соединение для следующего запроса.
Вопрос в том, как я могу сохранить соединение открытым?
Ответ №1:
Http().webSocketClientFlow
вместо Http().singleWebSocketRequest
Http().webSocketClientFlow
даст вам поток Flow[Message, Message, Future[WebSocketUpgradeResponse]]
Это не будет создавать новое соединение каждый раз.
Вы должны объявить его в сопутствующем объекте, чтобы каждый экземпляр класса мог использовать одно и то же соединение.
Сначала объявите свою систему актеров для всего приложения в отдельном пакете.
object ActorEssentials {
implicit val actorSystem = ActorSystem("test")
}
Затем вы можете объявить следующее
object WsActor {
import ActorEssentials._
def props: Props = Props[WsActor]
val flow = Http()(actorSystem).webSocketClientFlow(...)
}
Комментарии:
1.
You should declare it in the companion object, so every instance of the class can use the same connection.
Я все еще изучаю Scala, не могли бы вы привести пример этого?2. Как вы объявили
def props=new Props(new WsActor)
вobject WsActor {}
. Вы должны включитьHttp().webSocketClientFlow
сопутствующий объект in. Таким образом, он будет создан только один раз.3. Другое дело, что вы должны изменить
def props = new Props(new WsActot)
наdef props=Props[WsActor]
4. Я не могу поделиться
Http().webSocketClientFlow
, потомуdef apply()(implicit system: ActorSystem): HttpExt = super.apply(system)
что, как вы можете видеть, он ожидаетActorSystem
5. Потрясающе, большое спасибо за вашу помощь. Я очень признателен.