From 7e64eb97b9c077f92e7f21327c63edc5c82dbc6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joaqu=C3=ADn=20Rosales?= Date: Wed, 4 Dec 2024 18:30:23 -0600 Subject: [PATCH] wip(cat-gateway): add purge command when only one chain follower is left --- catalyst-gateway/bin/src/cardano/mod.rs | 23 ++++++---- .../bin/src/db/index/block/roll_forward.rs | 42 +++++++++++++++++++ 2 files changed, 58 insertions(+), 7 deletions(-) diff --git a/catalyst-gateway/bin/src/cardano/mod.rs b/catalyst-gateway/bin/src/cardano/mod.rs index 41e775865f4..0f8e5e6384a 100644 --- a/catalyst-gateway/bin/src/cardano/mod.rs +++ b/catalyst-gateway/bin/src/cardano/mod.rs @@ -12,7 +12,7 @@ use tracing::{debug, error, info, warn}; use crate::{ db::index::{ - block::index_block, + block::{index_block, roll_forward}, queries::sync_status::{ get::{get_sync_status, SyncStatus}, update::update_sync_status, @@ -451,12 +451,21 @@ impl SyncTask { // TODO: IF there is only 1 chain follower left in sync_tasks, then all // immutable followers have finished. - // When this happens we need to purge the live index of any records that exist - // before the current immutable tip. - // Note: to prevent a data race when multiple nodes are syncing, we probably - // want to put a gap in this, so that there are X slots of overlap - // between the live chain and immutable chain. This gap should be - // a parameter. + // When this happens we need to purge the live index of any records that + // exist before the current immutable tip. + // Note: to prevent a data race when multiple nodes are syncing, we + // probably want to put a gap in this, so that there are X + // slots of overlap between the live chain and immutable + // chain. This gap should be a parameter. + + // WIP: How do we check that only one follower is left, and that the slot buffer + // (overlap) criteria is met? + let only_one_chain_follower_left = false; + if only_one_chain_follower_left { + if let Err(error) = roll_forward::purge_live_index(self.immutable_tip_slot).await { + error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed."); + } + } } error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped. This is an unexpected error!"); diff --git a/catalyst-gateway/bin/src/db/index/block/roll_forward.rs b/catalyst-gateway/bin/src/db/index/block/roll_forward.rs index f0d23fd92c1..7e0c19d4733 100644 --- a/catalyst-gateway/bin/src/db/index/block/roll_forward.rs +++ b/catalyst-gateway/bin/src/db/index/block/roll_forward.rs @@ -1 +1,43 @@ //! Immutable Roll Forward logic. + +use crate::{ + db::index::{block::CassandraSession, queries::purge}, + settings::Settings, +}; + +/// Purge Data from Live Index +#[allow(unused_variables)] +pub(crate) async fn purge_live_index(purge_slot: u64) -> anyhow::Result<()> { + let persistent = false; // get volatile session + let Some(session) = CassandraSession::get(persistent) else { + anyhow::bail!("Failed to acquire db session"); + }; + + let chain_root_for_role0_keys = + purge::chain_root_for_role0_key::PrimaryKeyQuery::execute(&session).await?; + let chain_root_for_stake_address_keys = + purge::chain_root_for_stake_address::PrimaryKeyQuery::execute(&session).await?; + let chain_root_for_txn_id_keys = + purge::chain_root_for_txn_id::PrimaryKeyQuery::execute(&session).await?; + let cip36_registration_keys = + purge::cip36_registration::PrimaryKeyQuery::execute(&session).await?; + let cip36_registration_for_vote_keys = + purge::cip36_registration_for_vote_key::PrimaryKeyQuery::execute(&session).await?; + let cip36_registration_invalid_keys = + purge::cip36_registration_invalid::PrimaryKeyQuery::execute(&session).await?; + let rbac509_registration_keys = + purge::rbac509_registration::PrimaryKeyQuery::execute(&session).await?; + let stake_registration_keys = + purge::stake_registration::PrimaryKeyQuery::execute(&session).await?; + let txi_by_hash_keys = purge::txi_by_hash::PrimaryKeyQuery::execute(&session).await?; + let txo_ada_keys = purge::txo_ada::PrimaryKeyQuery::execute(&session).await?; + let txo_assets_keys = purge::txo_assets::PrimaryKeyQuery::execute(&session).await?; + let unstaked_txo_ada_keys = purge::unstaked_txo_ada::PrimaryKeyQuery::execute(&session).await?; + let unstaked_txo_assets_keys = + purge::unstaked_txo_assets::PrimaryKeyQuery::execute(&session).await?; + + // WIP: delete filtered keys + // let purge_to_slot: u64 = purge_slot.saturating_sub(Settings::purge_slot_buffer()); + + Ok(()) +}