diff --git a/catalyst-gateway/bin/src/db/index/block/roll_forward.rs b/catalyst-gateway/bin/src/db/index/block/roll_forward.rs index 7e0c19d4733..513efa36218 100644 --- a/catalyst-gateway/bin/src/db/index/block/roll_forward.rs +++ b/catalyst-gateway/bin/src/db/index/block/roll_forward.rs @@ -1,5 +1,9 @@ //! Immutable Roll Forward logic. +use std::sync::Arc; + +use futures::StreamExt; + use crate::{ db::index::{block::CassandraSession, queries::purge}, settings::Settings, @@ -13,31 +17,274 @@ pub(crate) async fn purge_live_index(purge_slot: u64) -> anyhow::Result<()> { anyhow::bail!("Failed to acquire db session"); }; - let chain_root_for_role0_keys = - purge::chain_root_for_role0_key::PrimaryKeyQuery::execute(&session).await?; - let chain_root_for_stake_address_keys = - purge::chain_root_for_stake_address::PrimaryKeyQuery::execute(&session).await?; - let chain_root_for_txn_id_keys = - purge::chain_root_for_txn_id::PrimaryKeyQuery::execute(&session).await?; - let cip36_registration_keys = - purge::cip36_registration::PrimaryKeyQuery::execute(&session).await?; - let cip36_registration_for_vote_keys = - purge::cip36_registration_for_vote_key::PrimaryKeyQuery::execute(&session).await?; - let cip36_registration_invalid_keys = - purge::cip36_registration_invalid::PrimaryKeyQuery::execute(&session).await?; - let rbac509_registration_keys = - purge::rbac509_registration::PrimaryKeyQuery::execute(&session).await?; - let stake_registration_keys = - purge::stake_registration::PrimaryKeyQuery::execute(&session).await?; - let txi_by_hash_keys = purge::txi_by_hash::PrimaryKeyQuery::execute(&session).await?; - let txo_ada_keys = purge::txo_ada::PrimaryKeyQuery::execute(&session).await?; - let txo_assets_keys = purge::txo_assets::PrimaryKeyQuery::execute(&session).await?; - let unstaked_txo_ada_keys = purge::unstaked_txo_ada::PrimaryKeyQuery::execute(&session).await?; - let unstaked_txo_assets_keys = - purge::unstaked_txo_assets::PrimaryKeyQuery::execute(&session).await?; - - // WIP: delete filtered keys - // let purge_to_slot: u64 = purge_slot.saturating_sub(Settings::purge_slot_buffer()); + // Purge data up to this slot + let purge_to_slot: num_bigint::BigInt = purge_slot + .saturating_sub(Settings::purge_slot_buffer()) + .into(); + + purge_chain_root_for_role0_key(&session, &purge_to_slot).await?; + purge_chain_root_for_stake_address(&session, &purge_to_slot).await?; + purge_chain_root_for_txn_id(&session, &purge_to_slot).await?; + purge_cip36_registration(&session, &purge_to_slot).await?; + purge_cip36_registration_for_vote_key(&session, &purge_to_slot).await?; + purge_cip36_registration_invalid(&session, &purge_to_slot).await?; + purge_rbac509_registration(&session, &purge_to_slot).await?; + purge_stake_registration(&session, &purge_to_slot).await?; + purge_txi_by_hash(&session, &purge_to_slot).await?; // WIP + purge_txo_ada(&session, &purge_to_slot).await?; + purge_txo_assets(&session, &purge_to_slot).await?; + purge_unstaked_txo_ada(&session, &purge_to_slot).await?; // WIP + purge_unstaked_txo_assets(&session, &purge_to_slot).await?; + + Ok(()) +} + +/// Purge data from `chain_root_for_role0_key`. +async fn purge_chain_root_for_role0_key( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::chain_root_for_role0_key::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `chain_root_for_stake_address`. +async fn purge_chain_root_for_stake_address( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::chain_root_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `chain_root_for_txn_id`. +async fn purge_chain_root_for_txn_id( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::chain_root_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `cip36_registration`. +async fn purge_cip36_registration( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `cip36_registration_for_vote_key`. +async fn purge_cip36_registration_for_vote_key( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `cip36_registration_invalid`. +async fn purge_cip36_registration_invalid( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `rbac509_registration`. +async fn purge_rbac509_registration( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `stake_registration`. +async fn purge_stake_registration( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `txi_by_hash`. +#[allow(clippy::unused_async)] +async fn purge_txi_by_hash( + _session: &Arc, _purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + // WIP: Get TXN hashes from other queries + Ok(()) +} + +/// Purge data from `txo_ada`. +async fn purge_txo_ada( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `txo_assets`. +async fn purge_txo_assets( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `unstaked_txo_ada`. +#[allow(clippy::unused_async)] +async fn purge_unstaked_txo_ada( + _session: &Arc, _purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + // use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery}; + // WIP: Get TXN hashes from other queries + Ok(()) +} + +/// Purge data from `unstaked_txo_assets`. +async fn purge_unstaked_txo_assets( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery}; + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; Ok(()) }