#scala #akka #akka-stream
#скала #акка #akka-поток
Вопрос:
Я использую актера в качестве приемника следующим образом:
import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent._
import scala.util.{Failure, Success}
object WsActor {
def props: Props = Props(new WsActor)
}
final class WsActor extends Actor with ActorLogging {
import com.sweetsoft.WsConnector._
implicit val materializer: Materializer = ActorMaterializer()
implicit val ec: ExecutionContextExecutor = context.system.dispatcher
implicit val actor = context.system
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
private val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
println("Unknown messages.")
}
//private val outgoing: Source[Message, Promise[Option[Message]]] =
// Source.maybe[Message]
log.info("Websocket actor started.")
override def receive: Receive = {
case Initialized =>
log.info("Initialization to receive messages via stream.")
sender() ! Ack
case Completed =>
log.info("Streams completed.")
sender() ! Ack
case Msg(value) =>
// the Future[Done] is the materialized value of Sink.foreach
// and it is completed when the stream completes
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(incoming, Source.single(TextMessage(value)))(Keep.left)
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] representing the stream completion from above
val (upgradeResponse, _) =
Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)
upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}.andThen {
case Success(_) =>
log.info("Sending ACK")
sender() ! Ack
}.onComplete {
case Success(_) =>
log.info("Success proceed")
case Failure(ex) => log.error(ex.getMessage)
}
//sender() ! Ack
case Failed(ex) =>
log.info(s"Stream failed with ${ex.getMessage}.")
}
}
Субъект получает сообщения и перенаправляет их дальше на сервер websocket.
Где-то в коде я посылаю Ack
Sender
актеру сигнал о том, что он готов получать дальнейшие сообщения. Но субъект-отправитель никогда не получит Ack
сообщение.
Если я поставлю sender() ! Ack
FUTURE
цепочку, то она будет работать так, как и ожидалось.
Можно ли поместить sender() ! Ack
в цепочку «БУДУЩЕЕ»?
Ответ №1:
Попробуйте запомнить отправителя при получении сообщения, а затем использовать его позже в коде вместо sender()
вместо вызова sender() по потоку, это значение может быть не постоянным во время обработки (например, при получении других сообщений в то же время, когда текущая задача не блокирует обработку очереди сообщений из-за фьючерсов)
case Msg(value) =>
val sender = sender()
.
.
sender ! Ack
Ответ №2:
Ваша ошибка заключается в следующих строках:
}.andThen {
case Success(_) =>
log.info("Sending ACK")
sender() ! Ack
}.onComplete {
Обратный andThen
вызов будет выполнен другим потоком, и sender()
вызов может быть пустым (относительно удачным) или даже другим участником в целом. Вам нужно захватить отправителя перед выполнением первого асинхронного действия.
val respondTo = sender()
upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}.andThen {
case Success(_) =>
log.info("Sending ACK")
respondTo ! Ack
}.onComplete {
case Success(_) =>
log.info("Success proceed")
case Failure(ex) => log.error(ex.getMessage)
}