如何在Rust SQLX中测试两项并行交易?
我正在试验 Rocket、Rust 和 SQLx,我想测试当两个并行事务尝试在我的表中插入重复记录时会发生什么。
我的 insert fn 不包含任何特殊内容,并且工作正常:
async fn insert_credentials<'ex, EX>(&self, executor: EX, credentials: &Credentials) -> Result<u64, Errors>
where
EX: 'ex + Executor<'ex, Database = Postgres>,
{
sqlx::query!(
r#"INSERT INTO credentials (username, password)
VALUES ($1, crypt($2, gen_salt('bf')))"#,
credentials.username,
credentials.password,
)
.execute(executor)
.await
.map(|result| result.rows_affected())
.map_err(|err| err.into())
}
但是,我的测试无限期挂起,因为它等待从未发生的提交:
#[async_std::test]
async fn it_should_reject_duplicated_username_in_parallel() {
let repo = new_repo();
let db: Pool<Postgres> = connect().await;
let credentials = new_random_credentials();
println!("TX1 begins");
let mut tx1 = db.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx1, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("TX2 begins");
let mut tx2 = db.begin().await.unwrap();
println!("It hangs on the next line");
let rows_affected = repo.insert_credentials(&mut tx2, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("It never reaches this line");
tx1.commit().await.unwrap();
tx2.commit().await.unwrap();
}
如何并行创建和执行这些 TX,以便断言通过,但在尝试时测试失败提交第二个 TX?
作为参考,这是我的 Cargo.toml
[package]
name = "auth"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.52"
serde = "1.0.136"
thiserror = "1.0.30"
# TODO https://github.com/SergioBenitez/Rocket/issues/1893#issuecomment-1002393878
rocket = { git = "https://github.com/SergioBenitez/Rocket", features = ["json"] }
[dependencies.redis]
version = "0.21.5"
features = ["tokio-comp"]
[dependencies.sqlx]
version = "0.5.11"
features = ["macros", "runtime-tokio-rustls", "postgres"]
[dependencies.uuid]
version = "1.0.0-alpha.1"
features = ["v4", "fast-rng", "macro-diagnostics"]
## DEV ##
[dev-dependencies]
mockall = "0.11.0"
[dev-dependencies.async-std]
version = "1.11.0"
features = ["attributes", "tokio1"]
I'm experimenting with Rocket, Rust and SQLx and I'd like to test what happens when two parallel transactions try to insert a duplicated record on my table.
My insert fn contains nothing special and it works fine:
async fn insert_credentials<'ex, EX>(&self, executor: EX, credentials: &Credentials) -> Result<u64, Errors>
where
EX: 'ex + Executor<'ex, Database = Postgres>,
{
sqlx::query!(
r#"INSERT INTO credentials (username, password)
VALUES ($1, crypt($2, gen_salt('bf')))"#,
credentials.username,
credentials.password,
)
.execute(executor)
.await
.map(|result| result.rows_affected())
.map_err(|err| err.into())
}
My test, though, hangs indefinitely since it waits for a commit that never happens:
#[async_std::test]
async fn it_should_reject_duplicated_username_in_parallel() {
let repo = new_repo();
let db: Pool<Postgres> = connect().await;
let credentials = new_random_credentials();
println!("TX1 begins");
let mut tx1 = db.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx1, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("TX2 begins");
let mut tx2 = db.begin().await.unwrap();
println!("It hangs on the next line");
let rows_affected = repo.insert_credentials(&mut tx2, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("It never reaches this line");
tx1.commit().await.unwrap();
tx2.commit().await.unwrap();
}
How do I create and execute those TXs in parallel, such that the assertions pass but the test fails when trying to commit the second TX?
For reference, this is my Cargo.toml
[package]
name = "auth"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.52"
serde = "1.0.136"
thiserror = "1.0.30"
# TODO https://github.com/SergioBenitez/Rocket/issues/1893#issuecomment-1002393878
rocket = { git = "https://github.com/SergioBenitez/Rocket", features = ["json"] }
[dependencies.redis]
version = "0.21.5"
features = ["tokio-comp"]
[dependencies.sqlx]
version = "0.5.11"
features = ["macros", "runtime-tokio-rustls", "postgres"]
[dependencies.uuid]
version = "1.0.0-alpha.1"
features = ["v4", "fast-rng", "macro-diagnostics"]
## DEV ##
[dev-dependencies]
mockall = "0.11.0"
[dev-dependencies.async-std]
version = "1.11.0"
features = ["attributes", "tokio1"]
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
您可以使用 或 ::超时 。示例使用async_std:
如果要继续进行
tx2
在完成tx1
之前,则可以async_std :: task :: spawn
或
tokio :: Spawn
tx1首先:You can use a
async_std::future::timeout
ortokio::time::timeout
. Example using async_std:If you want to continue to
tx2
before completingtx1
, you canasync_std::task::spawn
ortokio::spawn
the tx1 first:@mika指出了正确的方向,我可以同时产生交易并添加一些超时,以使并发的TXS有一些时间执行。
我以为这两个TXS都可以并行执行直到睡眠线,然后一个人会提交,另一个TX会在提交行上失败。但是不,实际上两个TXS都并行执行,TX1运行直到插入线上的睡眠和TX2阻止直到TX1进行,然后TX2在插入线上失败。
我想这就是DB在这种情况下的工作原理,也许我可以通过弄乱TX隔离来改变这一点,但这不是我在这里的意图。我只是为了学习更多的学习,今天的学习已经足够了:)
@Mika pointed me the right direction, I could spawn both transactions and add a bit of timeout to give the concurrent TXs some time to execute.
I thought this way both TXs would execute in parallel until the sleep line, then one would commit and the other one would fail on the commit line. But no, actually both TXs execute in parallel, TX1 runs until the sleep and TX2 blocks on the insert line until TX1 commits, then TX2 fails on the insert line.
I guess that's just how DB works on this case and maybe I could change that by messing with TX isolation, but that's not my intent here. I'm just playing to learn more, and that's enough learning for today :)