Skip to content

Commit

Permalink
wip(cat-gateway): rust code for txo by stake queries
Browse files Browse the repository at this point in the history
  • Loading branch information
saibatizoku committed Nov 12, 2024
1 parent fd3e1e9 commit a8bbb04
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 85 deletions.
77 changes: 0 additions & 77 deletions catalyst-gateway/bin/src/db/index/block/roll_forward.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1 @@
//! Immutable Roll Forward logic.
#![allow(dead_code, clippy::unused_async, clippy::todo)]
use std::sync::Arc;

use scylla::Session;

use crate::{db::index::queries::SizedBatch, settings::cassandra_db::EnvVars};

type Wip = Vec<()>;

/// Roll Forward Purge Query.
pub(crate) struct RollforwardPurgeQuery {}

/// Purge batches.
#[allow(clippy::struct_field_names)]
pub(crate) struct PurgeBatches {
/// Get Primary Keys forTXO Purge Query.
pub(crate) get_txo_purge_queries: SizedBatch,
/// Get Primary Keys forTXO Asset Purge Query.
pub(crate) get_txo_asset_purge_queries: SizedBatch,
/// Get Primary Keys forUnstaked TXO Purge Query.
pub(crate) get_unstaked_txo_purge_queries: SizedBatch,
/// Get Primary Keys forUnstaked TXO Asset Purge Query.
pub(crate) get_unstaked_txo_asset_purge_queries: SizedBatch,
/// Get Primary Keys forTXI Purge Query.
pub(crate) get_txi_purge_queries: SizedBatch,
/// Get Primary Keys forTXI Purge Query.
pub(crate) get_stake_registration_purge_queries: SizedBatch,
/// Get Primary Keys forCIP36 Registrations Purge Query.
pub(crate) get_cip36_registration_purge_queries: SizedBatch,
/// Get Primary Keys forCIP36 Registration errors Purge Query.
pub(crate) get_cip36_registration_error_purge_queries: SizedBatch,
/// Get Primary Keys forCIP36 Registration for Stake Address Purge Query.
pub(crate) get_cip36_registration_for_stake_address_purge_queries: SizedBatch,
/// Get Primary Keys forRBAC 509 Registrations Purge Query.
pub(crate) get_rbac509_registration_purge_queries: SizedBatch,
/// Get Primary Keys forChain Root for TX ID Purge Query..
pub(crate) get_chain_root_for_txn_id_purge_queries: SizedBatch,
/// Get Primary Keys forChain Root for Role 0 Key Purge Query..
pub(crate) get_chain_root_for_role0_key_purge_queries: SizedBatch,
/// Get Primary Keys forChain Root for Stake Address Purge Query..
pub(crate) get_chain_root_for_stake_address_purge_queries: SizedBatch,
/// TXO Purge Query.
pub(crate) txo_purge_queries: SizedBatch,
/// TXO Asset Purge Query.
pub(crate) txo_asset_purge_queries: SizedBatch,
/// Unstaked TXO Purge Query.
pub(crate) unstaked_txo_purge_queries: SizedBatch,
/// Unstaked TXO Asset Purge Query.
pub(crate) unstaked_txo_asset_purge_queries: SizedBatch,
/// TXI Purge Query.
pub(crate) txi_purge_queries: SizedBatch,
/// TXI Purge Query.
pub(crate) stake_registration_purge_queries: SizedBatch,
/// CIP36 Registrations Purge Query.
pub(crate) cip36_registration_purge_queries: SizedBatch,
/// CIP36 Registration errors Purge Query.
pub(crate) cip36_registration_error_purge_queries: SizedBatch,
/// CIP36 Registration for Stake Address Purge Query.
pub(crate) cip36_registration_for_stake_address_purge_queries: SizedBatch,
/// RBAC 509 Registrations Purge Query.
pub(crate) rbac509_registration_purge_queries: SizedBatch,
/// Chain Root for TX ID Purge Query..
pub(crate) chain_root_for_txn_id_purge_queries: SizedBatch,
/// Chain Root for Role 0 Key Purge Query..
pub(crate) chain_root_for_role0_key_purge_queries: SizedBatch,
/// Chain Root for Stake Address Purge Query..
pub(crate) chain_root_for_stake_address_purge_queries: SizedBatch,
}

impl RollforwardPurgeQuery {
/// Prepare the purge query.
pub(crate) async fn prepare_batch(
_session: &Arc<Session>, _cfg: &EnvVars,
) -> anyhow::Result<PurgeBatches> {
todo!();
}
}
20 changes: 12 additions & 8 deletions catalyst-gateway/bin/src/db/index/queries/purge/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Queries for purging volatile data.
pub(crate) mod txo_by_stake_addr;

use std::{fmt::Debug, sync::Arc};

