Цикл, обнаруженный в рекурсивной асинхронной функции

#recursion #rust #async-await

Вопрос:

Я пытаюсь рекурсивно находить файлы с расширением в директории, и вот моя текущая реализация:

 use futures::future::BoxFuture;
use futures::Stream;
use std::io::ErrorKind;
use std::pin::Pin;
use std::resu<
use tokio::fs::read_dir;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;

type Result<T> = result::Result<T, std::io::Error>;
type FileNameStream = Pin<Box<dyn Stream<Item = Result<String>>   Send   Sync   'static>>;

async fn list_all(root_path: String, ext: String) -> Result<FileNameStream> {
    async fn one_level(path: String, tx: Sender<Result<String>>, ext: String) -> Result<()> {
        let mut dir = read_dir(path).await?;
        let mut files: Vec<String> = Vec::new();

        while let Some(child) = dir.next_entry().await? {
            if let Some(child_path) = child.path().to_str() {
                if child.metadata().await?.is_dir() {
                    tokio::spawn(async {
                        one_level(child_path.to_string(), tx.clone(), ext.clone()).await;
                    });
                } else {
                    if child_path.ends_with(amp;ext.clone()) {
                        files.push(child_path.to_string())
                    }
                }
            } else {
                tx.send(Err(std::io::Error::new(
                    ErrorKind::Other,
                    "Invalid path".to_string(),
                )));
            }
        }

        for file in files {
            tx.send(Ok(file));
        }
        Ok(())
    }

    let (tx, rx): (Sender<Result<String>>, Receiver<Result<String>>) = channel(2);
    tokio::spawn(async {
        one_level(root_path, tx, ext).await;
    });
    Ok(Box::pin(ReceiverStream::new(rx)))
}
 

Я не совсем понимаю, почему компилятор жалуется:

 14 |     async fn one_level(path: String, tx: Sender<Result<String>>, ext: String) -> Result<()> {
   |                                                                                  ^^^^^^^^^^
   |
note: ...which requires borrow-checking `list_all::{closure#0}::one_level`...
  --> src/main.rs:14:5
.....
.....
   = note: ...which requires evaluating trait selection obligation `for<'r, 's, 't0> {std::future::ResumeTy, amp;'r str, std::string::String, amp;'s tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, amp;'t0 std::string::String, impl futures::Future, ()}: std::marker::Send`...
   = note: ...which again requires computing type of `list_all::{closure#0}::one_level::{opaque#0}`, completing the cycle
   = note: cycle used when evaluating trait selection obligation `{std::future::ResumeTy, std::string::String, tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, impl futures::Future, ()}: std::marker::Send`
 

Можно ли определить такую рекурсивную функцию как асинхронную? Могу ли я параллельно выполнить процедуру перечисления dir tokio::spwan и ускорить ее?

Комментарии:

1. rust-lang.github.io/async-book/07_workarounds/04_recursion.html

Ответ №1:

Асинхронные функции rust компилируются в конечные машины, поэтому для вызова асинхронной функции потребуется, чтобы конечная машина встраивалась в свое собственное определение, что было бы бесконечной рекурсией.

Это лучше объясняется здесь. Обходной путь, как описано в уже связанном документе, заключается в том, чтобы ввести косвенное использование Box ( BoxFuture тип) и несинхронную функцию:

 use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use std::io::ErrorKind;
use std::pin::Pin;
use std::resu<
use tokio::fs::read_dir;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;

type Result<T> = result::Result<T, std::io::Error>;
type FileNameStream = Pin<Box<dyn Stream<Item = Result<String>>   Send   Sync   'static>>;

async fn list_all(root_path: String, ext: String) -> Result<FileNameStream> {
    let (tx, rx): (Sender<Result<String>>, Receiver<Result<String>>) = channel(2);
    tokio::spawn(async {
        recursive(root_path, tx, ext).await.unwrap();
    });
    Ok(Box::pin(ReceiverStream::new(rx)))
}

fn recursive(
    path: String,
    tx: Sender<Result<String>>,
    ext: String,
) -> BoxFuture<'static, Result<()>> {
    async move {
        let mut dir = read_dir(path).await?;
        let mut files: Vec<String> = Vec::new();

        while let Some(child) = dir.next_entry().await? {
            match child.path().to_str() {
                Some(child_path) => {
                    let metadata = child.metadata().await?;

                    if metadata.is_dir() {
                        let cp = child_path.to_owned();
                        let tx = tx.clone();
                        let ext = ext.clone();

                        tokio::spawn(async {
                            recursive(cp, tx, ext).await.unwrap();
                        });
                    } else {
                        if child_path.ends_with(amp;ext) {
                            files.push(child_path.to_owned())
                        }
                    }
                }
                None => {
                    tx.send(Err(std::io::Error::new(
                        ErrorKind::Other,
                        "Invalid path".to_string(),
                    )))
                    .await
                    .unwrap();
                }
            }
        }

        for file in files {
            tx.send(Ok(file)).await.unwrap();
        }
        Ok(())
    }
    .boxed()
}
 

PS: Я также исправил некоторые другие проблемы с вашим кодом, такие как пропущенные await tx.send() вызовы — помните — фьючерсы выполняют работу только тогда, когда они опрошены!!!

Комментарии:

1. Спасибо за ответ! Не могли бы вы, пожалуйста, объяснить, почему unwrap в результате внутри tokio::spawn и tx.send(Ok).await ? будут ли они просто паниковать по всей программе или только по потоку/исполнителю, который ее запускает? Можно ли также отлавливать и распространять ошибки и отправлять их на канал?

2. Их разворачивание вызовет панику и, таким образом, приведет к сбою приложения в случае ошибки. Вы должны решить, как/если обрабатывать случаи ошибок. Обычно гораздо лучше потерпеть неудачу/сбой как можно раньше, вместо того, чтобы работать с неверными данными

3. Каков идиоматический способ борьбы с SendError этим ? Должны ли мы распространять его, если мы пишем приложение, которое зацикливается на вводе данных пользователем и выполняет различную обработку? Или мы должны просто паниковать, так как это означает, что мы написали ошибочный код?

4. Ошибка возвращается, когда Receiver сообщение было удалено, поэтому получить какие-либо сообщения невозможно. Если это невозможно сделать в вашем приложении, то все в порядке. Но если это возможно, правильный способ обработки ошибки зависит от того, что должно произойти, когда приемника больше нет — универсального решения не существует — это зависит от приложения