From a8bbb042ef011e0b1db16a33cd83a48231b3dc47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joaqu=C3=ADn=20Rosales?= Date: Mon, 11 Nov 2024 21:41:13 -0600 Subject: [PATCH] wip(cat-gateway): rust code for txo by stake queries --- .../bin/src/db/index/block/roll_forward.rs | 77 ------------------- .../bin/src/db/index/queries/purge/mod.rs | 20 +++-- .../queries/purge/txo_by_stake_addr/delete.rs | 64 +++++++++++++++ .../queries/purge/txo_by_stake_addr/mod.rs | 4 + .../queries/purge/txo_by_stake_addr/select.rs | 64 +++++++++++++++ 5 files changed, 144 insertions(+), 85 deletions(-) create mode 100644 catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/delete.rs create mode 100644 catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/mod.rs create mode 100644 catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/select.rs diff --git a/catalyst-gateway/bin/src/db/index/block/roll_forward.rs b/catalyst-gateway/bin/src/db/index/block/roll_forward.rs index e6333474997..f0d23fd92c1 100644 --- a/catalyst-gateway/bin/src/db/index/block/roll_forward.rs +++ b/catalyst-gateway/bin/src/db/index/block/roll_forward.rs @@ -1,78 +1 @@ //! Immutable Roll Forward logic. -#![allow(dead_code, clippy::unused_async, clippy::todo)] -use std::sync::Arc; - -use scylla::Session; - -use crate::{db::index::queries::SizedBatch, settings::cassandra_db::EnvVars}; - -type Wip = Vec<()>; - -/// Roll Forward Purge Query. -pub(crate) struct RollforwardPurgeQuery {} - -/// Purge batches. -#[allow(clippy::struct_field_names)] -pub(crate) struct PurgeBatches { - /// Get Primary Keys forTXO Purge Query. - pub(crate) get_txo_purge_queries: SizedBatch, - /// Get Primary Keys forTXO Asset Purge Query. - pub(crate) get_txo_asset_purge_queries: SizedBatch, - /// Get Primary Keys forUnstaked TXO Purge Query. - pub(crate) get_unstaked_txo_purge_queries: SizedBatch, - /// Get Primary Keys forUnstaked TXO Asset Purge Query. - pub(crate) get_unstaked_txo_asset_purge_queries: SizedBatch, - /// Get Primary Keys forTXI Purge Query. - pub(crate) get_txi_purge_queries: SizedBatch, - /// Get Primary Keys forTXI Purge Query. - pub(crate) get_stake_registration_purge_queries: SizedBatch, - /// Get Primary Keys forCIP36 Registrations Purge Query. - pub(crate) get_cip36_registration_purge_queries: SizedBatch, - /// Get Primary Keys forCIP36 Registration errors Purge Query. - pub(crate) get_cip36_registration_error_purge_queries: SizedBatch, - /// Get Primary Keys forCIP36 Registration for Stake Address Purge Query. - pub(crate) get_cip36_registration_for_stake_address_purge_queries: SizedBatch, - /// Get Primary Keys forRBAC 509 Registrations Purge Query. - pub(crate) get_rbac509_registration_purge_queries: SizedBatch, - /// Get Primary Keys forChain Root for TX ID Purge Query.. - pub(crate) get_chain_root_for_txn_id_purge_queries: SizedBatch, - /// Get Primary Keys forChain Root for Role 0 Key Purge Query.. - pub(crate) get_chain_root_for_role0_key_purge_queries: SizedBatch, - /// Get Primary Keys forChain Root for Stake Address Purge Query.. - pub(crate) get_chain_root_for_stake_address_purge_queries: SizedBatch, - /// TXO Purge Query. - pub(crate) txo_purge_queries: SizedBatch, - /// TXO Asset Purge Query. - pub(crate) txo_asset_purge_queries: SizedBatch, - /// Unstaked TXO Purge Query. - pub(crate) unstaked_txo_purge_queries: SizedBatch, - /// Unstaked TXO Asset Purge Query. - pub(crate) unstaked_txo_asset_purge_queries: SizedBatch, - /// TXI Purge Query. - pub(crate) txi_purge_queries: SizedBatch, - /// TXI Purge Query. - pub(crate) stake_registration_purge_queries: SizedBatch, - /// CIP36 Registrations Purge Query. - pub(crate) cip36_registration_purge_queries: SizedBatch, - /// CIP36 Registration errors Purge Query. - pub(crate) cip36_registration_error_purge_queries: SizedBatch, - /// CIP36 Registration for Stake Address Purge Query. - pub(crate) cip36_registration_for_stake_address_purge_queries: SizedBatch, - /// RBAC 509 Registrations Purge Query. - pub(crate) rbac509_registration_purge_queries: SizedBatch, - /// Chain Root for TX ID Purge Query.. - pub(crate) chain_root_for_txn_id_purge_queries: SizedBatch, - /// Chain Root for Role 0 Key Purge Query.. - pub(crate) chain_root_for_role0_key_purge_queries: SizedBatch, - /// Chain Root for Stake Address Purge Query.. - pub(crate) chain_root_for_stake_address_purge_queries: SizedBatch, -} - -impl RollforwardPurgeQuery { - /// Prepare the purge query. - pub(crate) async fn prepare_batch( - _session: &Arc, _cfg: &EnvVars, - ) -> anyhow::Result { - todo!(); - } -} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/mod.rs b/catalyst-gateway/bin/src/db/index/queries/purge/mod.rs index 3ddaf30d38b..c311c0ef0a3 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/mod.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/mod.rs @@ -1,5 +1,7 @@ //! Queries for purging volatile data. +pub(crate) mod txo_by_stake_addr; + use std::{fmt::Debug, sync::Arc}; use scylla::{ @@ -8,13 +10,12 @@ use scylla::{ }; use super::{ - super::block::roll_forward::{PurgeBatches, RollforwardPurgeQuery}, FallibleQueryResults, SizedBatch, }; -use crate::{ - db::index::block::roll_forward::txo_by_stake_addr::get::TxoByStakeAddressPrimaryKeyQuery, - settings::cassandra_db, -}; +use crate::settings::cassandra_db; + +/// No parameters +const NO_PARAMS = (); /// All prepared DELETE query statements (purge DB table rows). #[derive(strum_macros::Display)] @@ -137,13 +138,14 @@ pub(crate) struct PreparedQueries { impl PreparedQueries { /// Create new prepared queries for a given session. - #[allow(clippy::todo)] + #[allow(clippy::todo, unused_variables)] pub(crate) async fn new( session: Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { // We initialize like this, so that all errors preparing querys get shown before aborting. - let all_purge_queries = RollforwardPurgeQuery::prepare_batch(&session, cfg).await; - let get_txo_purge_queries = TxoByStakeAddressPrimaryKeyQuery::prepare(&session).await?; + let get_txo_purge_queries = + txo_by_stake_addr::select::PrimaryKeyQuery::prepare(&session).await?; + let _unused = " let PurgeBatches { get_txo_asset_purge_queries, get_unstaked_txo_purge_queries, @@ -201,6 +203,8 @@ impl PreparedQueries { chain_root_for_role0_key_purge_queries, chain_root_for_stake_address_purge_queries, }) + "; + todo!("WIP"); } /// Prepares a statement. diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/delete.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/delete.rs new file mode 100644 index 00000000000..4c1a29ae555 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/delete.rs @@ -0,0 +1,64 @@ +//! Delete queries for TXO by Stake Address. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{SerializeRow, Session}; + +use crate::{ + db::index::queries::{purge::PreparedQueries, SizedBatch}, + settings::cassandra_db, +}; + +/// Delete TXO by Stake Address +const DELETE_QUERY: &str = include_str!("../cql/delete_txo_by_stake_address.cql"); + +/// Get TXO by Stake Address Query Parameters +#[derive(SerializeRow, Clone)] +pub(super) struct Params { + /// Stake Address - Binary 28 bytes. 0 bytes = not staked. + stake_address: Vec, + /// Block Slot Number + slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + txn: i16, + /// Transaction Output Offset inside the transaction. + txo: i16, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("stake_address", &self.stake_address) + .field("slot_no", &self.slot_no) + .field("txn", &self.txn) + .field("txo", &self.txo) + .finish() + } +} + +impl Params { + /// Create a new record for this transaction. + pub(super) fn new(stake_address: &[u8], slot_no: u64, txn: i16, txo: i16) -> Self { + Self { + stake_address: stake_address.to_vec(), + slot_no: slot_no.into(), + txn, + txo, + } + } + + /// Prepare Batch of Get TXO Asset Index Data Queries + pub(super) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/mod.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/mod.rs new file mode 100644 index 00000000000..edcdcdb5749 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/mod.rs @@ -0,0 +1,4 @@ +//! TXO by Stake Address Queries used in purging data. + +pub(crate) mod delete; +pub(crate) mod select; diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/select.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/select.rs new file mode 100644 index 00000000000..c795637f0c2 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txo_by_stake_addr/select.rs @@ -0,0 +1,64 @@ +//! Select queries for TXO by Stake Address. + +use std::sync::Arc; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, FromRow, + Session, +}; +use tracing::error; + +use crate::db::index::{ + queries::purge::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, +}; + +/// Select primary keys for TXO by Stake Address. +const SELECT_QUERY: &str = include_str!("../cql/get_txo_by_stake_address.cql"); + +/// Primary Key Row +#[derive(FromRow)] +pub(crate) struct PrimaryKey { + /// Stake Address - Binary 28 bytes. 0 bytes = not staked. + stake_address: Vec, + /// Block Slot Number + slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + txn: i16, + /// Transaction Output Offset inside the transaction. + txo: i16, +} + +/// Get primary key for TXO by Stake Address query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all TXO by stake address primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get TXO by stake address primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all TXO by stake address primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::TxoAda) + .await? + .into_typed::(); + + Ok(iter) + } +}