#mongodb #rust #transactions #rust-tokio #rust-futures
Вопрос:
Я новичок в Rust и использую драйвер MongoDB по умолчанию https://docs.rs/mongodb/2.0.0/mongodb/
Я помню, когда кодировал с Node.js, появилась возможность отправлять транзакции с некоторым обещанием.all (), чтобы выполнить все транзакции одновременно в целях оптимизации, и, если ошибок нет, совершить фиксацию транзакции. (Node.js пример здесь: https://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74)
Я пытаюсь реализовать ту же логику в Rust сейчас, используя try_join! но я всегда против этой проблемы:
ошибка: невозможно заимствовать
session
как изменяемое более одного раза за раз; метка: здесь происходит первое изменяемое заимствование
use mongodb::{bson::oid::ObjectId, Client, Database, options};
use async_graphql::{
validators::{Email, StringMaxLength, StringMinLength},
Context, ErrorExtensions, Object, Result,
};
use futures::try_join;
//use tokio::try_join; -> same thing
#[derive(Default)]
pub struct UserMutations;
#[Object]
impl UserMutations {
async fn user_followed<'ctx>(
amp;self,
ctx: amp;Context<'ctx>,
other_user_id: ObjectId,
current_user_id: ObjectId,
) -> Result<bool> {
let mut session = Client::with_uri_str(dotenv!("URI"))
.await
.expect("DB not accessible!")
.start_session(Some(session_options))
.await?;
session.start_transaction(Some(options::TransactionOptions::builder()
.read_concern(Some(options::ReadConcern::majority()))
.write_concern(Some(
options::WriteConcern::builder()
.w(Some(options::Acknowledgment::Majority))
.w_timeout(Some(Duration::new(3, 0)))
.journal(Some(false))
.build(),
))
.selection_criteria(Some(options::SelectionCriteria::ReadPreference(
options::ReadPreference::Primary
)))
.max_commit_time(Some(Duration::new(3, 0)))
.build())).await?;
let db = Client::with_uri_str(dotenv!("URI"))
.await
.expect("DB not accessible!").database("database").collection::<Document>("collection");
try_join!(
db.update_one_with_session(
doc! {
"_id": other_user_id
},
doc! {
"$inc": { "following_number": -1 }
},
None,
amp;mut session,
),
db.update_one_with_session(
doc! {
"_id": current_user_id
},
doc! {
"$inc": { "followers_number": -1 }
},
None,
amp;mut session,
)
)?;
Ok(true)
}
}
849 | | amp;mut session,
| | ------------ first mutable borrow occurs here
... |
859 | | amp;mut session,
| | ^^^^^^^^^^^^ second mutable borrow occurs here
860 | | )
861 | | )?;
| |_____________- first borrow later captured here by closure
Есть ли какой-либо способ синхронизировать функции транзакций, чтобы не терять время на независимые мутации? У кого-нибудь есть какие-нибудь идеи?
Заранее спасибо!
Комментарии:
1. Вероятно, вам следует рассмотреть возможность отправки проблемы для этого варианта использования в репо rust mongodb. Если
update_one_with_session
требуется изменяемый доступ кsession
, то нет возможности совершать два вызова одновременно.2. Вместе с предыдущим комментарием; не можете ли вы обновить-многие используют
_id
В [y, z], учитывая$inc
, что операция одинакова для обоих запросов — это было бы быстрее, чем два параллельных фьючерса (если бы вы могли заставить это работать).3. @MartinGallagher да, именно в этом примере это решило бы проблему, но иногда мне нужно выполнять вызовы синхронизации в разные коллекции. В этом случае даже bulkWrite не будет работать (если это будет реализовано… jira.mongodb.org/browse/RUST-531 ), потому что все это зависит от коллекции…
Ответ №1:
Спасибо, Патрик и Зеппи за ваши ответы, я провел еще несколько исследований по этой теме, а также провел собственное тестирование. Итак, давайте начнем.
Во-первых, моим желанием было максимально оптимизировать транзакционную запись, поскольку я хотел получить полную возможность отката, требуемую логикой кода.
В случае, если вы пропустили мои комментарии Патрику, я повторю их здесь, чтобы лучше отразить то, что я думал об этом:
Я понимаю, почему это было бы ограничением для нескольких операций чтения, но если все действия выполняются в отдельных коллекциях (или являются независимыми атомарными записями в несколько документов с разной полезной нагрузкой), Я не понимаю, почему невозможно сохранить случайную согласованность при одновременном их выполнении. Такого рода транзакции никогда не должны создавать условия гонки / конфликты / странное поведение блокировки, и в случае ошибки вся транзакция откатывается до ее совершения в любом случае.
Проводя аналогию с Git (что может быть неверно), при обновлении отдельных файлов / папок конфликты слияния не возникают. Извините за дотошность, но это просто звучит как серьезная возможность повысить скорость.
Но после поисков я был против этой документации: https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst#why-does-a-network-error-cause-the-serversession-to-be-discarded-from-the-pool
В противном случае несвязанная операция, которая просто случайно использует тот же сеанс сервера, потенциально заблокирует ожидание завершения предыдущей операции. Например, транзакционная запись заблокирует последующую транзакционную запись.
В принципе, это означает, что даже если вы будете отправлять записи транзакций одновременно, вы не получите большой эффективности, потому что MongoDB сам по себе является блокировщиком. Я решил проверить, так ли это, и, поскольку настройка драйвера NodeJS позволяет отправлять транзакции одновременно (согласно: https://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74) Я быстро настроил NodeJS, указав на ту же базу данных, размещенную в Atlas на бесплатном уровне.
Во-вторых, статистика и код: это мутация NodeJS, которую я буду использовать для тестов (каждый тест имеет 4 транзакционных записи). Я включил трассировку GraphQL для проверки этого, и вот результаты моих тестов…
export const testMutFollowUser = async (_parent, _args, _context, _info) => {
try {
const { user, dbClient } = _context;
isLoggedIn(user);
const { _id } = _args;
const session = dbClient.startSession();
const db = dbClient.db("DB");
await verifyObjectId().required().validateAsync(_id);
//making sure asked user exists
const otherUser = await db.collection("users").findOne(
{ _id: _id },
{
projection: { _id: 1 }
});
if (!otherUser)
throw new Error("User was not found");
const transactionResult = session.withTransaction(async () => {
//-----using this part when doing concurrency test------
await Promise.all([
await createObjectIdLink({ db_name: 'links', from: user._id, to: _id, db }),
await db.collection('users').updateOne(
{ _id: user._id },
{ $inc: { following_number: 1 } },
),
await db.collection('users').updateOne(
{ _id },
{
$inc: { followers_number: 1, unread_notifications_number: 1 }
},
),
await createNotification({
action: 'USER_FOLLOWED',
to: _id
}, _context)
]);
//-----------end of concurrency part--------------------
//------using this part when doing sync test--------
//this as a helper for db.insertOne(...)
const insertedId = await createObjectIdLink({ db_name: 'links', from: user._id, to: _id, db });
const updDocMe = await db.collection('users').updateOne(
{ _id: user._id },
{ $inc: { following_number: 1 } },
);
const updDocOther = await db.collection('users').updateOne(
{ _id },
{
$inc: { followers_number: 1, unread_notifications_number: 1 }
},
);
//this as another helper for db.insertOne(...)
await createNotification({
action: 'USER_FOLLOWED',
to: _id
}, _context);
//-----------end of sync part---------------------------
return true;
}, transactionOptions);
if (transactionResult) {
console.log("The reservation was successfully created.");
} else {
console.log("The transaction was intentionally aborted.");
}
await session.endSession();
return true;
}
И соответствующие результаты работы:
format:
Request/Mutation/Response = Total (all in ms)
1) For sync writes in the transaction:
4/91/32 = 127
4/77/30 = 111
7/71/7 = 85
6/66/8 = 80
2/74/9 = 85
4/70/8 = 82
4/70/11 = 85
--waiting more time (~10secs)
9/73/34 = 116
totals/8 = **96.375 ms in average**
//---------------------------------
2) For concurrent writes in transaction:
3/85/7 = 95
2/81/14 = 97
2/70/10 = 82
5/81/11 = 97
5/73/15 = 93
2/82/27 = 111
5/69/7 = 81
--waiting more time (~10secs)
6/80/32 = 118
totals/8 = ** 96.75 ms ms in average **
Вывод: разница между ними находится в пределах погрешности (но все еще на стороне синхронизации).
Я предполагаю, что при синхронизации вы тратите время на ожидание запроса/ответа БД, в то время как одновременно вы ждете, когда MongoDB закажет запросы, а затем выполнит их все, что в конце дня будет стоить столько же времени.
Таким образом, с учетом текущей политики MongoDB, я полагаю, ответ на мой вопрос будет следующим: «нет необходимости в параллелизме, потому что это в любом случае не повлияет на производительность». Однако было бы невероятно, если бы MongoDB позволил распараллеливать записи в транзакциях в будущих выпусках с блокировками на уровне документа (по крайней мере, для движка WiredTiger) вместо уровня базы данных, как это происходит в настоящее время для транзакций (потому что вы ждете завершения всей записи до следующего).
Не стесняйтесь поправлять меня, если я что-то пропустил/неправильно истолковал. Спасибо!
Ответ №2:
Это ограничение на самом деле предусмотрено специально. В MongoDB клиентские сеансы не могут использоваться одновременно (см. Здесь и здесь), поэтому драйвер Rust принимает их, amp;mut
чтобы предотвратить это во время компиляции. Пример узла работает только случайно и определенно не рекомендуется или не поддерживается. Если вы хотите выполнить оба обновления как часть транзакции, вам придется запускать одно обновление за другим. Если вы хотите запускать их одновременно, вам нужно будет выполнить их без сеанса или транзакции.
Как примечание, сеанс клиента можно использовать только с клиентом, из которого он был создан. В приведенном примере сеанс используется с другим сеансом, что приведет к ошибке.
Комментарии:
1. Спасибо за ваш ответ, Патрик! Да, извините за демонстрацию моего кода, в моем проекте клиент является глобальной переменной, поэтому он всегда использует один и тот же экземпляр; спасибо, что все равно упомянули об этом!
2. Я понимаю, почему это было бы ограничением для нескольких операций чтения, но если все действия выполняются в отдельных коллекциях (или являются независимыми атомарными записями в несколько документов с разной полезной нагрузкой), Я не понимаю, почему невозможно сохранить случайную согласованность при одновременном их выполнении. Такого рода транзакции никогда не должны создавать условия гонки / конфликты / странное поведение блокировки, и в случае ошибки вся транзакция откатывается до ее совершения в любом случае.
3. Проводя аналогию с Git (что может быть неверно), при обновлении отдельных файлов / папок конфликты слияния не возникают. Извините за дотошность, но это просто звучит как серьезная возможность повысить скорость.
4. Нет ничего плохого в том, чтобы быть дотошным, но я думаю, что вы лаете не на то дерево. В ответе приведены официальные рекомендации для их API, обеспечивающие такое поведение, и указано, что то, что вы хотите, невозможно. Я могу предположить несколько причин, по которым параллельные независимые записи должны быть теоретически возможны, но они все равно предпочитают это запрещать. Независимо от этого, изложение ваших аргументов в комментариях stackoverflow вряд ли что-то изменит.