Обработка повторяющихся вставок в базу данных в асинхронном режиме

#rust #actix-web #rust-actix #rust-sqlx

Вопрос:

Новичок как в rust, так и в асинхронном программировании здесь.

У меня есть функция, которая загружает и хранит кучу твитов в базе данных:

 pub async fn process_user_timeline(config: amp;Settings, pool: amp;PgPool, user_object: amp;Value) {
    // get timeline
    if let Ok((user_timeline, _)) =
        get_user_timeline(config, user_object["id"].as_str().unwrap()).await
    {
        // store tweets
        if let Some(tweets) = user_timeline["data"].as_array() {
            for tweet in tweets.iter() {
                store_tweet(pool, amp;tweet, amp;user_timeline, "normal")
                    .await
                    .unwrap_or_else(|e| {
                        println!(
                            ">>>X>>> failed to store tweet {}: {:?}",
                            tweet["id"].as_str().unwrap(),
                            e
                        )
                    });
            }
        }
    }
}

 

Он вызывается в асинхронном цикле другой функцией:

 pub async fn loop_until_hit_rate_limit<'a, T, Fut>(
    object_arr: amp;'a [T],
    settings: amp;'a Settings,
    pool: amp;'a PgPool,
    f: impl Fn(amp;'a Settings, amp;'a PgPool, amp;'a T) -> Fut   Copy,
    rate_limit: usize,
) where
    Fut: Future,
{
    let total = object_arr.len();
    let capped_total = min(total, rate_limit);

    let mut futs = vec![];
    for (i, object) in object_arr[..capped_total].iter().enumerate() {
        futs.push(async move {
            println!(">>> PROCESSING {}/{}", i   1, total);
            f(settings, pool, object).await;
        });
    }
    futures::future::join_all(futs).await;
}
 

Иногда две асинхронные задачи пытаются вставить один и тот же твит одновременно, что приводит к этой ошибке:

 failed to store tweet 1398307091442409475: Database(PgDatabaseError { severity: Error, code: "23505", message: "duplicate key value violates unique constraint "tweets_tweet_id_key"", detail: Some("Key (tweet_id)=(1398307091442409475) already exists."), hint: None, position: None, where: None, schema: Some("public"), table: Some("tweets"), column: None, data_type: None, constraint: Some("tweets_tweet_id_key"), file: Some("nbtinsert.c"), line: Some(656), routine: Some("_bt_check_unique") })
 

Имейте в виду, что код уже проверяет наличие твита перед его вставкой, поэтому это происходит только в следующем сценарии: ЧТЕНИЕ из задачи 1 > ЧТЕНИЕ из задачи 2 >> ЗАПИСЬ из задачи 1 (успех) >>> ЗАПИСЬ из задачи 2 (ошибка).

Чтобы решить эту проблему, моей лучшей попыткой до сих пор было разместить unwrap_or_else() предложение, которое позволяет выполнить одну из задач без паники во время всего выполнения. Я знаю, по крайней мере, об одном недостатке — иногда обе задачи выходят из-под контроля, и твит никогда не пишется. Это происходит в

Есть ли другие недостатки в моем подходе, о которых я не знаю?

Как правильно с этим справиться? Я ненавижу терять данные и, что еще хуже, делать это недетерминированно.

PS Я использую actix web и sqlx в качестве своих библиотек веб-сервера / бд.

Комментарии:

1. Похоже, вам следует изменить свою логику SQL для использования ON CONFLICT . Я бы счел это антипаттером, чтобы проверить, существует ли что-то с одним запросом, а затем вставить его с другим запросом именно по той причине, с которой вы столкнулись.

2. Это похоже на случай «я бы предпочел не иметь этой проблемы с самого начала». Почему задача 1 и задача 2 указывают на один и тот же твит?

3. «ЧТЕНИЕ из задачи 1 > ЧТЕНИЕ из задачи 2 >> ЗАПИСЬ из задачи 1 (успех) >>> ЗАПИСЬ из задачи 2 (ошибка)». придира: INSERT в частности. С UPDATE помощью вы сможете заблокировать строку во время чтения, гарантируя, что вторая задача не имеет доступа.

4. вау, каждый день узнаешь что-то новое.. Я понятия не имел об ON CONFLICT этом . Я переписал свой код, и теперь он и правильный, и более быстрый. Огромное спасибо @loganfsmyth. Хотите опубликовать ответ, и я отмечу его правильно?

Ответ №1:

Как правило, для всего, что может быть написано несколькими потоками/процессами, любая логика, такая как

 if (!exists) {
  writeValue()
}
 

необходимо либо защитить каким-то замком, либо изменить код для записи атомарно с возможностью сбоя записи, потому что в него уже записано что-то другое.

Для данных в памяти в Rust вы бы использовали Mutex , чтобы убедиться, что вы можете прочитать, а затем записать данные обратно, прежде чем что-либо еще их прочитает, или Atomic изменить данные таким образом, чтобы, если что-то уже записало их, вы могли это обнаружить.

В базах данных для любого запроса, который может конфликтовать с каким-либо другим запросом, выполняемым примерно в то же время, вы хотели бы использовать ON CONFLICT предложение в своем запросе, чтобы сама база данных знала, что делать, когда она пытается записать данные, и она уже существует.

В вашем случае, поскольку я предполагаю, что твиты неизменяемы, вы, вероятно, захотите это сделать ON CONFLICT tweet_id DO NOTHING (или независимо от того, какой у вас столбец идентификатора), и в этом случае INSERT вставка будет пропущена, если уже есть твит с идентификатором, который вы вставляете, и он не выдаст ошибку.