Веб-сервер Rust Hyper/axum с интенсивными вычислениями и высокой пропускной способностью

#rust #async-await #rust-tokio #hyper

Вопрос:

У меня есть случай, когда у меня есть http-сервер, axum который получает полезную нагрузку с очень высокой пропускной способностью (может достигать 20 м в секунду). Мне нужно взять эти байты из запроса и выполнить с ними некоторые сложные вычисления. Проблема в том, что объем памяти непредсказуемо высок (может достигать 5 ГБ). Это текущая настройка того, как я пытаюсь ее достичь:

 #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, mut rx) = mpsc::channel::<WriteRequest>(32);

    tokio::spawn(async move {
      while let Some(payload) = rx.recv().await {
        tokio::task::spawn_blocking(move || {
          // Run heavy computation here...
          heavy_computation(payload)
        }).await;
      }
    });

    // build our application with a route
    let app = Router::new()
        .route("/write", post(move |req: Bytes| async move {
           let data: WriteRequest = Message::decode(req);
           // send data here
           let _ = tx.send(data).await;

           "ok"
         }));

    let addr = ([0, 0, 0, 0], 8080).into();
    let server = axum::Server::bind(amp;addr)
        .serve(app.into_make_service());
    
    if let Err(e) = server.await {
        error!("server error: {}", e);
    }
    
    Ok(())
}
 

Я думаю, что это обратное давление на bounded channel то, что заставляет запросы накапливаться до тех пор, пока они не будут отправлены другому task для обработки, что приводит к увеличению объема памяти.
Потому что даже если бы я попытался заменить heavy_copmutation это на простое sleep , примерно 200ms это закончилось бы теми же результатами.
Если я исключу эту heavy_computation часть, память останется низкой.

Как правильно подойти к такой проблеме? или с такой высокой пропускной способностью здесь ничего нельзя сделать?

Большое спасибо!

Комментарии:

1. Я думаю, что это противодавление на ограниченном канале, из-за которого запросы накапливаются до тех пор, пока их нельзя отправить на другую задачу для обработки . Вы пробовали использовать канал с буферизацией 0? Прямо сейчас он установлен на 32, и канал не будет выполнять никакого обратного давления, пока в очереди не будет 32 сообщения. ( Я предполагаю, что этот канал-tokio::синхронизация::mpsc::канал)

2. Да, это tokio::sync::mpsc::channel не 0 buffered просто неограниченный буфер?

3. Нет, неограниченные каналы явно определены как другой тип , но , похоже, tokio channel не допускает 0 буферизованных каналов, для этого требуется, по крайней мере 1 , попробуйте использовать 1 буферизованный канал

4. Будет сделано! Я должен сказать, однако, что я пытался установить это 2 в прошлом, но безуспешно.

5. Не могли бы вы поделиться дополнительной информацией, чтобы понять это? Что такое «20 м»? Мб данных для каждого запроса или количество запросов в миллионах? Вы уверены, что выделяет не heavy_computation? Насколько он велик req: Bytes ? Общая память должна быть примерно: 32 * (heavy_computation_size req_size) pending_req_count * pending_req_size; Для ограничения pending_req_count ее необходимо каким-то образом настроить в axum.

Ответ №1:

Такое ощущение, что пока вычисление heavy_computation занято, накапливаются миллионы ожидающих запросов. Необходимо ограничить количество принятых подключений/запросов, обрабатываемых одновременно. Чтобы использовать 5 ГБ, требуется всего 25 тыс. ожидающих запросов с полезной нагрузкой 200 Кб, а не миллионы.

аксум основан на башне, а башня основана на гипере.

Известно, что у hyper нет параметра max connections, но люди там предлагают использовать промежуточное программное обеспечение ConcurrencyLimit из tower и настроить его на сервере или создать пользовательский цикл приема/обработки.

Возможно, можно было бы передать это промежуточное программное обеспечение в tower и через axum, но в противном случае, если это вариант для вас, вы могли бы попробовать перейти непосредственно в tower или даже использовать hyper и реализовать его с использованием примитивов, доступных там для этой рабочей нагрузки.

Комментарии:

1. Да, я думаю, что вы правы в своем анализе проблемы. Я попробовал использовать ConcurrencyLimit промежуточное программное обеспечение без какой-либо помощи, я думаю, что буду больше исследовать в этом направлении, большое спасибо!