diff --git a/catalyst-gateway/bin/Cargo.toml b/catalyst-gateway/bin/Cargo.toml index 32a0293c373..4099bfb7bf0 100644 --- a/catalyst-gateway/bin/Cargo.toml +++ b/catalyst-gateway/bin/Cargo.toml @@ -53,7 +53,7 @@ anyhow = "1.0.92" duration-string = "0.4.0" build-info = "0.0.39" ed25519-dalek = "2.1.1" -scylla = { version = "0.14.0", features = ["cloud", "full-serialization"] } +scylla = { version = "0.15.0", features = ["cloud", "full-serialization"] } strum = { version = "0.26.3", features = ["derive"] } strum_macros = "0.26.4" openssl = { version = "0.10.66", features = ["vendored"] } diff --git a/catalyst-gateway/bin/src/db/index/queries/mod.rs b/catalyst-gateway/bin/src/db/index/queries/mod.rs index afb5e79cafc..b257e855d7d 100644 --- a/catalyst-gateway/bin/src/db/index/queries/mod.rs +++ b/catalyst-gateway/bin/src/db/index/queries/mod.rs @@ -21,7 +21,7 @@ use registrations::{ }; use scylla::{ batch::Batch, prepared_statement::PreparedStatement, serialize::row::SerializeRow, - transport::iterator::RowIterator, QueryResult, Session, + transport::iterator::QueryPager, QueryResult, Session, }; use staked_ada::{ get_assets_by_stake_address::GetAssetsByStakeAddressQuery, @@ -311,7 +311,7 @@ impl PreparedQueries { /// returns. pub(crate) async fn execute_iter

( &self, session: Arc, select_query: PreparedSelectQuery, params: P, - ) -> anyhow::Result + ) -> anyhow::Result where P: SerializeRow { let prepared_stmt = match select_query { PreparedSelectQuery::TxoByStakeAddress => &self.txo_by_stake_address_query, diff --git a/catalyst-gateway/bin/src/db/index/queries/rbac/get_chain_root.rs b/catalyst-gateway/bin/src/db/index/queries/rbac/get_chain_root.rs index d0160164f6a..13c66a38231 100644 --- a/catalyst-gateway/bin/src/db/index/queries/rbac/get_chain_root.rs +++ b/catalyst-gateway/bin/src/db/index/queries/rbac/get_chain_root.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, SerializeRow, - Session, + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, DeserializeRow, + SerializeRow, Session, }; use tracing::error; @@ -22,23 +22,12 @@ pub(crate) struct GetChainRootQueryParams { pub(crate) stake_address: Vec, } -/// Get chain root by stake address query row result -// TODO: https://github.com/input-output-hk/catalyst-voices/issues/828 -// The macro uses expect to signal an error in deserializing values. -#[allow(clippy::expect_used)] -mod result { - use scylla::FromRow; - - /// Get chain root query result. - #[derive(FromRow)] - pub(crate) struct GetChainRootQuery { - /// Chain root for the queries stake address. - pub(crate) chain_root: Vec, - } -} - /// Get chain root by stake address query. -pub(crate) struct GetChainRootQuery; +#[derive(DeserializeRow)] +pub(crate) struct GetChainRootQuery { + /// Chain root for the queries stake address. + pub(crate) chain_root: Vec, +} impl GetChainRootQuery { /// Prepares a get chain root by stake address query. @@ -61,11 +50,11 @@ impl GetChainRootQuery { /// Executes a get chain root by stake address query. pub(crate) async fn execute( session: &CassandraSession, params: GetChainRootQueryParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let iter = session .execute_iter(PreparedSelectQuery::ChainRootByStakeAddress, params) .await? - .into_typed::(); + .rows_stream::()?; Ok(iter) } diff --git a/catalyst-gateway/bin/src/db/index/queries/rbac/get_registrations.rs b/catalyst-gateway/bin/src/db/index/queries/rbac/get_registrations.rs index c3f10087571..e020f1a0eb9 100644 --- a/catalyst-gateway/bin/src/db/index/queries/rbac/get_registrations.rs +++ b/catalyst-gateway/bin/src/db/index/queries/rbac/get_registrations.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, SerializeRow, - Session, + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, DeserializeRow, + SerializeRow, Session, }; use tracing::error; @@ -23,23 +23,12 @@ pub(crate) struct GetRegistrationsByChainRootQueryParams { pub(crate) chain_root: Vec, } -/// Get registrations by chain root query row result -// TODO: https://github.com/input-output-hk/catalyst-voices/issues/828 -// The macro uses expect to signal an error in deserializing values. -#[allow(clippy::expect_used)] -mod result { - use scylla::FromRow; - - /// Get chain root query result. - #[derive(FromRow)] - pub(crate) struct GetRegistrationsByChainRootQuery { - /// Registration transaction id. - pub(crate) transaction_id: Vec, - } -} - /// Get chain root by stake address query. -pub(crate) struct GetRegistrationsByChainRootQuery; +#[derive(DeserializeRow)] +pub(crate) struct GetRegistrationsByChainRootQuery { + /// Registration transaction id. + pub(crate) transaction_id: Vec, +} impl GetRegistrationsByChainRootQuery { /// Prepares a get registrations by chain root query. @@ -62,11 +51,11 @@ impl GetRegistrationsByChainRootQuery { /// Executes a get registrations by chain root query. pub(crate) async fn execute( session: &CassandraSession, params: GetRegistrationsByChainRootQueryParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let iter = session .execute_iter(PreparedSelectQuery::RegistrationsByChainRoot, params) .await? - .into_typed::(); + .rows_stream::()?; Ok(iter) } diff --git a/catalyst-gateway/bin/src/db/index/queries/rbac/get_role0_chain_root.rs b/catalyst-gateway/bin/src/db/index/queries/rbac/get_role0_chain_root.rs index 387c2fcb61d..76881e58669 100644 --- a/catalyst-gateway/bin/src/db/index/queries/rbac/get_role0_chain_root.rs +++ b/catalyst-gateway/bin/src/db/index/queries/rbac/get_role0_chain_root.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, SerializeRow, - Session, + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, DeserializeRow, + SerializeRow, Session, }; use tracing::error; @@ -22,23 +22,12 @@ pub(crate) struct GetRole0ChainRootQueryParams { pub(crate) role0_key: Vec, } -/// Get chain root by role0 key query row result -// TODO: https://github.com/input-output-hk/catalyst-voices/issues/828 -// The macro uses expect to signal an error in deserializing values. -#[allow(clippy::expect_used)] -mod result { - use scylla::FromRow; - - /// Get role0 key chain root query result. - #[derive(FromRow)] - pub(crate) struct GetRole0ChainRootQuery { - /// Chain root. - pub(crate) chain_root: Vec, - } -} - /// Get chain root by role0 key query. -pub(crate) struct GetRole0ChainRootQuery; +#[derive(DeserializeRow)] +pub(crate) struct GetRole0ChainRootQuery { + /// Chain root. + pub(crate) chain_root: Vec, +} impl GetRole0ChainRootQuery { /// Prepares a get chain root by role0 key query. @@ -61,11 +50,11 @@ impl GetRole0ChainRootQuery { /// Executes a get chain root role0 key query. pub(crate) async fn execute( session: &CassandraSession, params: GetRole0ChainRootQueryParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let iter = session .execute_iter(PreparedSelectQuery::ChainRootByRole0Key, params) .await? - .into_typed::(); + .rows_stream::()?; Ok(iter) } diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs index ce690d454b6..c0ff611937a 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, SerializeRow, - Session, + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, DeserializeRow, + SerializeRow, Session, }; use tracing::error; @@ -32,34 +32,26 @@ impl From<&ed25519_dalek::VerifyingKey> for GetRegistrationParams { } } -/// Get registration given stake addr or vote key -#[allow(clippy::expect_used)] -mod result { - use scylla::FromRow; - - /// Get registration query result. - #[derive(FromRow)] - pub(crate) struct GetRegistrationQuery { - /// Full Stake Address (not hashed, 32 byte ED25519 Public key). - pub stake_address: Vec, - /// Nonce value after normalization. - pub nonce: num_bigint::BigInt, - /// Slot Number the cert is in. - pub slot_no: num_bigint::BigInt, - /// Transaction Index. - pub txn: i16, - /// Voting Public Key - pub vote_key: Vec, - /// Full Payment Address (not hashed, 32 byte ED25519 Public key). - pub payment_address: Vec, - /// Is the stake address a script or not. - pub is_payable: bool, - /// Is the Registration CIP36 format, or CIP15 - pub cip36: bool, - } -} /// Get registration query. -pub(crate) struct GetRegistrationQuery; +#[derive(DeserializeRow)] +pub(crate) struct GetRegistrationQuery { + /// Full Stake Address (not hashed, 32 byte ED25519 Public key). + pub stake_address: Vec, + /// Nonce value after normalization. + pub nonce: num_bigint::BigInt, + /// Slot Number the cert is in. + pub slot_no: num_bigint::BigInt, + /// Transaction Index. + pub txn: i16, + /// Voting Public Key + pub vote_key: Vec, + /// Full Payment Address (not hashed, 32 byte ED25519 Public key). + pub payment_address: Vec, + /// Is the stake address a script or not. + pub is_payable: bool, + /// Is the Registration CIP36 format, or CIP15 + pub cip36: bool, +} impl GetRegistrationQuery { /// Prepares a get registration query. @@ -82,11 +74,11 @@ impl GetRegistrationQuery { /// Executes get registration info for given stake addr query. pub(crate) async fn execute( session: &CassandraSession, params: GetRegistrationParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let iter = session .execute_iter(PreparedSelectQuery::RegistrationFromStakeAddr, params) .await? - .into_typed::(); + .rows_stream::()?; Ok(iter) } diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_hash.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_hash.rs index 24722b54d78..134e199e007 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_hash.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_hash.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, SerializeRow, - Session, + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, DeserializeRow, + SerializeRow, Session, }; use tracing::error; @@ -31,20 +31,12 @@ impl GetStakeAddrParams { } } -/// Get stake addr from stake hash query string. -#[allow(clippy::expect_used)] -mod result { - use scylla::FromRow; - - /// Get stake addr from stake hash query result. - #[derive(FromRow)] - pub(crate) struct GetStakeAddrQuery { - /// Full Stake Address (not hashed, 32 byte ED25519 Public key). - pub stake_address: Vec, - } +/// Get stake addr from stake hash query. +#[derive(DeserializeRow)] +pub(crate) struct GetStakeAddrQuery { + /// Full Stake Address (not hashed, 32 byte ED25519 Public key). + pub stake_address: Vec, } -/// Get registration query. -pub(crate) struct GetStakeAddrQuery; impl GetStakeAddrQuery { /// Prepares a get get stake addr from stake hash query. @@ -67,11 +59,11 @@ impl GetStakeAddrQuery { /// Executes a get txi by transaction hashes query. pub(crate) async fn execute( session: &CassandraSession, params: GetStakeAddrParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let iter = session .execute_iter(PreparedSelectQuery::StakeAddrFromStakeHash, params) .await? - .into_typed::(); + .rows_stream::()?; Ok(iter) } diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs index 1d9d591a73b..42bdcb9fa75 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, SerializeRow, - Session, + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, DeserializeRow, + SerializeRow, Session, }; use tracing::error; @@ -29,20 +29,12 @@ impl GetStakeAddrFromVoteKeyParams { } } -/// Get stake addr given vote key -#[allow(clippy::expect_used)] -mod result { - use scylla::FromRow; - - /// Get stake addr from vote key query result. - #[derive(FromRow)] - pub(crate) struct GetStakeAddrFromVoteKeyQuery { - /// Full Stake Address (not hashed, 32 byte ED25519 Public key). - pub stake_address: Vec, - } -} /// Get stake addr from vote key query. -pub(crate) struct GetStakeAddrFromVoteKeyQuery; +#[derive(DeserializeRow)] +pub(crate) struct GetStakeAddrFromVoteKeyQuery { + /// Full Stake Address (not hashed, 32 byte ED25519 Public key). + pub stake_address: Vec, +} impl GetStakeAddrFromVoteKeyQuery { /// Prepares a get stake addr from vote key query. @@ -65,11 +57,11 @@ impl GetStakeAddrFromVoteKeyQuery { /// Executes a get txi by transaction hashes query. pub(crate) async fn execute( session: &CassandraSession, params: GetStakeAddrFromVoteKeyParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let iter = session .execute_iter(PreparedSelectQuery::StakeAddrFromVoteKey, params) .await? - .into_typed::(); + .rows_stream::()?; Ok(iter) } diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs index cb09152fbdb..ada25c54937 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, SerializeRow, - Session, + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, DeserializeRow, + SerializeRow, Session, }; use tracing::error; @@ -38,30 +38,22 @@ impl GetInvalidRegistrationParams { } } -/// Get invalid registrations given stake addr -#[allow(clippy::expect_used)] -mod result { - use scylla::FromRow; - - /// Get registration query result. - #[derive(FromRow)] - pub(crate) struct GetInvalidRegistrationQuery { - /// Error report - pub error_report: Vec, - /// Full Stake Address (not hashed, 32 byte ED25519 Public key). - pub stake_address: Vec, - /// Voting Public Key - pub vote_key: Vec, - /// Full Payment Address (not hashed, 32 byte ED25519 Public key). - pub payment_address: Vec, - /// Is the stake address a script or not. - pub is_payable: bool, - /// Is the Registration CIP36 format, or CIP15 - pub cip36: bool, - } +/// Get invalid registrations given stake address. +#[derive(DeserializeRow)] +pub(crate) struct GetInvalidRegistrationQuery { + /// Error report + pub error_report: Vec, + /// Full Stake Address (not hashed, 32 byte ED25519 Public key). + pub stake_address: Vec, + /// Voting Public Key + pub vote_key: Vec, + /// Full Payment Address (not hashed, 32 byte ED25519 Public key). + pub payment_address: Vec, + /// Is the stake address a script or not. + pub is_payable: bool, + /// Is the Registration CIP36 format, or CIP15 + pub cip36: bool, } -/// Get invalid registration query. -pub(crate) struct GetInvalidRegistrationQuery; impl GetInvalidRegistrationQuery { /// Prepares a get invalid registration query. @@ -84,14 +76,14 @@ impl GetInvalidRegistrationQuery { /// Executes get invalid registration info for given stake addr query. pub(crate) async fn execute( session: &CassandraSession, params: GetInvalidRegistrationParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let iter = session .execute_iter( PreparedSelectQuery::InvalidRegistrationsFromStakeAddr, params, ) .await? - .into_typed::(); + .rows_stream::()?; Ok(iter) } diff --git a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs index 09bc8f2df03..343a7a36c82 100644 --- a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, SerializeRow, - Session, + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, DeserializeRow, + SerializeRow, Session, }; use tracing::error; @@ -35,34 +35,23 @@ impl GetAssetsByStakeAddressParams { } } -/// Get assets by stake address query row result -// TODO: https://github.com/input-output-hk/catalyst-voices/issues/828 -// The macro uses expect to signal an error in deserializing values. -#[allow(clippy::expect_used)] -mod result { - use scylla::FromRow; - - /// Get native assets query result. - #[derive(FromRow)] - pub(crate) struct GetAssetsByStakeAddressQuery { - /// TXO transaction index within the slot. - pub txn: i16, - /// TXO index. - pub txo: i16, - /// TXO transaction slot number. - pub slot_no: num_bigint::BigInt, - /// Asset hash. - pub policy_id: Vec, - /// Asset name. - pub asset_name: Vec, - /// Asset value. - pub value: num_bigint::BigInt, - } +/// Get native assets query. +#[derive(DeserializeRow)] +pub(crate) struct GetAssetsByStakeAddressQuery { + /// TXO transaction index within the slot. + pub txn: i16, + /// TXO index. + pub txo: i16, + /// TXO transaction slot number. + pub slot_no: num_bigint::BigInt, + /// Asset hash. + pub policy_id: Vec, + /// Asset name. + pub asset_name: Vec, + /// Asset value. + pub value: num_bigint::BigInt, } -/// Get assets by stake address query. -pub(crate) struct GetAssetsByStakeAddressQuery; - impl GetAssetsByStakeAddressQuery { /// Prepares a get assets by stake address query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { @@ -84,11 +73,11 @@ impl GetAssetsByStakeAddressQuery { /// Executes a get assets by stake address query. pub(crate) async fn execute( session: &CassandraSession, params: GetAssetsByStakeAddressParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let iter = session .execute_iter(PreparedSelectQuery::AssetsByStakeAddress, params) .await? - .into_typed::(); + .rows_stream::()?; Ok(iter) } diff --git a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs index 3da06a7315a..0bcd368d9c5 100644 --- a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs +++ b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, SerializeRow, - Session, + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, DeserializeRow, + SerializeRow, Session, }; use tracing::error; @@ -30,26 +30,16 @@ impl GetTxiByTxnHashesQueryParams { } } -/// Get TXI Query Result -// TODO: https://github.com/input-output-hk/catalyst-voices/issues/828 -// The macro uses expect to signal an error in deserializing values. -#[allow(clippy::expect_used)] -mod result { - use scylla::FromRow; - - /// Get TXI query result. - #[derive(FromRow)] - pub(crate) struct GetTxiByTxnHashesQuery { - /// TXI transaction hash. - pub txn_hash: Vec, - /// TXI original TXO index. - pub txo: i16, - /// TXI slot number. - pub slot_no: num_bigint::BigInt, - } -} /// Get TXI query. -pub(crate) struct GetTxiByTxnHashesQuery; +#[derive(DeserializeRow)] +pub(crate) struct GetTxiByTxnHashesQuery { + /// TXI transaction hash. + pub txn_hash: Vec, + /// TXI original TXO index. + pub txo: i16, + /// TXI slot number. + pub slot_no: num_bigint::BigInt, +} impl GetTxiByTxnHashesQuery { /// Prepares a get txi query. @@ -72,11 +62,11 @@ impl GetTxiByTxnHashesQuery { /// Executes a get txi by transaction hashes query. pub(crate) async fn execute( session: &CassandraSession, params: GetTxiByTxnHashesQueryParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let iter = session .execute_iter(PreparedSelectQuery::TxiByTransactionHash, params) .await? - .into_typed::(); + .rows_stream::()?; Ok(iter) } diff --git a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs index 1d488f9b283..9e06f64b542 100644 --- a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowIterator, SerializeRow, - Session, + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, DeserializeRow, + SerializeRow, Session, }; use tracing::error; @@ -34,34 +34,23 @@ impl GetTxoByStakeAddressQueryParams { } } -/// Get TXO by stake address query row result -// TODO: https://github.com/input-output-hk/catalyst-voices/issues/828 -// The macro uses expect to signal an error in deserializing values. -#[allow(clippy::expect_used)] -mod result { - use scylla::FromRow; - - /// Get txo by stake address query result. - #[derive(FromRow)] - pub(crate) struct GetTxoByStakeAddressQuery { - /// TXO transaction hash. - pub txn_hash: Vec, - /// TXO transaction index within the slot. - pub txn: i16, - /// TXO index. - pub txo: i16, - /// TXO transaction slot number. - pub slot_no: num_bigint::BigInt, - /// TXO value. - pub value: num_bigint::BigInt, - /// TXO spent slot. - pub spent_slot: Option, - } +/// Get txo by stake address query. +#[derive(DeserializeRow)] +pub(crate) struct GetTxoByStakeAddressQuery { + /// TXO transaction hash. + pub txn_hash: Vec, + /// TXO transaction index within the slot. + pub txn: i16, + /// TXO index. + pub txo: i16, + /// TXO transaction slot number. + pub slot_no: num_bigint::BigInt, + /// TXO value. + pub value: num_bigint::BigInt, + /// TXO spent slot. + pub spent_slot: Option, } -/// Get staked ADA query. -pub(crate) struct GetTxoByStakeAddressQuery; - impl GetTxoByStakeAddressQuery { /// Prepares a get txo by stake address query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { @@ -83,11 +72,11 @@ impl GetTxoByStakeAddressQuery { /// Executes a get txo by stake address query. pub(crate) async fn execute( session: &CassandraSession, params: GetTxoByStakeAddressQueryParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let iter = session .execute_iter(PreparedSelectQuery::TxoByStakeAddress, params) .await? - .into_typed::(); + .rows_stream::()?; Ok(iter) } diff --git a/catalyst-gateway/bin/src/db/index/queries/sync_status/get.rs b/catalyst-gateway/bin/src/db/index/queries/sync_status/get.rs index 804192462b3..2371a1378cf 100644 --- a/catalyst-gateway/bin/src/db/index/queries/sync_status/get.rs +++ b/catalyst-gateway/bin/src/db/index/queries/sync_status/get.rs @@ -1,6 +1,6 @@ //! Get Sync Status query -use futures::StreamExt; +use futures::stream::StreamExt; use tracing::{debug, warn}; use super::update::row::SyncStatusQueryParams; @@ -96,7 +96,15 @@ pub(crate) async fn get_sync_status() -> Vec { let session = session.get_raw_session(); let mut results = match session.query_iter(GET_SYNC_STATUS, ()).await { - Ok(result) => result.into_typed::(), + Ok(result) => { + match result.rows_stream::() { + Ok(result) => result, + Err(err) => { + warn!(error=%err, "Failed to get sync status results from query."); + return synced_chunks; + }, + } + }, Err(err) => { warn!(error=%err, "Failed to get sync status results from query."); return synced_chunks; diff --git a/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs b/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs index 03563d2f711..9780318348a 100644 --- a/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs +++ b/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs @@ -20,12 +20,11 @@ use crate::{ const INSERT_SYNC_STATUS_QUERY: &str = include_str!("../cql/insert_sync_status.cql"); /// Sync Status Row Record Module -#[allow(clippy::expect_used)] pub(super) mod row { - use scylla::{frame::value::CqlTimestamp, FromRow, SerializeRow}; + use scylla::{frame::value::CqlTimestamp, DeserializeRow, SerializeRow}; /// Sync Status Record Row (used for both Insert and Query response) - #[derive(SerializeRow, FromRow, Debug)] + #[derive(SerializeRow, DeserializeRow, Debug)] pub(crate) struct SyncStatusQueryParams { /// End Slot. pub(crate) end_slot: num_bigint::BigInt, diff --git a/catalyst-gateway/bin/src/db/index/session.rs b/catalyst-gateway/bin/src/db/index/session.rs index 2f29c321eb5..6193a628dac 100644 --- a/catalyst-gateway/bin/src/db/index/session.rs +++ b/catalyst-gateway/bin/src/db/index/session.rs @@ -9,7 +9,7 @@ use std::{ use openssl::ssl::{SslContextBuilder, SslFiletype, SslMethod, SslVerifyMode}; use scylla::{ - frame::Compression, serialize::row::SerializeRow, transport::iterator::RowIterator, + frame::Compression, serialize::row::SerializeRow, transport::iterator::QueryPager, ExecutionProfile, Session, SessionBuilder, }; use tokio::fs; @@ -111,7 +111,7 @@ impl CassandraSession { /// returns. pub(crate) async fn execute_iter

( &self, select_query: PreparedSelectQuery, params: P, - ) -> anyhow::Result + ) -> anyhow::Result where P: SerializeRow { let session = self.session.clone(); let queries = self.queries.clone(); @@ -161,7 +161,7 @@ fn make_execution_profile(_cfg: &cassandra_db::EnvVars) -> ExecutionProfile { ExecutionProfile::builder() .consistency(scylla::statement::Consistency::LocalQuorum) .serial_consistency(Some(scylla::statement::SerialConsistency::LocalSerial)) - .retry_policy(Box::new(scylla::retry_policy::DefaultRetryPolicy::new())) + .retry_policy(Arc::new(scylla::retry_policy::DefaultRetryPolicy::new())) .load_balancing_policy( scylla::load_balancing::DefaultPolicyBuilder::new() .permit_dc_failover(true) diff --git a/catalyst-gateway/bin/src/service/api/cardano/rbac/chain_root_get.rs b/catalyst-gateway/bin/src/service/api/cardano/rbac/chain_root_get.rs index 42e7de091b0..cd264b1c839 100644 --- a/catalyst-gateway/bin/src/service/api/cardano/rbac/chain_root_get.rs +++ b/catalyst-gateway/bin/src/service/api/cardano/rbac/chain_root_get.rs @@ -1,7 +1,7 @@ //! Implementation of the GET `/rbac/chain_root` endpoint. use anyhow::anyhow; use der_parser::asn1_rs::ToDer; -use futures::StreamExt as _; +use futures::StreamExt; use poem_openapi::{payload::Json, ApiResponse, Object}; use tracing::error; diff --git a/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get.rs b/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get.rs index 1250e09e502..96d2d9db9e3 100644 --- a/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get.rs +++ b/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get.rs @@ -1,6 +1,6 @@ //! Implementation of the GET `/rbac/registrations` endpoint. use anyhow::anyhow; -use futures::StreamExt as _; +use futures::StreamExt; use poem_openapi::{payload::Json, ApiResponse, Object}; use tracing::error; diff --git a/catalyst-gateway/bin/src/service/api/cardano/rbac/role0_chain_root_get.rs b/catalyst-gateway/bin/src/service/api/cardano/rbac/role0_chain_root_get.rs index 0412734ee3a..ab8ef4582bd 100644 --- a/catalyst-gateway/bin/src/service/api/cardano/rbac/role0_chain_root_get.rs +++ b/catalyst-gateway/bin/src/service/api/cardano/rbac/role0_chain_root_get.rs @@ -1,6 +1,6 @@ //! Implementation of the GET `/rbac/role0_chain_root` endpoint. use anyhow::anyhow; -use futures::StreamExt as _; +use futures::StreamExt; use poem_openapi::{payload::Json, ApiResponse, Object}; use tracing::error;