Skip to content

Commit

Permalink
retry-tx fix (#758)
Browse files Browse the repository at this point in the history
  • Loading branch information
ewoolsey authored Jul 2, 2024
1 parent 1594eb2 commit 2e11a4a
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use crate::identity_tree::{
use crate::prover::map::initialize_prover_maps;
use crate::prover::repository::ProverRepository;
use crate::prover::{ProverConfig, ProverType};
use crate::retry_tx;
use crate::server::data::{
InclusionProofResponse, ListBatchSizesResponse, VerifySemaphoreProofQuery,
VerifySemaphoreProofRequest, VerifySemaphoreProofResponse,
};
use crate::server::error::Error as ServerError;
use crate::utils::retry_tx;

pub struct App {
pub database: Arc<Database>,
Expand Down
4 changes: 2 additions & 2 deletions src/database/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tracing::instrument;
use crate::database::query::DatabaseQuery;
use crate::database::{Database, Error};
use crate::identity_tree::{Hash, ProcessedStatus};
use crate::utils::retry_tx;
use crate::retry_tx;

async fn mark_root_as_processed(
tx: &mut Transaction<'_, Postgres>,
Expand Down Expand Up @@ -96,7 +96,7 @@ impl Database {
retry_tx!(self.pool, tx, {
mark_root_as_processed(&mut tx, root).await?;
tx.delete_batches_after_root(root).await?;
Ok(())
Result::<_, Error>::Ok(())
})
.await
}
Expand Down
6 changes: 3 additions & 3 deletions src/identity/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::identity_tree::{Canonical, Hash, Intermediate, TreeVersion, TreeWithN
use crate::prover::identity::Identity;
use crate::prover::repository::ProverRepository;
use crate::prover::Prover;
use crate::retry_tx;
use crate::utils::index_packing::pack_indices;
use crate::utils::retry_tx;

pub type TransactionId = String;

Expand Down Expand Up @@ -549,7 +549,7 @@ impl OnChainIdentityProcessor {
.await?;
}

Ok(())
Result::<_, anyhow::Error>::Ok(())
})
.await
}
Expand Down Expand Up @@ -648,7 +648,7 @@ impl OffChainIdentityProcessor {
.await?;
}

Ok(())
Result::<_, anyhow::Error>::Ok(())
})
.await
}
Expand Down
4 changes: 2 additions & 2 deletions src/task_monitor/tasks/insert_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::database::query::DatabaseQuery as _;
use crate::database::types::UnprocessedCommitment;
use crate::database::Database;
use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus};
use crate::utils::retry_tx;
use crate::retry_tx;

