#rust #async-await #rust-tokio #hyper
Вопрос:
У меня есть случай, когда у меня есть http-сервер, axum
который получает полезную нагрузку с очень высокой пропускной способностью (может достигать 20 м в секунду). Мне нужно взять эти байты из запроса и выполнить с ними некоторые сложные вычисления. Проблема в том, что объем памяти непредсказуемо высок (может достигать 5 ГБ). Это текущая настройка того, как я пытаюсь ее достичь:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, mut rx) = mpsc::channel::<WriteRequest>(32);
tokio::spawn(async move {
while let Some(payload) = rx.recv().await {
tokio::task::spawn_blocking(move || {
// Run heavy computation here...
heavy_computation(payload)
}).await;
}
});
// build our application with a route
let app = Router::new()
.route("/write", post(move |req: Bytes| async move {
let data: WriteRequest = Message::decode(req);
// send data here
let _ = tx.send(data).await;
"ok"
}));
let addr = ([0, 0, 0, 0], 8080).into();
let server = axum::Server::bind(amp;addr)
.serve(app.into_make_service());
if let Err(e) = server.await {
error!("server error: {}", e);
}
Ok(())
}
Я думаю, что это обратное давление на bounded channel
то, что заставляет запросы накапливаться до тех пор, пока они не будут отправлены другому task
для обработки, что приводит к увеличению объема памяти.
Потому что даже если бы я попытался заменить heavy_copmutation
это на простое sleep
, примерно 200ms
это закончилось бы теми же результатами.
Если я исключу эту heavy_computation
часть, память останется низкой.
Как правильно подойти к такой проблеме? или с такой высокой пропускной способностью здесь ничего нельзя сделать?
Большое спасибо!
Комментарии:
1. Я думаю, что это противодавление на ограниченном канале, из-за которого запросы накапливаются до тех пор, пока их нельзя отправить на другую задачу для обработки . Вы пробовали использовать канал с буферизацией 0? Прямо сейчас он установлен на 32, и канал не будет выполнять никакого обратного давления, пока в очереди не будет 32 сообщения. ( Я предполагаю, что этот канал-tokio::синхронизация::mpsc::канал)
2. Да, это
tokio::sync::mpsc::channel
не0 buffered
просто неограниченный буфер?3. Нет, неограниченные каналы явно определены как другой тип , но , похоже, tokio channel не допускает
0
буферизованных каналов, для этого требуется, по крайней мере1
, попробуйте использовать 1 буферизованный канал4. Будет сделано! Я должен сказать, однако, что я пытался установить это
2
в прошлом, но безуспешно.5. Не могли бы вы поделиться дополнительной информацией, чтобы понять это? Что такое «20 м»? Мб данных для каждого запроса или количество запросов в миллионах? Вы уверены, что выделяет не heavy_computation? Насколько он велик
req: Bytes
? Общая память должна быть примерно: 32 * (heavy_computation_size req_size) pending_req_count * pending_req_size; Для ограничения pending_req_count ее необходимо каким-то образом настроить в axum.
Ответ №1:
Такое ощущение, что пока вычисление heavy_computation занято, накапливаются миллионы ожидающих запросов. Необходимо ограничить количество принятых подключений/запросов, обрабатываемых одновременно. Чтобы использовать 5 ГБ, требуется всего 25 тыс. ожидающих запросов с полезной нагрузкой 200 Кб, а не миллионы.
аксум основан на башне, а башня основана на гипере.
Известно, что у hyper нет параметра max connections, но люди там предлагают использовать промежуточное программное обеспечение ConcurrencyLimit из tower и настроить его на сервере или создать пользовательский цикл приема/обработки.
Возможно, можно было бы передать это промежуточное программное обеспечение в tower и через axum, но в противном случае, если это вариант для вас, вы могли бы попробовать перейти непосредственно в tower или даже использовать hyper и реализовать его с использованием примитивов, доступных там для этой рабочей нагрузки.
Комментарии:
1. Да, я думаю, что вы правы в своем анализе проблемы. Я попробовал использовать
ConcurrencyLimit
промежуточное программное обеспечение без какой-либо помощи, я думаю, что буду больше исследовать в этом направлении, большое спасибо!