Поток Akka, Tcp().bind, обрабатывается, когда клиент закрывает сокет

#scala #akka #akka-stream #tcpsocket #akka-io

#scala #akka #akka-поток #tcpsocket #akka-io

Вопрос:

Я совсем новичок в Akka Stream, и я хотел бы узнать, как обрабатывать сокет TCP для моего проекта. Я взял этот фрагмент кода из официальной документации Akka Stream.

 import akka.stream.scaladsl.Framing

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .map(_   "!!!n")
    .map(ByteString(_))

  connection.handleWith(echo)
}
 

Если я подключаюсь с терминала с помощью netcat, я вижу, что TCP-сокет Akka Stream работает так, как ожидалось. Я также обнаружил, что если мне нужно закрыть соединение с помощью сообщения пользователя, я могу использовать takeWhile следующее

 import akka.stream.scaladsl.Framing

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit")   // < - - - - - - HERE
    .map(_   "!!!n")
    .map(ByteString(_))

  connection.handleWith(echo)
}
 

Чего я не могу найти, так это как управлять сокетом, закрытым CMD C действием. Поток Akka использует Akka.io для внутреннего управления TCP-соединением, поэтому он должен отправлять некоторые из своих PeerClose сообщений при закрытии сокета. Итак, мое понимание Akka.io говорит мне, что я должен получить обратную связь от закрытия сокета, но я не могу найти, как это сделать с помощью Akka Stream. Есть ли способ управлять этим?

Ответ №1:

connection.handleWith(echo) это синтаксический сахар, для connection.flow.joinMat(echo)(Keep.right).run() которого будет иметь материализованное значение echo , что обычно бесполезно. Flow.via.map.takeWhile имеет NotUsed материализованное значение, так что это тоже в принципе бесполезно. Однако вы можете присоединять этапы, к echo которым будут выполняться по-разному.

Одним из них является .watchTermination :

 connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo: Flow[ByteString, ByteString, Future[Done]] = Flow[ByteString]
    .via(Framing.delimiter(ByteString("n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit")   // < - - - - - - HERE
    .map(_   "!!!n")
    .map(ByteString(_))
    // change the materialized value to a Future[Done]
    .watchTermination()(Keep.right)

  // you may need to have an implicit ExecutionContext in scope, e.g. system.dispatcher,
  //  if you don't already
  connection.handleWith(echo).onComplete {
    case Success(_) => println("stream completed successfully")
    case Failure(e) => println(e.getMessage)
  }
}
 

Это не будет различать вашу сторону или удаленную сторону, закрывающую соединение нормально; он будет различать сбой потока.