fn is_retryable(e: &Error) -> bool {
matches!(e, Error::Busy(_) | Error::BusySnapshot(_))
|| matches!(e, Error::Error(msg) if msg.contains("conflict"))
}
#[tokio::main]
async fn main() -> Result<(), Error> {
let tmp = NamedTempFile::new().expect("failed to create temp file");
let db = Builder::new_local(tmp.path().to_str().unwrap())
.build()
.await?;
let conn = db.connect()?;
conn.pragma_update("journal_mode", "'mvcc'").await?;
conn.execute("CREATE TABLE hits (val INTEGER)", ()).await?;
let mut handles = Vec::new();
for _ in 0..16 {
let db = db.clone();
handles.push(tokio::spawn(async move {
let val = rand::rng().random_range(1..=100);
let conn = db.connect()?;
loop {
conn.execute("BEGIN CONCURRENT", ()).await?;
let result = conn
.execute(&format!("INSERT INTO hits VALUES ({val})"), ())
.await
.and(conn.execute("COMMIT", ()).await);
match result {
Ok(_) => return Ok::<_, Error>(val),
Err(ref e) if is_retryable(e) => {
let _ = conn.execute("ROLLBACK", ()).await;
tokio::task::yield_now().await;
}
Err(e) => {
let _ = conn.execute("ROLLBACK", ()).await;
return Err(e);
}
}
}
}));
}
for handle in handles {
handle.await.expect("task panicked")?;
}
Ok(())
}