use scylla::{
Expand All @@ -8,13 +10,12 @@ use scylla::{
};

use super::{
super::block::roll_forward::{PurgeBatches, RollforwardPurgeQuery},
FallibleQueryResults, SizedBatch,
};
use crate::{
db::index::block::roll_forward::txo_by_stake_addr::get::TxoByStakeAddressPrimaryKeyQuery,
settings::cassandra_db,
};
use crate::settings::cassandra_db;

/// No parameters
const NO_PARAMS = ();

/// All prepared DELETE query statements (purge DB table rows).
#[derive(strum_macros::Display)]
Expand Down Expand Up @@ -137,13 +138,14 @@ pub(crate) struct PreparedQueries {

impl PreparedQueries {
/// Create new prepared queries for a given session.
#[allow(clippy::todo)]
#[allow(clippy::todo, unused_variables)]
pub(crate) async fn new(
session: Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<Self> {
// We initialize like this, so that all errors preparing querys get shown before aborting.
let all_purge_queries = RollforwardPurgeQuery::prepare_batch(&session, cfg).await;
let get_txo_purge_queries = TxoByStakeAddressPrimaryKeyQuery::prepare(&session).await?;
let get_txo_purge_queries =
txo_by_stake_addr::select::PrimaryKeyQuery::prepare(&session).await?;
let _unused = "
let PurgeBatches {
get_txo_asset_purge_queries,
get_unstaked_txo_purge_queries,
Expand Down Expand Up @@ -201,6 +203,8 @@ impl PreparedQueries {
chain_root_for_role0_key_purge_queries,
chain_root_for_stake_address_purge_queries,
})
";
todo!("WIP");
}

/// Prepares a statement.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! Delete queries for TXO by Stake Address.
use std::{fmt::Debug, sync::Arc};

use scylla::{SerializeRow, Session};

use crate::{
db::index::queries::{purge::PreparedQueries, SizedBatch},
settings::cassandra_db,
};

/// Delete TXO by Stake Address
const DELETE_QUERY: &str = include_str!("../cql/delete_txo_by_stake_address.cql");

/// Get TXO by Stake Address Query Parameters
#[derive(SerializeRow, Clone)]
pub(super) struct Params {
/// Stake Address - Binary 28 bytes. 0 bytes = not staked.
stake_address: Vec<u8>,
/// Block Slot Number
slot_no: num_bigint::BigInt,
/// Transaction Offset inside the block.
txn: i16,
/// Transaction Output Offset inside the transaction.
txo: i16,
}

impl Debug for Params {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Params")
.field("stake_address", &self.stake_address)
.field("slot_no", &self.slot_no)
.field("txn", &self.txn)
.field("txo", &self.txo)
.finish()
}
}

impl Params {
/// Create a new record for this transaction.
pub(super) fn new(stake_address: &[u8], slot_no: u64, txn: i16, txo: i16) -> Self {
Self {
stake_address: stake_address.to_vec(),
slot_no: slot_no.into(),
txn,
txo,
}
}

/// Prepare Batch of Get TXO Asset Index Data Queries
pub(super) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<SizedBatch> {
let delete_queries = PreparedQueries::prepare_batch(
session.clone(),
DELETE_QUERY,
cfg,
scylla::statement::Consistency::Any,
true,
false,
)
.await?;
Ok(delete_queries)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! TXO by Stake Address Queries used in purging data.
pub(crate) mod delete;
pub(crate) mod select;
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! Select queries for TXO by Stake Address.
use std::sync::Arc;

use scylla::{
prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, FromRow,
Session,
};
use tracing::error;

use crate::db::index::{
queries::purge::{PreparedQueries, PreparedSelectQuery},
session::CassandraSession,
};

/// Select primary keys for TXO by Stake Address.
const SELECT_QUERY: &str = include_str!("../cql/get_txo_by_stake_address.cql");

/// Primary Key Row
#[derive(FromRow)]
pub(crate) struct PrimaryKey {
/// Stake Address - Binary 28 bytes. 0 bytes = not staked.
stake_address: Vec<u8>,
/// Block Slot Number
slot_no: num_bigint::BigInt,
/// Transaction Offset inside the block.
txn: i16,
/// Transaction Output Offset inside the transaction.
txo: i16,
}

/// Get primary key for TXO by Stake Address query.
pub(crate) struct PrimaryKeyQuery;

impl PrimaryKeyQuery {
/// Prepares a query to get all TXO by stake address primary keys.
pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
let select_primary_key = PreparedQueries::prepare(
session.clone(),
SELECT_QUERY,
scylla::statement::Consistency::All,
true,
)
.await;

if let Err(ref error) = select_primary_key {
error!(error=%error, "Failed to prepare get TXO by stake address primary key query");
};

select_primary_key
}

/// Executes a query to get all TXO by stake address primary keys.
pub(crate) async fn execute(
session: &CassandraSession,
) -> anyhow::Result<TypedRowIterator<PrimaryKey>> {
let iter = session
.purge_execute_iter(PreparedSelectQuery::TxoAda)
.await?
.into_typed::<PrimaryKey>();

Ok(iter)
}
}

0 comments on commit a8bbb04

Please sign in to comment.