#recursion #rust #async-await
Вопрос:
Я пытаюсь рекурсивно находить файлы с расширением в директории, и вот моя текущая реализация:
use futures::future::BoxFuture;
use futures::Stream;
use std::io::ErrorKind;
use std::pin::Pin;
use std::resu<
use tokio::fs::read_dir;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;
type Result<T> = result::Result<T, std::io::Error>;
type FileNameStream = Pin<Box<dyn Stream<Item = Result<String>> Send Sync 'static>>;
async fn list_all(root_path: String, ext: String) -> Result<FileNameStream> {
async fn one_level(path: String, tx: Sender<Result<String>>, ext: String) -> Result<()> {
let mut dir = read_dir(path).await?;
let mut files: Vec<String> = Vec::new();
while let Some(child) = dir.next_entry().await? {
if let Some(child_path) = child.path().to_str() {
if child.metadata().await?.is_dir() {
tokio::spawn(async {
one_level(child_path.to_string(), tx.clone(), ext.clone()).await;
});
} else {
if child_path.ends_with(amp;ext.clone()) {
files.push(child_path.to_string())
}
}
} else {
tx.send(Err(std::io::Error::new(
ErrorKind::Other,
"Invalid path".to_string(),
)));
}
}
for file in files {
tx.send(Ok(file));
}
Ok(())
}
let (tx, rx): (Sender<Result<String>>, Receiver<Result<String>>) = channel(2);
tokio::spawn(async {
one_level(root_path, tx, ext).await;
});
Ok(Box::pin(ReceiverStream::new(rx)))
}
Я не совсем понимаю, почему компилятор жалуется:
14 | async fn one_level(path: String, tx: Sender<Result<String>>, ext: String) -> Result<()> {
| ^^^^^^^^^^
|
note: ...which requires borrow-checking `list_all::{closure#0}::one_level`...
--> src/main.rs:14:5
.....
.....
= note: ...which requires evaluating trait selection obligation `for<'r, 's, 't0> {std::future::ResumeTy, amp;'r str, std::string::String, amp;'s tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, amp;'t0 std::string::String, impl futures::Future, ()}: std::marker::Send`...
= note: ...which again requires computing type of `list_all::{closure#0}::one_level::{opaque#0}`, completing the cycle
= note: cycle used when evaluating trait selection obligation `{std::future::ResumeTy, std::string::String, tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, impl futures::Future, ()}: std::marker::Send`
Можно ли определить такую рекурсивную функцию как асинхронную? Могу ли я параллельно выполнить процедуру перечисления dir tokio::spwan
и ускорить ее?
Комментарии:
1. rust-lang.github.io/async-book/07_workarounds/04_recursion.html
Ответ №1:
Асинхронные функции rust компилируются в конечные машины, поэтому для вызова асинхронной функции потребуется, чтобы конечная машина встраивалась в свое собственное определение, что было бы бесконечной рекурсией.
Это лучше объясняется здесь. Обходной путь, как описано в уже связанном документе, заключается в том, чтобы ввести косвенное использование Box
( BoxFuture
тип) и несинхронную функцию:
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use std::io::ErrorKind;
use std::pin::Pin;
use std::resu<
use tokio::fs::read_dir;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;
type Result<T> = result::Result<T, std::io::Error>;
type FileNameStream = Pin<Box<dyn Stream<Item = Result<String>> Send Sync 'static>>;
async fn list_all(root_path: String, ext: String) -> Result<FileNameStream> {
let (tx, rx): (Sender<Result<String>>, Receiver<Result<String>>) = channel(2);
tokio::spawn(async {
recursive(root_path, tx, ext).await.unwrap();
});
Ok(Box::pin(ReceiverStream::new(rx)))
}
fn recursive(
path: String,
tx: Sender<Result<String>>,
ext: String,
) -> BoxFuture<'static, Result<()>> {
async move {
let mut dir = read_dir(path).await?;
let mut files: Vec<String> = Vec::new();
while let Some(child) = dir.next_entry().await? {
match child.path().to_str() {
Some(child_path) => {
let metadata = child.metadata().await?;
if metadata.is_dir() {
let cp = child_path.to_owned();
let tx = tx.clone();
let ext = ext.clone();
tokio::spawn(async {
recursive(cp, tx, ext).await.unwrap();
});
} else {
if child_path.ends_with(amp;ext) {
files.push(child_path.to_owned())
}
}
}
None => {
tx.send(Err(std::io::Error::new(
ErrorKind::Other,
"Invalid path".to_string(),
)))
.await
.unwrap();
}
}
}
for file in files {
tx.send(Ok(file)).await.unwrap();
}
Ok(())
}
.boxed()
}
PS: Я также исправил некоторые другие проблемы с вашим кодом, такие как пропущенные await
tx.send()
вызовы — помните — фьючерсы выполняют работу только тогда, когда они опрошены!!!
Комментарии:
1. Спасибо за ответ! Не могли бы вы, пожалуйста, объяснить, почему
unwrap
в результате внутриtokio::spawn
иtx.send(Ok).await
? будут ли они просто паниковать по всей программе или только по потоку/исполнителю, который ее запускает? Можно ли также отлавливать и распространять ошибки и отправлять их на канал?2. Их разворачивание вызовет панику и, таким образом, приведет к сбою приложения в случае ошибки. Вы должны решить, как/если обрабатывать случаи ошибок. Обычно гораздо лучше потерпеть неудачу/сбой как можно раньше, вместо того, чтобы работать с неверными данными
3. Каков идиоматический способ борьбы с
SendError
этим ? Должны ли мы распространять его, если мы пишем приложение, которое зацикливается на вводе данных пользователем и выполняет различную обработку? Или мы должны просто паниковать, так как это означает, что мы написали ошибочный код?4. Ошибка возвращается, когда
Receiver
сообщение было удалено, поэтому получить какие-либо сообщения невозможно. Если это невозможно сделать в вашем приложении, то все в порядке. Но если это возможно, правильный способ обработки ошибки зависит от того, что должно произойти, когда приемника больше нет — универсального решения не существует — это зависит от приложения