#scala #fs2 #cats-effect
#scala #fs2 #кошки-эффект
Вопрос:
Используя fs2 (версия 1.0.4) и cats-effect IO
, я могу передавать URL-адрес в локальный файл,
import concurrent.ExecutionContext.Implicits.global
def download(spec: String, filename: String): Stream[IO, Unit] =
io.readInputStream((new URL(spec).openConnection.getInputStream), 4096, global, true)
.through(io.file.writeAll(Paths.get(filename), global))
Однако этот фрагмент кода не возвращает никакой информации о процессе, когда он завершен. Кроме того, помимо того, что я знаю, является операция успешной или неудачной, я также хочу знать, сколько байтов считывается, если операция прошла успешно. Я не хочу проверять новый размер файла, чтобы получить эту информацию. С другой стороны, если операция завершилась неудачей, я хочу знать, что вызывает сбой.
Я пытался attempt
, но не смог выполнить последующие шаги для записи необработанных байтов в новый файл. Пожалуйста, посоветуйте. Спасибо
Ответ №1:
Возможно, вы захотите поиграть с observe
. Я уверен, что есть лучший способ сделать это, но вот пример, который может помочь вам отклеиться:
Ваш исходный код, который компилируется и выполняется:
import fs2.io
import cats.effect.{IO, ContextShift}
import concurrent.ExecutionContext.Implicits.global
import java.net.URL
import java.nio.file.Paths
object Example1 {
implicit val contextShift: ContextShift[IO] = IO.contextShift(global)
def download(spec: String, filename: String): fs2.Stream[IO, Unit] =
io.readInputStream[IO](IO(new URL(spec).openConnection.getInputStream), 4096, global, closeAfterUse=true)
.through(io.file.writeAll(Paths.get(filename), global))
def main(args: Array[String]): Unit = {
download("https://isitchristmas.com/", "/tmp/christmas.txt")
.compile.drain.unsafeRunSync()
}
}
Использование observe для подсчета байтов:
import fs2.io
import cats.effect.{IO, ContextShift}
import concurrent.ExecutionContext.Implicits.global
import java.net.URL
import java.nio.file.Paths
object Example2 {
implicit val contextShift: ContextShift[IO] = IO.contextShift(global)
final case class DlResults(bytes: Long)
def download(spec: String, filename: String): fs2.Stream[IO, DlResults] =
io.readInputStream[IO](IO(new URL(spec).openConnection.getInputStream), 4096, global, closeAfterUse = true)
.observe(io.file.writeAll(Paths.get(filename), global))
.fold(DlResults(0L)) { (r, _) => DlResults(r.bytes 1) }
def main(args: Array[String]): Unit = {
download("https://isitchristmas.com/", "/tmp/christmas.txt")
.compile
.fold(()){ (_, r) => println(r)}
.unsafeRunSync()
}
}
Вывод:
> DlResults(42668)
Комментарии:
1. Спасибо. Вы дали мне несколько идей для продолжения. Мои предложения,
DlResults
это не обязательно, потому что вы можете ` .fold(0L) { (r, _) => r.bytes 1`, если у вас нет других причин не изменять. И завершить его с помощью.compile.last.unsafeRunSync().foreach(println)
2. Да, DlResults были бы более полезными, если бы у вас было несколько фрагментов информации для возврата. И
compile.last
определенно выглядит лучше, чем мой бред о сворачивании. Рад, что смог помочь.
Ответ №2:
Я нашел решение с точки зрения Resource
и IO
и предложения @codenoodle.
Обновление # 1
Resource
удалено, поскольку оно является избыточным при использовании с FS2 и усложняет код.
import java.io.{
FileNotFoundException,
FileOutputStream,
InputStream,
OutputStream
}
import java.net.URL
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2._
import scala.concurrent.ExecutionContext.Implicits.global
object LetStream extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
def write(source: IO[InputStream], target: IO[OutputStream]) =
io.readInputStream(source, 4096, global)
.chunks
.flatMap(Stream.chunk)
.observe(io.writeOutputStream(target, global))
.chunks
.fold(0L)((acc, chunk) => acc chunk.size)
write(IO(new FileOutputStream("image.jpg")),
IO(new URL("http://localhost:8080/images/helloworld.jpg")
.openConnection
.getInputStream))
.use(_.compile.toList)
.flatMap(size =>
IO(println(s"Written ${size.head} bytes")) *> IO(ExitCode.Success))
.recover {
case t: FileNotFoundException =>
IO(println(s"Not found, ${t.getMessage}")) *> IO(ExitCode.Error)
case err =>
IO(println(err.getMessage)) *> IO(ExitCode.Error)
}
}
}