Не удается заставить асинхронное закрытие работать с Warp :: Filter

#rust #closures #rust-warp

#Ржавчина #замыкания #rust-деформация

Вопрос:

Я пытаюсь заставить асинхронное закрытие работать в and_then фильтре Warp .

Это самый маленький пример, который я мог придумать, где я достаточно уверен, что не упустил никаких важных деталей:

 use std::{convert::Infallible, sync::Arc, thread, time};
use tokio::sync::RwLock;
use warp::Filter;

fn main() {
    let man = Manifest::new();

    let check = warp::path("updates").and_then(|| async move { GetAvailableBinaries(amp;man).await });
}

async fn GetAvailableBinaries(man: amp;Manifest) -> Result<impl warp::Reply, Infallible> {
    Ok(warp::reply::json(amp;man.GetAvailableBinaries().await))
}

pub struct Manifest {
    binaries: Arc<RwLock<Vec<i32>>>,
}

impl Manifest {
    pub fn new() -> Manifest {
        let bins = Arc::new(RwLock::new(Vec::new()));

        thread::spawn(move || async move {
            loop {
                thread::sleep(time::Duration::from_millis(10000));
            }
        });

        Manifest { binaries: bins }
    }

    pub async fn GetAvailableBinaries(amp;self) -> Vec<i32> {
        self.binaries.read().await.to_vec()
    }
}
  

Я использую:

 [dependencies]
tokio = { version = "0.2", features = ["full"] }
warp = { version = "0.2", features = ["tls"] }
  

Ошибка:

 error[E0525]: expected a closure that implements the `Fn` trait, but this closure only implements `FnOnce`
 --> src/main.rs:9:48
  |
9 |     let check = warp::path("updates").and_then(|| async move { GetAvailableBinaries(amp;man).await });
  |                                       -------- ^^^^^^^^^^^^^ ------------------------------------ closure is `FnOnce` because it moves the variable `man` out of its environment
  |                                       |        |
  |                                       |        this closure implements `FnOnce`, not `Fn`
  |                                       the requirement to implement `Fn` derives from here
  

Комментарии:

1. Кстати, идиоматический Rust используется snake_case для переменных, методов, макросов, полей и модулей; UpperCamelCase для типов и вариантов перечисления; а SCREAMING_SNAKE_CASE также для статики и констант.

2. Код в вопросе не реализуется Clone для манифеста, но вы удалили эту ошибку.

Ответ №1:

После создания Manifest реализации Clone вы можете исправить ошибку, выполнив балансировку при клонировании объекта манифеста:

 fn main() {
    let man = Manifest::new();

    let check = warp::path("updates").and_then(move || {
        let man = man.clone();
        async move { get_available_binaries(amp;man).await }
    });

    warp::serve(check);
}
  

Это перемещается man в переданное закрытие and_then , а затем предоставляет клон man асинхронному блоку при каждом выполнении закрытия. Затем асинхронный блок владеет этими данными и может ссылаться на них, не беспокоясь о выполнении future после освобождения данных.

Комментарии:

1. К сожалению, это не работает. Это приводит use of moved value: bins к последней строке impl Manifest блока

2. @MatthewGoulart не используется bins in GetAvailableBinaries , который является концом блока impl .

3. Извините, ошибка возникает в последней строке new() fn impl Manifest блока

4.Теперь я вижу, это потому, что в моем реальном коде new используется поток, созданный в. bins В основном поэтому я не решался многое убрать из моего первоначального вопроса, я недостаточно хорошо знаю rust, чтобы знать, что важно, а что нет, особенно когда я не знаю, почему возникает ошибка.

5. @MatthewGoulart вы не экстраполируете полученные ответы. Вам необходимо клонировать bins , прежде чем передавать его в созданный поток. Это та же проблема, но в другом месте.

Ответ №2:

Я не уверен, что это то, к чему вы стремитесь, но это решение подходит для меня:

 use std::{convert::Infallible, sync::Arc, thread, time};
use tokio::sync::RwLock;
use warp::Filter;

fn main() {
    let man = Manifest::new();

    let check = warp::path("updates").and_then(|| async { GetAvailableBinaries(amp;man).await });
}

async fn GetAvailableBinaries(man: amp;Manifest) -> Result<impl warp::Reply, Infallible> {
    Ok(warp::reply::json(amp;man.GetAvailableBinaries().await))
}

#[derive(Clone)]
pub struct Manifest {
    binaries: Arc<RwLock<Vec<i32>>>,
}

impl Manifest {
    pub fn new() -> Manifest {
        let bins = Arc::new(RwLock::new(Vec::new()));

        thread::spawn(move || async {
            loop {
                thread::sleep(time::Duration::from_millis(10000));
                //mutate bins here
            }
        });

        Manifest { binaries: bins }
    }

    pub async fn GetAvailableBinaries(amp;self) -> Vec<i32> {
        self.binaries.read().await.to_vec()
    }
}
  

move Вот причина, по которой компилятор выдал предупреждение относительно подписи : let check = warp::path("updates").and_then(|| async move { GetAvailableBinaries(amp;man).await }); . Это означает, что все, на что ссылается это закрытие, будет перенесено в контекст закрытия. В этом случае компилятор не может гарантировать, что закрытие будет Fn выполнено, а FnOnce означает, что закрытие может быть гарантировано только один раз.

Комментарии:

1. Ну, это технически позволяет компилировать код, на самом деле это не решает реальную проблему. В реальном коде, скорее всего, будет вызов warp::serve(check) для запуска веб-сервера. Этот вызов наложит дополнительные ограничения на check