#rust #actix-web #rust-actix #rust-sqlx
Вопрос:
Новичок как в rust, так и в асинхронном программировании здесь.
У меня есть функция, которая загружает и хранит кучу твитов в базе данных:
pub async fn process_user_timeline(config: amp;Settings, pool: amp;PgPool, user_object: amp;Value) {
// get timeline
if let Ok((user_timeline, _)) =
get_user_timeline(config, user_object["id"].as_str().unwrap()).await
{
// store tweets
if let Some(tweets) = user_timeline["data"].as_array() {
for tweet in tweets.iter() {
store_tweet(pool, amp;tweet, amp;user_timeline, "normal")
.await
.unwrap_or_else(|e| {
println!(
">>>X>>> failed to store tweet {}: {:?}",
tweet["id"].as_str().unwrap(),
e
)
});
}
}
}
}
Он вызывается в асинхронном цикле другой функцией:
pub async fn loop_until_hit_rate_limit<'a, T, Fut>(
object_arr: amp;'a [T],
settings: amp;'a Settings,
pool: amp;'a PgPool,
f: impl Fn(amp;'a Settings, amp;'a PgPool, amp;'a T) -> Fut Copy,
rate_limit: usize,
) where
Fut: Future,
{
let total = object_arr.len();
let capped_total = min(total, rate_limit);
let mut futs = vec![];
for (i, object) in object_arr[..capped_total].iter().enumerate() {
futs.push(async move {
println!(">>> PROCESSING {}/{}", i 1, total);
f(settings, pool, object).await;
});
}
futures::future::join_all(futs).await;
}
Иногда две асинхронные задачи пытаются вставить один и тот же твит одновременно, что приводит к этой ошибке:
failed to store tweet 1398307091442409475: Database(PgDatabaseError { severity: Error, code: "23505", message: "duplicate key value violates unique constraint "tweets_tweet_id_key"", detail: Some("Key (tweet_id)=(1398307091442409475) already exists."), hint: None, position: None, where: None, schema: Some("public"), table: Some("tweets"), column: None, data_type: None, constraint: Some("tweets_tweet_id_key"), file: Some("nbtinsert.c"), line: Some(656), routine: Some("_bt_check_unique") })
Имейте в виду, что код уже проверяет наличие твита перед его вставкой, поэтому это происходит только в следующем сценарии: ЧТЕНИЕ из задачи 1 > ЧТЕНИЕ из задачи 2 >> ЗАПИСЬ из задачи 1 (успех) >>> ЗАПИСЬ из задачи 2 (ошибка).
Чтобы решить эту проблему, моей лучшей попыткой до сих пор было разместить unwrap_or_else()
предложение, которое позволяет выполнить одну из задач без паники во время всего выполнения. Я знаю, по крайней мере, об одном недостатке — иногда обе задачи выходят из-под контроля, и твит никогда не пишется. Это происходит в
Есть ли другие недостатки в моем подходе, о которых я не знаю?
Как правильно с этим справиться? Я ненавижу терять данные и, что еще хуже, делать это недетерминированно.
PS Я использую actix web
и sqlx
в качестве своих библиотек веб-сервера / бд.
Комментарии:
1. Похоже, вам следует изменить свою логику SQL для использования
ON CONFLICT
. Я бы счел это антипаттером, чтобы проверить, существует ли что-то с одним запросом, а затем вставить его с другим запросом именно по той причине, с которой вы столкнулись.2. Это похоже на случай «я бы предпочел не иметь этой проблемы с самого начала». Почему задача 1 и задача 2 указывают на один и тот же твит?
3. «ЧТЕНИЕ из задачи 1 > ЧТЕНИЕ из задачи 2 >> ЗАПИСЬ из задачи 1 (успех) >>> ЗАПИСЬ из задачи 2 (ошибка)». придира:
INSERT
в частности. СUPDATE
помощью вы сможете заблокировать строку во время чтения, гарантируя, что вторая задача не имеет доступа.4. вау, каждый день узнаешь что-то новое.. Я понятия не имел об
ON CONFLICT
этом . Я переписал свой код, и теперь он и правильный, и более быстрый. Огромное спасибо @loganfsmyth. Хотите опубликовать ответ, и я отмечу его правильно?
Ответ №1:
Как правило, для всего, что может быть написано несколькими потоками/процессами, любая логика, такая как
if (!exists) {
writeValue()
}
необходимо либо защитить каким-то замком, либо изменить код для записи атомарно с возможностью сбоя записи, потому что в него уже записано что-то другое.
Для данных в памяти в Rust вы бы использовали Mutex
, чтобы убедиться, что вы можете прочитать, а затем записать данные обратно, прежде чем что-либо еще их прочитает, или Atomic
изменить данные таким образом, чтобы, если что-то уже записало их, вы могли это обнаружить.
В базах данных для любого запроса, который может конфликтовать с каким-либо другим запросом, выполняемым примерно в то же время, вы хотели бы использовать ON CONFLICT
предложение в своем запросе, чтобы сама база данных знала, что делать, когда она пытается записать данные, и она уже существует.
В вашем случае, поскольку я предполагаю, что твиты неизменяемы, вы, вероятно, захотите это сделать ON CONFLICT tweet_id DO NOTHING
(или независимо от того, какой у вас столбец идентификатора), и в этом случае INSERT
вставка будет пропущена, если уже есть твит с идентификатором, который вы вставляете, и он не выдаст ошибку.