Почему tokio ::spawn имеет задержку при вызове рядом с crossbeam_channel::select ?

#asynchronous #rust #rust-tokio #crossbeam

#асинхронный #Ржавчина #rust-tokio #crossbeam

Вопрос:

Я создаю задачу, которая будет порождать другие задачи. Некоторые из них займут некоторое время, поэтому их нельзя ожидать, но они могут выполняться параллельно:

src/main.rs

 use crossbeam::crossbeam_channel::{bounded, select};

#[tokio::main]
async fn main() {
    let (s, r) = bounded::<usize>(1);

    tokio::spawn(async move {
        let mut counter = 0;
        loop {
            let loop_id = counter.clone();
            tokio::spawn(async move { // why this one was not fired?
                println!("inner task {}", loop_id);
            }); // .await.unwrap(); - solves issue, but this is long task which cannot be awaited
            println!("loop {}", loop_id);
            select! {
                recv(r) -> rr => {
                    // match rr {
                    //     Ok(ee) => {
                    //         println!("received from channel {}", loop_id);
                    //         tokio::spawn(async move {
                    //             println!("received from channel task {}", loop_id);
                    //         });
                    //     },
                    //     Err(e) => println!("{}", e),
                    // };
                },
                // more recv(some_channel) -> 
            }
            counter = counter   1;
        }
    });

    // let s_clone = s.clone();
    // tokio::spawn(async move {
    //     s_clone.send(2).unwrap();
    // });

    loop {
        // rest of the program
    }
}
  

Я заметил странное поведение. Это выводит:

 loop 0
  

Я ожидал, что он также будет выводиться inner task 0 .

Если я отправлю значение в канал, результат будет:

 loop 0
inner task 0
loop 1
  

Это отсутствует inner task 1 .

Почему inner task порождается с одним циклом задержки?

В первый раз, когда я заметил такое поведение с «полученным из задачи канала», задержка одного цикла, но когда я сократил код для подготовки образца, это начало происходить с «внутренней задачей». Возможно, стоит упомянуть, что если я напишу второе tokio::spawn право другому, только у последнего будет эта проблема. Есть ли что-то, о чем я должен знать при вызове tokio::spawn and select! ? Что вызывает этот один цикл задержки?

Зависимости Cargo.toml

 [dependencies]
tokio = { version = "0.2", features = ["full"] }
crossbeam = "0.7"
  

Rust 1.46, Windows 10

Ответ №1:

select! блокируется, и документы для tokio::spawn say:

Созданная задача может выполняться в текущем потоке или может быть отправлена в другой поток для выполнения.

В этом случае select! «будущее» на самом деле является блокирующей функцией и spawn не использует новый поток (ни при первом вызове, ни внутри цикла). Поскольку вы не говорите tokio, что собираетесь блокировать, tokio не считает, что нужен другой поток (с точки зрения tokio, у вас есть только 3 фьючерса, которые никогда не должны блокироваться, так зачем вам вообще нужен другой поток?).

Решение состоит в том, чтобы использовать tokio::task::spawn_blocking для select! закрытия (которое больше не будет будущим, как async move {} и сейчас move || {} ). Теперь tokio узнает, что эта функция фактически блокируется, и переместит ее в другой поток (сохраняя при этом все фактические фьючерсы в других потоках выполнения).

 use crossbeam::crossbeam_channel::{bounded, select};

#[tokio::main]
async fn main() {
    let (s, r) = bounded::<usize>(1);

    tokio::task::spawn_blocking(move || {
        // ...
    });

    loop {
        // rest of the program
    }
}
  

Ссылка на игровую площадку

Другим возможным решением является использование неблокирующего канала, подобного tokio::sync::mpsc , на котором вы можете использовать await и получать ожидаемое поведение, например, этот пример с игровой площадкой, с помощью direct recv().await или с tokio::select! помощью , как это:

 use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut s, mut r) = mpsc::channel::<usize>(1);

    tokio::spawn(async move {
        loop {
            // ...
            tokio::select! {
                Some(i) = r.recv() => {
                    println!("got = {}", i);
                }
            }
        }
    });

    loop {
        // rest of the program
    }
}
  

Ссылка на игровую площадку

Комментарии:

1. Спасибо за отличное объяснение! Это работает, как и ожидалось.