#scala #akka #akka-stream #akka-http
Вопрос:
У меня есть довольно простое приложение, которое имеет конечную точку Akka HTTP, выполняет некоторую обработку и записывает результаты в любой из выходных файлов. Код, обеспечивающий плавное завершение работы, выглядит немного сложным, есть ли способ сделать его более кратким?
val bindingFuture = Http().newServerAt("localhost", config.port).bind(route)
val validQueue: BoundedSourceQueue[ByteString] = ???
val invalidQueue: BoundedSourceQueue[ByteString] = ???
val validDone: Future[Done] = ???
val invalidDone: Future[Done] = ???
val allDone = Future.sequence(validDone, invalidDone)
bindingFuture.onComplete {
case Success(binding) =>
logger.info("Server started on port {}", config.port)
binding.addToCoordinatedShutdown(5.seconds)
case Failure(ex) =>
logger.error("Can't start server", ex)
system.terminate()
}
allDone.onComplete { result =>
result match {
case Failure(ex) =>
logger.error("Streams completed with error", ex)
case Success(_) =>
logger.info("Streams completed successfully")
}
system.terminate()
}
sys.addShutdownHook {
logger.info("Shutting down...")
validQueue.complete()
invalidQueue.complete()
}
Ответ №1:
Akka устанавливает перехваты завершения работы JVM по умолчанию, нет необходимости добавлять перехват завершения работы самостоятельно, вы можете удалить sys.addShutdownHook { ... }
вызов ActorSystem.terminate
, который также завершит все потоки. (Потоки могут быть прерваны внезапно, хотя в большинстве приложений это не проблема, и, похоже, в вашем случае это также не будет проблемой.)
Небольшая очистка, вы могли бы рассмотреть возможность использования map
, recoverWith
и andThen
.
allDone.map { _ =>
logger.info("Streams completed successfully")
}.recoverWith {
case ex =>
logger.error("Streams completed with error", ex)
}.andThen {
case _ => system.terminate()
}
Вы могли бы использовать CoordinatedShutdown:
CoordinatedShutdown(context.system).addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "complete hdfs sinks") { () =>
validQueue.complete()
invalidQueue.complete()
}
Вы также можете использовать общий переключатель отключения (https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html#sharedkillswitch ), который вы можете поместить в свои потоки .via(sharedKillSwitch.flow)
, вы могли бы отключить переключатель из CoordinatedShutdown:
// create it somewhere, use in your flows
val sharedKillSwitch = KillSwitches.shared("hdfs-switch")
// use switch in CoordinatedShutdown
CoordinatedShutdown(context.system).addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "complete hdfs sinks") { () =>
sharedKillSwitch.shutdown()
}
Комментарии:
1. В моем случае на самом деле важно корректно завершать работу приемников (это приемники Alpakka HDFS), которые записывают файлы, иначе последний файл фактически не записывается.
2. Вы могли бы использовать CoordinatedShutdown вместо
addShutdownHook
и завершить потоки, возможно, используяPhaseServiceRequestsDone
для ожидания выполнения запросов, которые выполняются. Вы проверили doc.akka.io/docs/akka/current/coordinated-shutdown.html ?3. Добавлены некоторые подробности о том, как это сделать.