pub async fn insert_identities(
app: Arc<App>,
Expand Down Expand Up @@ -100,7 +100,7 @@ pub async fn insert_identities_batch(
tx.remove_unprocessed_identity(identity).await?;
}

Ok(())
Result::<_, anyhow::Error>::Ok(())
})
.await
}
23 changes: 16 additions & 7 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub const TX_RETRY_LIMIT: u32 = 10;
/// tx.execute("SELECT * FROM table").await?;
/// Ok(tx.execute("SELECT * FROM other").await?)
/// }).await;
#[macro_export]
macro_rules! retry_tx {
($pool:expr, $tx:ident, $expression:expr) => {
async {
Expand All @@ -38,20 +39,29 @@ macro_rules! retry_tx {
loop {
let mut $tx = $pool.begin().await?;
res = async { $expression }.await;
if res.is_err() {
$tx.rollback().await?;
return res;
let limit = 10;
if let Err(e) = res {
counter += 1;
if counter > limit {
return Err(e.into());
} else {
$tx.rollback().await?;
tracing::warn!(
error = ?e,
"db transaction returned error ({counter}/{limit})"
);
continue;
}
}
match $tx.commit().await {
Err(e) => {
counter += 1;
let limit = crate::utils::TX_RETRY_LIMIT;
if counter > limit {
return Err(e.into());
} else {
tracing::warn!(
"db transaction commit failed ({counter}/{limit}): {:?}",
e
error = ?e,
"db transaction commit failed ({counter}/{limit})"
);
}
}
Expand All @@ -62,7 +72,6 @@ macro_rules! retry_tx {
}
};
}
pub(crate) use retry_tx;

pub fn spawn_monitored_with_backoff<S, F>(
future_spawner: S,
Expand Down
44 changes: 29 additions & 15 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,16 +401,13 @@ pub async fn test_inclusion_status(
assert_eq!(expected_status, result.0.status,);
}

#[instrument(skip_all)]
pub async fn test_delete_identity(
pub async fn api_delete_identity(
uri: &str,
client: &Client<HttpConnector>,
ref_tree: &mut PoseidonTree,
test_leaves: &[Field],
leaf_index: usize,
leaf: &Field,
expect_failure: bool,
) -> (merkle_tree::Proof<PoseidonHash>, Field) {
let body = construct_delete_identity_body(&test_leaves[leaf_index]);
) {
let body = construct_delete_identity_body(leaf);

let req = Request::builder()
.method("POST")
Expand All @@ -433,7 +430,18 @@ pub async fn test_delete_identity(
assert!(response.status().is_success());
assert!(bytes.is_empty());
}
}

/// Deletes the identity at `leaf_index` through the sequencer's HTTP API and
/// mirrors that deletion in the local reference tree.
///
/// Returns the reference tree's inclusion proof for the (now zeroed) leaf and
/// the updated root, so the caller can compare them against the sequencer.
#[instrument(skip_all)]
pub async fn test_delete_identity(
    uri: &str,
    client: &Client<HttpConnector>,
    ref_tree: &mut PoseidonTree,
    test_leaves: &[Field],
    leaf_index: usize,
    expect_failure: bool,
) -> (merkle_tree::Proof<PoseidonHash>, Field) {
    // Drive the deletion through the API first.
    let leaf = &test_leaves[leaf_index];
    api_delete_identity(uri, client, leaf, expect_failure).await;

    // Mirror it locally: a deleted leaf is represented by the zero hash.
    ref_tree.set(leaf_index, Hash::ZERO);

    let proof = ref_tree
        .proof(leaf_index)
        .expect("leaf_index must be within the reference tree");
    (proof, ref_tree.root())
}
Expand Down Expand Up @@ -556,14 +564,8 @@ pub async fn test_remove_batch_size(
}

#[instrument(skip_all)]
pub async fn test_insert_identity(
uri: &str,
client: &Client<HttpConnector>,
ref_tree: &mut PoseidonTree,
test_leaves: &[Field],
leaf_index: usize,
) -> (merkle_tree::Proof<PoseidonHash>, Field) {
let body = construct_insert_identity_body(&test_leaves[leaf_index]);
pub async fn api_insert_identity(uri: &str, client: &Client<HttpConnector>, leaf: &Field) {
let body = construct_insert_identity_body(leaf);
let req = Request::builder()
.method("POST")
.uri(uri.to_owned() + "/insertIdentity")
Expand All @@ -583,6 +585,18 @@ pub async fn test_insert_identity(
}

assert!(bytes.is_empty());
}

#[instrument(skip_all)]
pub async fn test_insert_identity(
uri: &str,
client: &Client<HttpConnector>,
ref_tree: &mut PoseidonTree,
test_leaves: &[Field],
leaf_index: usize,
) -> (merkle_tree::Proof<PoseidonHash>, Field) {
api_insert_identity(uri, client, &test_leaves[leaf_index]).await;

ref_tree.set(leaf_index, test_leaves[leaf_index]);

(ref_tree.proof(leaf_index).unwrap(), ref_tree.root())
Expand Down
144 changes: 144 additions & 0 deletions tests/serializable_transaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
mod common;
use common::prelude::*;
use futures::stream::StreamExt;
use signup_sequencer::retry_tx;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Postgres, Transaction};
use tokio::time::{sleep, Duration};

/// Creates (or resets) the `accounts` fixture table and seeds it with two
/// rows: id 1 with balance 100 and id 2 with balance 200.
///
/// Everything runs inside a single retried transaction so a concurrent setup
/// cannot observe a half-initialized table. Any failure here is fatal for the
/// test run (panics via `unwrap`), hence the returned `Result` is always `Ok`.
async fn setup(pool: &sqlx::Pool<Postgres>) -> Result<(), sqlx::Error> {
    retry_tx!(pool, setup_tx, {
        // Schema first, then wipe leftovers from earlier runs, then seed.
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS accounts (
                id SERIAL PRIMARY KEY,
                balance INT
            );
            "#,
        )
        .execute(&mut *setup_tx)
        .await?;

        sqlx::query("TRUNCATE TABLE accounts RESTART IDENTITY;")
            .execute(&mut *setup_tx)
            .await?;

        sqlx::query("INSERT INTO accounts (balance) VALUES (100), (200);")
            .execute(&mut *setup_tx)
            .await?;

        Ok::<_, anyhow::Error>(())
    })
    .await
    .unwrap();

    Ok(())
}

/// First half of the serialization-conflict scenario: under SERIALIZABLE
/// isolation, reads account 1's balance, holds the transaction open for a few
/// seconds, then credits account 2.
///
/// Runs inside `retry_tx!` so a serialization failure (SQLSTATE 40001) is
/// retried rather than surfaced; an exhausted retry budget panics via
/// `unwrap`, so the returned `Result` is always `Ok`.
async fn transaction_1(pool: &sqlx::Pool<Postgres>) -> Result<(), sqlx::Error> {
    retry_tx!(pool, ser_tx, {
        // Must be the first statement of the transaction.
        sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            .execute(&mut *ser_tx)
            .await?;

        let (account_1_balance,): (i32,) =
            sqlx::query_as("SELECT balance FROM accounts WHERE id = 1")
                .fetch_one(&mut *ser_tx)
                .await?;

        println!("Transaction 1: Balance of account 1 is {}", account_1_balance);

        // Simulate some work
        sleep(Duration::from_secs(5)).await;

        sqlx::query("UPDATE accounts SET balance = balance + 30 WHERE id = 2")
            .execute(&mut *ser_tx)
            .await?;

        Ok::<_, anyhow::Error>(())
    })
    .await
    .unwrap();

    Ok(())
}

/// Second half of the serialization-conflict scenario: under SERIALIZABLE
/// isolation, reads account 2's balance and credits account 1.
///
/// Unlike `transaction_1`, this runs a single plain transaction with no
/// retry wrapper, so a serialization failure is propagated to the caller
/// as an `Err`.
async fn transaction_2(pool: &sqlx::Pool<Postgres>) -> Result<(), sqlx::Error> {
    let mut plain_tx: Transaction<'_, Postgres> = pool.begin().await?;

    // Must be the first statement of the transaction.
    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        .execute(&mut *plain_tx)
        .await?;

    let (account_2_balance,): (i32,) =
        sqlx::query_as("SELECT balance FROM accounts WHERE id = 2")
            .fetch_one(&mut *plain_tx)
            .await?;

    println!("Transaction 2: Balance of account 2 is {}", account_2_balance);

    sqlx::query("UPDATE accounts SET balance = balance + 50 WHERE id = 1")
        .execute(&mut *plain_tx)
        .await?;

    plain_tx.commit().await?;
    Ok(())
}

/// End-to-end check that the `retry_tx!` macro survives SERIALIZABLE
/// transaction conflicts: two concurrent tasks each run the retried
/// `transaction_1` (slow, overlapping read/write) followed by the
/// non-retried `transaction_2` against the same two `accounts` rows.
#[tokio::test]
async fn serializable_transaction() -> Result<(), anyhow::Error> {
    init_tracing_subscriber();
    info!("Starting serializable_transaction");

    let insertion_batch_size: usize = 500;
    let deletion_batch_size: usize = 10;

    // Empty reference tree only used to derive the initial on-chain root
    // expected by the mocked identity manager.
    let ref_tree = PoseidonTree::new(DEFAULT_TREE_DEPTH + 1, ruint::Uint::ZERO);
    let initial_root: U256 = ref_tree.root().into();

    // Spin up the external dependencies (mock chain, Postgres container,
    // provers, relayer) in Docker.
    let docker = Cli::default();
    let (mock_chain, db_container, _insertion_prover_map, _deletion_prover_map, micro_oz) =
        spawn_deps(
            initial_root,
            &[insertion_batch_size],
            &[deletion_batch_size],
            DEFAULT_TREE_DEPTH as u8,
            &docker,
        )
        .await?;

    let db_socket_addr = db_container.address();
    let db_url = format!("postgres://postgres:postgres@{db_socket_addr}/database");

    // Temp dir holds the sequencer's tree cache file for this run.
    let temp_dir = tempfile::tempdir()?;
    info!(
        "temp dir created at: {:?}",
        temp_dir.path().join("testfile")
    );

    let config = TestConfigBuilder::new()
        .db_url(&db_url)
        .oz_api_url(&micro_oz.endpoint())
        .oz_address(micro_oz.address())
        .identity_manager_address(mock_chain.identity_manager.address())
        .primary_network_provider(mock_chain.anvil.endpoint())
        .cache_file(temp_dir.path().join("testfile").to_str().unwrap())
        .build()?;

    // NOTE(review): the returned handles are immediately dropped by `(..)` —
    // presumably the app only needs to have run migrations before we open our
    // own pool; confirm the app is meant to shut down here.
    let (..) = spawn_app(config.clone())
        .await
        .expect("Failed to spawn app.");

    // Separate pool with a large connection cap so the concurrent
    // transactions below never queue on a connection.
    let pool = PgPoolOptions::new()
        .max_connections(100)
        .connect(&db_url)
        .await?;

    setup(&pool).await?;
    // Two tasks race the same account rows; transaction_1 retries on
    // serialization failures via retry_tx!.
    // NOTE(review): transaction_2 is not retried, so its unwrap could panic
    // on a serialization conflict — confirm this is intended to be conflict-free.
    futures::stream::iter(0..2)
        .for_each_concurrent(None, |_| async {
            transaction_1(&pool).await.unwrap();
            transaction_2(&pool).await.unwrap();
        })
        .await;

    Ok(())
}

0 comments on commit 2e11a4a

Please sign in to comment.