Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cat-gateway): Purge volatile data after immutable roll forward #1188

Draft
wants to merge 55 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
0e99e9d
wip(cat-gateway): add select and delete queries for purging volatile …
saibatizoku Nov 5, 2024
14b48db
wip(cat-gateway): refactor purge queries
saibatizoku Nov 12, 2024
cd5afba
feat(cat-gateway): refactor session execution code for reuse
saibatizoku Nov 12, 2024
0510cb5
feat(cat-gateway): add purge_queries to CassandraSession
saibatizoku Nov 12, 2024
4abc976
fix(cat-gateway): refactor into queries::purge module
saibatizoku Nov 12, 2024
e2bf64e
wip(cat-gateway): add purge methods to cassandra session
saibatizoku Nov 12, 2024
3969723
wip(cat-gateway): rust code for txo by stake queries
saibatizoku Nov 12, 2024
1ba35b1
wip(cat-gateway): refactor queries for purging volatile data
saibatizoku Nov 12, 2024
a73a1f6
wip(cat-gateway): refactor queries for purging volatile data
saibatizoku Nov 12, 2024
535aabf
fix(cat-gateway): modify types for query result
saibatizoku Nov 12, 2024
7aa90f1
fix(cat-gateway): cleanup naming types
saibatizoku Nov 12, 2024
bb2a4b0
chore(cat-gateway): fix spelling
saibatizoku Nov 12, 2024
617a9c7
chore(cat-gateway): fix spelling
saibatizoku Nov 12, 2024
a36778b
wip(cat-gateway): rust code for txo assets by stake queries
saibatizoku Nov 12, 2024
79228ae
fix(cat-gateway): cleanup field names
saibatizoku Nov 12, 2024
bb9fb0a
fix(cat-gateway): incorporate unstaked txo queries, cleanup cql comments
saibatizoku Nov 12, 2024
7d2ee85
fix(cat-gateway): incorporate unstaked txi queries, cleanup cql comments
saibatizoku Nov 12, 2024
80810fa
fix(cat-gateway): incorporate stake registration queries, cleanup cql…
saibatizoku Nov 12, 2024
3c03af2
fix(cat-gateway): incorporate cip36 registration queries
saibatizoku Nov 12, 2024
bab86cf
fix(cat-gateway): incorporate cip36 registration for votekey queries
saibatizoku Nov 13, 2024
df85e7b
chore(cat-gateway): fix spelling
saibatizoku Nov 13, 2024
030d298
fix(cat-gateway): incorporate cip36 registration for votekey queries
saibatizoku Nov 13, 2024
2337115
fix(cat-gateway): incorporate cip36 invalid registration queries
saibatizoku Nov 13, 2024
843d9c6
fix(cat-gateway): incorporate rbac509 registration queries
saibatizoku Nov 13, 2024
43f2e8d
fix(cat-gateway): incorporate chain root for role0 registration queries
saibatizoku Nov 14, 2024
6bba6f6
chore(cat-gateway): fix CQL comments
saibatizoku Nov 14, 2024
82ed9b4
fix(cat-gateway): incorporate chain root for stake addr and txn id re…
saibatizoku Nov 14, 2024
c08a4f1
fix(cat-gateway): box large futures lint
saibatizoku Nov 14, 2024
40ee8e1
feat(cat-gateway): add PURGE_SLOT_BUFFER parameter
saibatizoku Dec 3, 2024
53876d2
fix(cat-gateway): update scylla type usage
saibatizoku Dec 4, 2024
e0e3791
fix(cat-gateway): fix prepared query variants
saibatizoku Dec 5, 2024
9556074
wip(cat-gateway): add purge command when only one chain follower is left
saibatizoku Dec 5, 2024
7b170e0
feat(cat-gateway): purge roll forward volatile data
saibatizoku Dec 5, 2024
3d7b1f8
Merge remote-tracking branch 'origin/main' into feat/purge-volatile-d…
saibatizoku Dec 6, 2024
7059f25
Merge remote-tracking branch 'origin/main' into feat/purge-volatile-d…
saibatizoku Dec 7, 2024
c12a216
Merge remote-tracking branch 'origin/main' into feat/purge-volatile-d…
saibatizoku Dec 9, 2024
34af5ef
fix(cat-gateway): correct names for chain root queries
saibatizoku Dec 9, 2024
572aa9b
feat(cat-gateway): wrap up queries to purge volatile data
saibatizoku Dec 10, 2024
c405ae6
Merge remote-tracking branch 'origin/main' into feat/purge-volatile-d…
saibatizoku Dec 10, 2024
7fd5b04
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
stevenj Dec 11, 2024
f78a1a8
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
stevenj Dec 17, 2024
8d98546
fix(cat-gateway): cleanup
saibatizoku Dec 19, 2024
3e0043b
fix(cat-gateway): update cardano-chain-follower to v0.0.6
saibatizoku Dec 19, 2024
762f6a3
Merge remote-tracking branch 'origin/main' into feat/purge-volatile-d…
saibatizoku Dec 19, 2024
e791fc3
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
saibatizoku Dec 23, 2024
2b4a500
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
saibatizoku Dec 29, 2024
1a3c1f0
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
saibatizoku Dec 31, 2024
1a3e086
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
saibatizoku Jan 2, 2025
dd3a0ed
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
stevenj Jan 3, 2025
c4e5e02
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
saibatizoku Jan 3, 2025
fda08c5
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
cong-or Jan 6, 2025
e858084
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
saibatizoku Jan 7, 2025
8533fb8
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
cong-or Jan 9, 2025
2d210d9
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
cong-or Jan 13, 2025
4f2d52a
test(rm schema check): temp
cong-or Jan 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions catalyst-gateway/bin/src/cardano/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::{debug, error, info, warn};

