How do I test two parallel transactions in Rust SQLx?

Posted on 2025-01-18 14:18:06

I'm experimenting with Rocket, Rust and SQLx, and I'd like to test what happens when two parallel transactions try to insert a duplicate record into my table.

My insert fn contains nothing special and it works fine:

async fn insert_credentials<'ex, EX>(&self, executor: EX, credentials: &Credentials) -> Result<u64, Errors>
where
    EX: 'ex + Executor<'ex, Database = Postgres>,
{
    sqlx::query!(
        r#"INSERT INTO credentials (username, password)
        VALUES ($1, crypt($2, gen_salt('bf')))"#,
        credentials.username,
        credentials.password,
    )
    .execute(executor)
    .await
    .map(|result| result.rows_affected())
    .map_err(|err| err.into())
}

My test, though, hangs indefinitely since it waits for a commit that never happens:

#[async_std::test]
async fn it_should_reject_duplicated_username_in_parallel() {
    let repo = new_repo();
    let db: Pool<Postgres> = connect().await;
    let credentials = new_random_credentials();

    println!("TX1 begins");
    let mut tx1 = db.begin().await.unwrap();
    let rows_affected = repo.insert_credentials(&mut tx1, &credentials).await.unwrap();
    assert_eq!(rows_affected, 1);

    println!("TX2 begins");
    let mut tx2 = db.begin().await.unwrap();
    println!("It hangs on the next line");
    let rows_affected = repo.insert_credentials(&mut tx2, &credentials).await.unwrap();
    assert_eq!(rows_affected, 1);
    
    println!("It never reaches this line");
    tx1.commit().await.unwrap();
    tx2.commit().await.unwrap();
}

How do I create and execute those TXs in parallel, such that the assertions pass but the test fails when trying to commit the second TX?

For reference, this is my Cargo.toml

[package]
name = "auth"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.52"
serde = "1.0.136"
thiserror = "1.0.30"

# TODO https://github.com/SergioBenitez/Rocket/issues/1893#issuecomment-1002393878
rocket = { git = "https://github.com/SergioBenitez/Rocket", features = ["json"] }

[dependencies.redis]
version = "0.21.5"
features = ["tokio-comp"]

[dependencies.sqlx]
version = "0.5.11"
features = ["macros", "runtime-tokio-rustls", "postgres"]

[dependencies.uuid]
version = "1.0.0-alpha.1"
features = ["v4", "fast-rng", "macro-diagnostics"]

## DEV ##

[dev-dependencies]
mockall = "0.11.0"

[dev-dependencies.async-std]
version = "1.11.0"
features = ["attributes", "tokio1"]


Comments (2)

苍景流年 2025-01-25 14:18:06

You can use async_std::future::timeout or tokio::time::timeout. Example using async_std:

use async_std::future::timeout;
use std::time::Duration;

// If tx2 is still blocked after 100 ms, timeout() returns Err(TimeoutError).
let max_duration = Duration::from_millis(100);
assert!(timeout(max_duration, tx2.commit()).await.is_err());
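The same "assert that it is still blocked after a deadline" idea can be sketched without any async runtime. The block below is only a thread-based analogue using the standard library, not the async_std or tokio API; `run_with_timeout` is a hypothetical helper name introduced for illustration.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `op` on its own thread and waits at most `limit` for its result.
/// Returns `Some(value)` if `op` finished in time, `None` if the deadline
/// passed first (or if `op` panicked). On timeout the worker thread is
/// simply left running in the background.
fn run_with_timeout<T, F>(limit: Duration, op: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        // The send fails only if the receiver was dropped after a timeout,
        // so the error can be ignored.
        let _ = sender.send(op());
    });
    receiver.recv_timeout(limit).ok()
}
```

In a test, `assert!(run_with_timeout(limit, blocking_call).is_none())` plays the same role as asserting that the async timeout returned an error.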

If you want to continue with tx2 before tx1 completes, you can spawn tx1's commit first with async_std::task::spawn or tokio::spawn:

async_std::task::spawn(async move {
    assert!(tx1.commit().await.is_ok());
});

却一份温柔 2025-01-25 14:18:06

@Mika pointed me in the right direction: I could spawn both transactions and add a short sleep to give the concurrent TXs some time to execute.

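    // db1/db2 and credentials1/credentials2 are presumably clones of the pool
    // and of the credentials created earlier in the test (setup not shown).
    let handle1 = tokio::spawn(async move {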
        let repo = new_repo();
        let mut tx = db1.begin().await.unwrap();
        let rows_affected = repo.insert_credentials(&mut tx, &credentials1).await.unwrap();
        assert_eq!(rows_affected, 1);
        tokio::time::sleep(Duration::from_millis(100)).await;
        tx.commit().await.unwrap()
    });
    
    let handle2 = tokio::spawn(async move {
        let repo = new_repo();
        let mut tx = db2.begin().await.unwrap();
        let rows_affected = repo.insert_credentials(&mut tx, &credentials2).await.unwrap();
        assert_eq!(rows_affected, 1);
        tokio::time::sleep(Duration::from_millis(100)).await;
        tx.commit().await.unwrap()
    });

    let (_first, _second) = rocket::tokio::try_join!(handle1, handle2).unwrap();

I thought that this way both TXs would execute in parallel up to the sleep line, then one would commit and the other would fail on the commit line. But no: both TXs do start in parallel, but TX1 runs up to the sleep while TX2 blocks on the insert line until TX1 commits, and then TX2 fails on the insert line.
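The blocking described above can be imitated with a toy model built only on the standard library. This is a sketch under assumptions, not Postgres or SQLx code: `ToyUniqueIndex`, `KeyState`, `InsertError` and `run_scenario` are hypothetical names, and the model captures a single rule, namely that an insert of a key held by another uncommitted transaction waits until that transaction commits or rolls back.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// State of one key in the toy unique index.
#[derive(Clone, Copy, Debug, PartialEq)]
enum KeyState {
    /// Inserted by the given transaction, which has not finished yet.
    Pending(u32),
    /// Inserted by a transaction that has committed.
    Committed,
}

#[derive(Debug, PartialEq)]
enum InsertError {
    DuplicateKey,
}

/// Toy stand-in for a table with a unique index on one column.
#[derive(Default)]
struct ToyUniqueIndex {
    keys: Mutex<HashMap<String, KeyState>>,
    changed: Condvar,
}

impl ToyUniqueIndex {
    /// Inserts `key` for transaction `tx`, waiting while another
    /// transaction holds an uncommitted insert of the same key.
    fn insert(&self, tx: u32, key: &str) -> Result<(), InsertError> {
        let mut keys = self.keys.lock().unwrap();
        loop {
            let state = keys.get(key).copied();
            match state {
                None => {
                    keys.insert(key.to_string(), KeyState::Pending(tx));
                    return Ok(());
                }
                Some(KeyState::Committed) => return Err(InsertError::DuplicateKey),
                Some(KeyState::Pending(owner)) if owner == tx => {
                    return Err(InsertError::DuplicateKey)
                }
                // Another transaction owns the key: wait for it to finish.
                Some(KeyState::Pending(_)) => keys = self.changed.wait(keys).unwrap(),
            }
        }
    }

    /// Marks every pending key of `tx` as committed and wakes waiters.
    fn commit(&self, tx: u32) {
        let mut keys = self.keys.lock().unwrap();
        for state in keys.values_mut() {
            if *state == KeyState::Pending(tx) {
                *state = KeyState::Committed;
            }
        }
        self.changed.notify_all();
    }

    /// Drops every pending key of `tx` and wakes waiters.
    fn rollback(&self, tx: u32) {
        let mut keys = self.keys.lock().unwrap();
        keys.retain(|_, state| *state != KeyState::Pending(tx));
        self.changed.notify_all();
    }
}

/// TX1 inserts "alice", holds the transaction open for `hold`, then commits
/// or rolls back. TX2 inserts the same key on another thread. Returns TX2's
/// insert result and how long its insert call took.
fn run_scenario(tx1_commits: bool, hold: Duration) -> (Result<(), InsertError>, Duration) {
    let index = Arc::new(ToyUniqueIndex::default());
    index.insert(1, "alice").unwrap();

    let index2 = Arc::clone(&index);
    let tx2 = thread::spawn(move || {
        let started = Instant::now();
        let result = index2.insert(2, "alice");
        (result, started.elapsed())
    });

    thread::sleep(hold);
    if tx1_commits {
        index.commit(1);
    } else {
        index.rollback(1);
    }
    tx2.join().unwrap()
}
```

With `run_scenario(true, hold)` the second insert returns `Err(InsertError::DuplicateKey)` once TX1 has committed, which matches the failure on the insert line rather than on the commit line; with `run_scenario(false, hold)` it returns `Ok(())` once TX1 has rolled back.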

I guess that's just how the DB works in this case, and maybe I could change that by messing with TX isolation, but that's not my intent here. I'm just playing to learn more, and that's enough learning for today :)
