-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(cat-gateway): Purge volatile data after immutable roll forward #1188
Draft
saibatizoku
wants to merge
45
commits into
main
Choose a base branch
from
feat/purge-volatile-data-after-roll-forward
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+2,629
−55
Draft
Changes from 41 commits
Commits
Show all changes
45 commits
Select commit
Hold shift + click to select a range
0e99e9d
wip(cat-gateway): add select and delete queries for purging volatile …
saibatizoku 14b48db
wip(cat-gateway): refactor purge queries
saibatizoku cd5afba
feat(cat-gateway): refactor session execution code for reuse
saibatizoku 0510cb5
feat(cat-gateway): add purge_queries to CassandraSession
saibatizoku 4abc976
fix(cat-gateway): refactor into queries::purge module
saibatizoku e2bf64e
wip(cat-gateway): add purge methods to cassandra session
saibatizoku 3969723
wip(cat-gateway): rust code for txo by stake queries
saibatizoku 1ba35b1
wip(cat-gateway): refactor queries for purging volatile data
saibatizoku a73a1f6
wip(cat-gateway): refactor queries for purging volatile data
saibatizoku 535aabf
fix(cat-gateway): modify types for query result
saibatizoku 7aa90f1
fix(cat-gateway): cleanup naming types
saibatizoku bb2a4b0
chore(cat-gateway): fix spelling
saibatizoku 617a9c7
chore(cat-gateway): fix spelling
saibatizoku a36778b
wip(cat-gateway): rust code for txo assets by stake queries
saibatizoku 79228ae
fix(cat-gateway): cleanup field names
saibatizoku bb9fb0a
fix(cat-gateway): incorporate unstaked txo queries, cleanup cql comments
saibatizoku 7d2ee85
fix(cat-gateway): incorporate unstaked txi queries, cleanup cql comments
saibatizoku 80810fa
fix(cat-gateway): incorporate stake registration queries, cleanup cql…
saibatizoku 3c03af2
fix(cat-gateway): incorporate cip36 registration queries
saibatizoku bab86cf
fix(cat-gateway): incorporate cip36 registration for votekey queries
saibatizoku df85e7b
chore(cat-gateway): fix spelling
saibatizoku 030d298
fix(cat-gateway): incorporate cip36 registration for votekey queries
saibatizoku 2337115
fix(cat-gateway): incorporate cip36 invalid registration queries
saibatizoku 843d9c6
fix(cat-gateway): incorporate rbac509 registration queries
saibatizoku 43f2e8d
fix(cat-gateway): incorporate chain root for role0 registration queries
saibatizoku 6bba6f6
chore(cat-gateway): fix CQL comments
saibatizoku 82ed9b4
fix(cat-gateway): incorporate chain root for stake addr and txn id re…
saibatizoku c08a4f1
fix(cat-gateway): box large futures lint
saibatizoku 40ee8e1
feat(cat-gateway): add PURGE_SLOT_BUFFER parameter
saibatizoku 53876d2
fix(cat-gateway): update scylla type usage
saibatizoku e0e3791
fix(cat-gateway): fix prepared query variants
saibatizoku 9556074
wip(cat-gateway): add purge command when only one chain follower is left
saibatizoku 7b170e0
feat(cat-gateway): purge roll forward volatile data
saibatizoku 3d7b1f8
Merge remote-tracking branch 'origin/main' into feat/purge-volatile-d…
saibatizoku 7059f25
Merge remote-tracking branch 'origin/main' into feat/purge-volatile-d…
saibatizoku c12a216
Merge remote-tracking branch 'origin/main' into feat/purge-volatile-d…
saibatizoku 34af5ef
fix(cat-gateway): correct names for chain root queries
saibatizoku 572aa9b
feat(cat-gateway): wrap up queries to purge volatile data
saibatizoku c405ae6
Merge remote-tracking branch 'origin/main' into feat/purge-volatile-d…
saibatizoku 7fd5b04
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
stevenj f78a1a8
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
stevenj 8d98546
fix(cat-gateway): cleanup
saibatizoku 3e0043b
fix(cat-gateway): update cardano-chain-follower to v0.0.6
saibatizoku 762f6a3
Merge remote-tracking branch 'origin/main' into feat/purge-volatile-d…
saibatizoku e791fc3
Merge branch 'main' into feat/purge-volatile-data-after-roll-forward
saibatizoku File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
317 changes: 317 additions & 0 deletions
317
catalyst-gateway/bin/src/db/index/block/roll_forward.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,317 @@ | ||||
//! Immutable Roll Forward logic. | ||||
|
||||
use std::{collections::HashSet, sync::Arc}; | ||||
|
||||
use futures::StreamExt; | ||||
|
||||
use crate::{ | ||||
db::index::{block::CassandraSession, queries::purge}, | ||||
settings::Settings, | ||||
}; | ||||
|
||||
/// Purge Data from Live Index | ||||
#[allow(unused_variables)] | ||||
pub(crate) async fn purge_live_index(purge_slot: u64) -> anyhow::Result<()> { | ||||
let persistent = false; // get volatile session | ||||
let Some(session) = CassandraSession::get(persistent) else { | ||||
anyhow::bail!("Failed to acquire db session"); | ||||
}; | ||||
|
||||
// Purge data up to this slot | ||||
let purge_to_slot: num_bigint::BigInt = purge_slot | ||||
.saturating_sub(Settings::purge_slot_buffer()) | ||||
.into(); | ||||
|
||||
let txn_hashes = purge_txi_by_hash(&session, &purge_to_slot).await?; | ||||
purge_chain_root_for_role0_key(&session, &purge_to_slot).await?; | ||||
purge_chain_root_for_stake_address(&session, &purge_to_slot).await?; | ||||
purge_chain_root_for_txn_id(&session, &txn_hashes).await?; | ||||
purge_cip36_registration(&session, &purge_to_slot).await?; | ||||
purge_cip36_registration_for_vote_key(&session, &purge_to_slot).await?; | ||||
purge_cip36_registration_invalid(&session, &purge_to_slot).await?; | ||||
purge_rbac509_registration(&session, &purge_to_slot).await?; | ||||
purge_stake_registration(&session, &purge_to_slot).await?; | ||||
purge_txo_ada(&session, &purge_to_slot).await?; | ||||
purge_txo_assets(&session, &purge_to_slot).await?; | ||||
purge_unstaked_txo_ada(&session, &purge_to_slot).await?; | ||||
purge_unstaked_txo_assets(&session, &purge_to_slot).await?; | ||||
|
||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `chain_root_for_role0_key`. | ||||
async fn purge_chain_root_for_role0_key( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<()> { | ||||
use purge::chain_root_for_role0_key::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
let params: Params = primary_key.into(); | ||||
if ¶ms.slot_no <= purge_to_slot { | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `chain_root_for_stake_address`. | ||||
async fn purge_chain_root_for_stake_address( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<()> { | ||||
use purge::chain_root_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
let params: Params = primary_key.into(); | ||||
if ¶ms.slot_no <= purge_to_slot { | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `chain_root_for_txn_id`. | ||||
async fn purge_chain_root_for_txn_id( | ||||
session: &Arc<CassandraSession>, txn_hashes: &HashSet<Vec<u8>>, | ||||
) -> anyhow::Result<()> { | ||||
use purge::chain_root_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
let params: Params = primary_key.into(); | ||||
if txn_hashes.contains(¶ms.transaction_id) { | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `cip36_registration`. | ||||
async fn purge_cip36_registration( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<()> { | ||||
use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
let params: Params = primary_key.into(); | ||||
if ¶ms.slot_no <= purge_to_slot { | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `cip36_registration_for_vote_key`. | ||||
async fn purge_cip36_registration_for_vote_key( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<()> { | ||||
use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
let params: Params = primary_key.into(); | ||||
if ¶ms.slot_no <= purge_to_slot { | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `cip36_registration_invalid`. | ||||
async fn purge_cip36_registration_invalid( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<()> { | ||||
use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
let params: Params = primary_key.into(); | ||||
if ¶ms.slot_no <= purge_to_slot { | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `rbac509_registration`. | ||||
async fn purge_rbac509_registration( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<()> { | ||||
use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
let params: Params = primary_key.into(); | ||||
if ¶ms.slot_no <= purge_to_slot { | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `stake_registration`. | ||||
async fn purge_stake_registration( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<()> { | ||||
use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
let params: Params = primary_key.into(); | ||||
if ¶ms.slot_no <= purge_to_slot { | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `txi_by_hash`. | ||||
#[allow(clippy::unused_async)] | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||
async fn purge_txi_by_hash( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<HashSet<Vec<u8>>> { | ||||
use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
let mut txn_hashes: HashSet<Vec<u8>> = HashSet::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
if &primary_key.2 <= purge_to_slot { | ||||
let params: Params = primary_key.into(); | ||||
txn_hashes.insert(params.txn_hash.clone()); | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(txn_hashes) | ||||
} | ||||
|
||||
/// Purge data from `txo_ada`. | ||||
async fn purge_txo_ada( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<()> { | ||||
use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
let params: Params = primary_key.into(); | ||||
if ¶ms.slot_no <= purge_to_slot { | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `txo_assets`. | ||||
async fn purge_txo_assets( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<()> { | ||||
use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
let params: Params = primary_key.into(); | ||||
if ¶ms.slot_no <= purge_to_slot { | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `unstaked_txo_ada`. | ||||
#[allow(clippy::unused_async)] | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||
async fn purge_unstaked_txo_ada( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<()> { | ||||
use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
if &primary_key.2 <= purge_to_slot { | ||||
let params: Params = primary_key.into(); | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} | ||||
|
||||
/// Purge data from `unstaked_txo_assets`. | ||||
async fn purge_unstaked_txo_assets( | ||||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||||
) -> anyhow::Result<()> { | ||||
use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery}; | ||||
|
||||
// Get all keys | ||||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||||
// Filter | ||||
let mut delete_params: Vec<Params> = Vec::new(); | ||||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||||
let params: Params = primary_key.into(); | ||||
if ¶ms.slot_no <= purge_to_slot { | ||||
delete_params.push(params); | ||||
} | ||||
} | ||||
// Delete filtered keys | ||||
DeleteQuery::execute(session, delete_params).await?; | ||||
Ok(()) | ||||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems its already redundant