actix-веб-клиент обрабатывает полезную нагрузку как поток

#rust #client #response #payload #actix-web

#Ржавчина #клиент #ответ #полезная нагрузка #actix-web

Вопрос:

Простой http-клиент с actix в rust. Получает ответ от сервера, может обрабатывать заголовок. Но при попытке обработать полезную нагрузку мое понимание rust терпит неудачу. Я хочу обрабатывать полезную нагрузку байт за байтом, чтобы получать огромные потоки и фрагментированные данные. Простое решение body () работает нормально. Но как получить байты из полезной нагрузки. Кажется, что он реализует Stream , поэтому fn size_hint() доступен, но poll_next из Stream с помощью декомпрессора impl Decoder impl Stream нет.

main.rs

 use actix::prelude::*;
use actix_http::encoding::{Decoder};
use actix_web::{HttpMessage,client::ClientBuilder,dev::Payload,http::{Version, StatusCode},web::Bytes};

use futures::future::{TryFutureExt};
use futures_core::stream::Stream;

use std::pin::Pin;
use std::boxed::Box;
use std::task::{Context, Poll};

#[actix_web::main]
async fn main() {
    const URL : [amp;str; 2] = ["..",".."];

    let client = ClientBuilder::new()
        .disable_timeout()
        .disable_redirects()
        .header("User-Agent", "actix-web/3.0")
        .header("Accept-Encoding", "gzip, deflate, br, chunked")
        .header("Connection", "keepalive")
        .header("DNT", "1")
        .max_http_version( Version::HTTP_2 )
        .finish();

    // Create request builder and send request
    let response = 
    client.get(URL[0]).send().map_err(|err|{
        println!("Error on send {:?}",err);
    });

    match response.await{
        Ok( mut v) => {         
            println!("encoding: {:?}",v.encoding());
            println!("content type: {:?}",v.content_type());
            println!("mime type: {:?}",v.mime_type());
            println!("chunked: {:?}",v.chunked());

            match v.status(){
                StatusCode::OK => {
                    /* works fine
                    match v.body().await {
                        Ok(m) => {  println!("body {:?}", m);   },
                        Err(r) => { println!("Error.body {:?}",r); }
                    }
                    */
                    // but what about handling byte by byte?
                    let payload = v.take_payload();

                    match payload {
                        Payload::None => {  println!(" no data ");  },
                        Payload::H1(h1) => {println!(" data h1 {:?}", h1);  },
                        Payload::H2(d2) => {println!(" data h2 ");          },
                        Payload::Stream(s1) => {
                            println!(" data stream ");
                            
                            println!(" size: {:?}", s1.size_hint());
                            // Decompress impl Decoder
                            // Decoder impl Stream
                            
                            // but how to activate poll_next
                            // compiler can size_hint from Stream
                            // but poll_next not found??
                        }
                    };
                },
                StatusCode::NOT_FOUND => {
                    println!(" resource was not found");
                },
                _ => {
                    
                }
            }
        },
        Err(err) => {
            println!("Error {:?}",err);
        }
    };
}
  

Комментарии:

1. попробуйте вызвать все необходимые определения с помощью use actix::prelude::* в начале

2. не было решения

3. Какие ошибки вы получаете?

4. ошибка компилятора: ошибка [E0599]: poll_next для struct actix_http::encoding::Decoder<actix_http::Payload<std::pin::Pin<std::boxed::Box<dyn futures_core::Stream<Item = std::result::Result<actix_web::web::Bytes, actix_http::error::PayloadError>>>>>> в текущей области не найден метод с именем

5. вместо http-клиента crate actix-web я использую http-клиент crate hyper, который позволяет выполнять чтение по частям и дает мне больше и проще контролировать процесс чтения. Как бы мало ни было неудобств, мне приходится самостоятельно распаковывать данные, с помощью асинхронного сжатия crate это просто, brotli, gzip, bzip2 поддерживаются по умолчанию.

Ответ №1:

Payload реализует Stream , поэтому его можно перебирать по частям.

Каждый фрагмент будет a Result<Bytes, PayloadError> .

 // use futures::StreamExt;
let mut payload = v.take_payload();
while let Some(chunk) = payload.next().await {
    // handle chunk here
}