Асинхронная загрузка большого количества файлов

#asynchronous #rust #download

Вопрос:

(Исходный код взят из http://patshaughnessy.net/2020/1/20/downloading-100000-files-using-async-rust )

У меня есть некоторый код для одновременной загрузки изображений с использованием Rust, однако я не уверен, как реализовать 2 вещи. Первое — ограничение скорости, чтобы избежать 429-х годов. Я попробовал использовать std::thread::sleep , однако это работает не так, как я ожидал. (То же самое происходит с tokio::time::sleep) (Я не писал исходный код и новичок в асинхронной rust, поэтому я не уверен в особенностях того, как и когда выполняется код.).

 #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let paths: Vec<String> = read_lines("links.txt")?;
    let fetches = futures::stream::iter(
    paths.into_iter().map(|path| {
        async move {
            let a = path.split('/').collect::<Vec<amp;str>>();
            let file_name = a.last().unwrap();
            tokio::time::sleep(tokio::time::Duration::from_secs(1));
            match reqwest::get(amp;path).await {
                Ok(resp) => {
                    if resp.status().as_u16() != 200 {
                        println!("failed to download");
                        println!("{}", path);
                    };
                    match resp.bytes().await {
                        Ok(bytes) => {
                            //println!("RESPONSE: {} bytes from {}", (bytes.len()), path);
                            write(format!("downloads/{}", file_name), bytes).unwrap();
                        }
                        Err(_) => println!("ERROR reading {}", path),
                    }
                }
                Err(_) => println!("ERROR downloading {}", path),
            }
        }
    })
    ).buffer_unordered(200).collect::<Vec<()>>();
    fetches.await;
    Ok(())
}
 

Во-вторых, я хочу иметь список ссылок, которые были «ограничены по скорости» (вернули статус 429), и распечатать их, как только все остальные файлы будут загружены. Моя попытка и полученное в результате сообщение об ошибке компилятора выглядят следующим образом:

 #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut failed: Vec<String> = Vec::new();
    let paths: Vec<String> = read_lines("links.txt")?;
    let fetches = futures::stream::iter(
    paths.into_iter().map(|path| {
        async move {
            let a = path.split('/').collect::<Vec<amp;str>>();
            let file_name = a.last().unwrap();
            match reqwest::get(amp;path).await {
                Ok(resp) => {
                    if resp.status().as_u16() != 200 {
                        println!("failed to download {}", path);
                        failed.push(path);
                        return // To avoid downloading the file, which will not contain what we want.
                    };
                    match resp.bytes().await {
                        Ok(bytes) => {
                            //println!("RESPONSE: {} bytes from {}", (bytes.len()), path);
                            write(format!("downloads/{}", file_name), bytes).unwrap();
                        }
                        Err(_) => println!("ERROR reading {}", path),
                    }
                }
                Err(_) => println!("ERROR downloading {}", path),
            }
        }
    })
    ).buffer_unordered(200).collect::<Vec<()>>();
    fetches.await;
    Ok(())
}
 
 error[E0507]: cannot move out of `failed`, a captured variable in an `FnMut` closure
  --> src/main.rs:28:20
   |
24 |       let mut failed: Vec<String> = Vec::new();
   |           ---------- captured outer variable
...
28 |           async move {
   |  ____________________^
29 | |             let a = path.split('/').collect::<Vec<amp;str>>();
30 | |             let file_name = a.last().unwrap();
31 | |             match reqwest::get(amp;path).await {
...  |
35 | |                         failed.push(path);
   | |                         ------
   | |                         |
   | |                         move occurs because `failed` has type `Vec<String>`, which does not implement the `Copy` trait
   | |                         move occurs due to use in generator
...  |
47 | |             }
48 | |         }
   | |_________^ move out of `failed` occurs here

 

Как я могу это сделать?

Комментарии:

1. Что касается вашего первого вопроса, я не знаю, слишком ли много 200 для этой buffer_unordered функции. Вы пробовали использовать меньшее значение, например 8, которое использовалось в исходном сообщении , на которое вы ссылались.

2. Вы не должны использовать функции блокировки в асинхронной функции. Вместо std::thread::sleep использования tokio::time::sleep