use crate::{
db::index::{
block::index_block,
block::{index_block, roll_forward},
queries::sync_status::{
get::{get_sync_status, SyncStatus},
update::update_sync_status,
Expand Down Expand Up @@ -453,14 +453,11 @@ impl SyncTask {
},
}

// TODO: IF there is only 1 chain follower left in sync_tasks, then all
// immutable followers have finished.
// When this happens we need to purge the live index of any records that exist
// before the current immutable tip.
// Note: to prevent a data race when multiple nodes are syncing, we probably
// want to put a gap in this, so that there are X slots of overlap
// between the live chain and immutable chain. This gap should be
// a parameter.
if self.sync_tasks.len() == 1 {
if let Err(error) = roll_forward::purge_live_index(self.immutable_tip_slot).await {
error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
}
}
}

error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped. This is an unexpected error!");
Expand Down
31 changes: 17 additions & 14 deletions catalyst-gateway/bin/src/db/event/schema_check/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,28 @@ pub(crate) struct MismatchedSchemaError {
}

/// `select_max_version.sql`
const SELECT_MAX_VERSION_SQL: &str = include_str!("select_max_version.sql");
const _SELECT_MAX_VERSION_SQL: &str = include_str!("select_max_version.sql");

impl EventDB {
/// Check the schema version.
/// return the current schema version if its current.
/// Otherwise return an error.
/// Otherwise return an error.]
#[allow(clippy::unused_async)]
pub(crate) async fn schema_version_check() -> anyhow::Result<i32> {
let schema_check = Self::query_one(SELECT_MAX_VERSION_SQL, &[]).await?;
// let schema_check = Self::query_one(SELECT_MAX_VERSION_SQL, &[]).await?;
//
// let current_ver = schema_check.try_get("max")?;
//
// if current_ver == DATABASE_SCHEMA_VERSION {
// Ok(current_ver)
// } else {
// Err(MismatchedSchemaError {
// was: current_ver,
// expected: DATABASE_SCHEMA_VERSION,
// }
// .into())
// }

let current_ver = schema_check.try_get("max")?;

if current_ver == DATABASE_SCHEMA_VERSION {
Ok(current_ver)
} else {
Err(MismatchedSchemaError {
was: current_ver,
expected: DATABASE_SCHEMA_VERSION,
}
.into())
}
Ok(DATABASE_SCHEMA_VERSION)
}
}
1 change: 1 addition & 0 deletions catalyst-gateway/bin/src/db/index/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
pub(crate) mod certs;
pub(crate) mod cip36;
pub(crate) mod rbac509;
pub(crate) mod roll_forward;
pub(crate) mod txi;
pub(crate) mod txo;

Expand Down
314 changes: 314 additions & 0 deletions catalyst-gateway/bin/src/db/index/block/roll_forward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
//! Immutable Roll Forward logic.

use std::{collections::HashSet, sync::Arc};

use futures::StreamExt;

use crate::{
db::index::{block::CassandraSession, queries::purge},
settings::Settings,
};

/// Purge Data from Live Index
pub(crate) async fn purge_live_index(purge_slot: u64) -> anyhow::Result<()> {
let persistent = false; // get volatile session
cong-or marked this conversation as resolved.
Show resolved Hide resolved
let Some(session) = CassandraSession::get(persistent) else {
anyhow::bail!("Failed to acquire db session");
};

// Purge data up to this slot
let purge_to_slot: num_bigint::BigInt = purge_slot
.saturating_sub(Settings::purge_slot_buffer())
.into();

let txn_hashes = purge_txi_by_hash(&session, &purge_to_slot).await?;
purge_chain_root_for_role0_key(&session, &purge_to_slot).await?;
purge_chain_root_for_stake_address(&session, &purge_to_slot).await?;
purge_chain_root_for_txn_id(&session, &txn_hashes).await?;
purge_cip36_registration(&session, &purge_to_slot).await?;
purge_cip36_registration_for_vote_key(&session, &purge_to_slot).await?;
purge_cip36_registration_invalid(&session, &purge_to_slot).await?;
purge_rbac509_registration(&session, &purge_to_slot).await?;
purge_stake_registration(&session, &purge_to_slot).await?;
purge_txo_ada(&session, &purge_to_slot).await?;
purge_txo_assets(&session, &purge_to_slot).await?;
purge_unstaked_txo_ada(&session, &purge_to_slot).await?;
purge_unstaked_txo_assets(&session, &purge_to_slot).await?;

Ok(())
}

/// Purge data from `chain_root_for_role0_key`.
async fn purge_chain_root_for_role0_key(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::chain_root_for_role0_key::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `chain_root_for_stake_address`.
async fn purge_chain_root_for_stake_address(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::chain_root_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `chain_root_for_txn_id`.
async fn purge_chain_root_for_txn_id(
session: &Arc<CassandraSession>, txn_hashes: &HashSet<Vec<u8>>,
) -> anyhow::Result<()> {
use purge::chain_root_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if txn_hashes.contains(&params.transaction_id) {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `cip36_registration`.
async fn purge_cip36_registration(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `cip36_registration_for_vote_key`.
async fn purge_cip36_registration_for_vote_key(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `cip36_registration_invalid`.
async fn purge_cip36_registration_invalid(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `rbac509_registration`.
async fn purge_rbac509_registration(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `stake_registration`.
async fn purge_stake_registration(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `txi_by_hash`.
async fn purge_txi_by_hash(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<HashSet<Vec<u8>>> {
use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
let mut txn_hashes: HashSet<Vec<u8>> = HashSet::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
if &primary_key.2 <= purge_to_slot {
let params: Params = primary_key.into();
txn_hashes.insert(params.txn_hash.clone());
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(txn_hashes)
}

/// Purge data from `txo_ada`.
async fn purge_txo_ada(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `txo_assets`.
async fn purge_txo_assets(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `unstaked_txo_ada`.
async fn purge_unstaked_txo_ada(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
if &primary_key.2 <= purge_to_slot {
let params: Params = primary_key.into();
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `unstaked_txo_assets`.
async fn purge_unstaked_txo_assets(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}
Loading
Loading