Skip to content

Commit

Permalink
change: ETCM-7811 native token data source (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
AmbientTea authored Aug 22, 2024
1 parent b668d84 commit ee79311
Show file tree
Hide file tree
Showing 19 changed files with 671 additions and 8 deletions.
1 change: 1 addition & 0 deletions mainchain-follower/db-sync-follower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ ctor = "0.2.5"
default = []
block-source = ["main-chain-follower-api/block-source"]
candidate-source = ["main-chain-follower-api/candidate-source"]
native-token = ["main-chain-follower-api/native-token"]
68 changes: 67 additions & 1 deletion mainchain-follower/db-sync-follower/src/db_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ impl Asset {
#[derive(Debug, Clone, sqlx::FromRow, PartialEq)]
pub(crate) struct AssetName(pub Vec<u8>);

impl From<sidechain_domain::AssetName> for AssetName {
fn from(name: sidechain_domain::AssetName) -> Self {
Self(name.0.to_vec())
}
}

#[derive(Debug, Clone, sqlx::FromRow, PartialEq)]
pub(crate) struct DistributedSetData {
pub utxo_id_tx_hash: [u8; 32],
Expand Down Expand Up @@ -198,6 +204,12 @@ pub(crate) struct MintAction {
#[derive(Debug, Clone, sqlx::FromRow, PartialEq)]
pub(crate) struct PolicyId(pub Vec<u8>);

impl From<sidechain_domain::PolicyId> for PolicyId {
fn from(id: sidechain_domain::PolicyId) -> Self {
Self(id.0.to_vec())
}
}

#[derive(Debug, Clone, sqlx::FromRow, PartialEq)]
pub(crate) struct StakePoolEntry {
pub pool_hash: [u8; 28],
Expand Down Expand Up @@ -260,6 +272,28 @@ impl<'r> Decode<'r, Postgres> for StakeDelegation {
}
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct NativeTokenAmount(pub u128);
impl From<NativeTokenAmount> for sidechain_domain::NativeTokenAmount {
fn from(value: NativeTokenAmount) -> Self {
Self(value.0)
}
}

impl sqlx::Type<Postgres> for NativeTokenAmount {
fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
PgTypeInfo::with_name("NUMERIC")
}
}

impl<'r> Decode<'r, Postgres> for NativeTokenAmount {
fn decode(value: <Postgres as HasValueRef<'r>>::ValueRef) -> Result<Self, BoxDynError> {
let decoded = <sqlx::types::BigDecimal as Decode<Postgres>>::decode(value)?;
let i = decoded.to_u128().ok_or("NativeTokenQuantity is always a u128".to_string())?;
Ok(Self(i))
}
}

#[derive(Debug, Clone, sqlx::FromRow, PartialEq)]
pub(crate) struct TxPosition {
pub block_number: BlockNumber,
Expand Down Expand Up @@ -352,7 +386,7 @@ LIMIT 1",
}

/// Query to get the block by its hash
#[cfg(feature = "block-source")]
#[cfg(any(feature = "block-source", feature = "native-token"))]
pub(crate) async fn get_block_by_hash(
pool: &Pool<Postgres>,
hash: McBlockHash,
Expand Down Expand Up @@ -527,3 +561,35 @@ pub(crate) async fn index_exists(pool: &Pool<Postgres>, index_name: &str) -> boo
.map(|rows| rows.len() == 1)
.unwrap()
}

#[cfg(feature = "native-token")]
pub(crate) async fn get_total_native_tokens_transfered(
pool: &Pool<Postgres>,
after_slot: SlotNumber,
to_slot: SlotNumber,
asset: Asset,
illiquid_supply_address: Address,
) -> Result<Option<NativeTokenAmount>, SqlxError> {
let query = sqlx::query_as::<_, (Option<NativeTokenAmount>,)>(
"
SELECT
SUM(ma_tx_out.quantity)
FROM tx_out
LEFT JOIN ma_tx_out ON ma_tx_out.tx_out_id = tx_out.id
LEFT JOIN multi_asset ON multi_asset.id = ma_tx_out.ident
INNER JOIN tx ON tx_out.tx_id = tx.id
INNER JOIN block ON tx.block_id = block.id
WHERE address = $1
AND multi_asset.policy = $2
AND multi_asset.name = $3
AND $4 < block.slot_no AND block.slot_no <= $5;
",
)
.bind(&illiquid_supply_address.0)
.bind(&asset.policy_id.0)
.bind(&asset.asset_name.0)
.bind(after_slot)
.bind(to_slot);

Ok(query.fetch_one(pool).await?.0)
}
2 changes: 2 additions & 0 deletions mainchain-follower/db-sync-follower/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub mod metrics;
pub mod block;
#[cfg(feature = "candidate-source")]
pub mod candidates;
#[cfg(feature = "native-token")]
pub mod native_token;

pub struct SqlxError(sqlx::Error);

Expand Down
72 changes: 72 additions & 0 deletions mainchain-follower/db-sync-follower/src/native_token/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use crate::db_model::{Address, NativeTokenAmount, SlotNumber};
use crate::metrics::McFollowerMetrics;
use crate::observed_async_trait;
use async_trait::async_trait;
use main_chain_follower_api::{DataSourceError, NativeTokenManagementDataSource, Result};
use sidechain_domain::*;
use sqlx::PgPool;

#[cfg(test)]
mod tests;

pub struct NativeTokenManagementDataSourceImpl {
pub pool: PgPool,
pub metrics_opt: Option<McFollowerMetrics>,
}

observed_async_trait!(
impl NativeTokenManagementDataSource for NativeTokenManagementDataSourceImpl {
async fn get_total_native_token_transfer(
&self,
after_block: Option<McBlockHash>,
to_block: McBlockHash,
native_token_policy_id: PolicyId,
native_token_asset_name: AssetName,
illiquid_supply_address: MainchainAddress,
) -> Result<sidechain_domain::NativeTokenAmount> {
if after_block == Some(to_block.clone()) {
return Ok(NativeTokenAmount(0).into());
}

let (after_slot , to_slot) = futures::try_join!(
get_after_slot(after_block, &self.pool),
get_to_slot(to_block, &self.pool)
)?;

let total_transfer = crate::db_model::get_total_native_tokens_transfered(
&self.pool,
after_slot,
to_slot,
crate::db_model::Asset {
policy_id: native_token_policy_id.into(),
asset_name: native_token_asset_name.into(),
},
Address(illiquid_supply_address.to_string()),
)
.await?;

Ok(total_transfer.unwrap_or(NativeTokenAmount(0)).into())
}
}
);

async fn get_after_slot(after_block: Option<McBlockHash>, pool: &PgPool) -> Result<SlotNumber> {
match after_block {
None => Ok(SlotNumber(0)),
Some(after_block) => Ok(crate::db_model::get_block_by_hash(pool, after_block.clone())
.await?
.ok_or(DataSourceError::ExpectedDataNotFound(format!(
"Lower bound block {after_block} not found when querying for native token transfers"
)))?
.slot_no),
}
}

async fn get_to_slot(to_block: McBlockHash, pool: &PgPool) -> Result<SlotNumber> {
Ok(crate::db_model::get_block_by_hash(pool, to_block.clone())
.await?
.ok_or(DataSourceError::ExpectedDataNotFound(format!(
"Upper bound block {to_block} not found when querying for native token transfers"
)))?
.slot_no)
}
90 changes: 90 additions & 0 deletions mainchain-follower/db-sync-follower/src/native_token/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use super::NativeTokenManagementDataSourceImpl;
use main_chain_follower_api::NativeTokenManagementDataSource;
use sidechain_domain::{AssetName, MainchainAddress, McBlockHash, PolicyId};
use sqlx::PgPool;
use std::str::FromStr;

fn native_token_policy_id() -> PolicyId {
PolicyId::from_hex_unsafe("6c969320597b755454ff3653ad09725d590c570827a129aeb4385526")
}

fn native_token_asset_name() -> AssetName {
AssetName::from_hex_unsafe("546573744275647a507265766965775f3335")
}

fn illiquid_supply_address() -> MainchainAddress {
MainchainAddress::from_str("addr_test1wrhvtvx3f0g9wv9rx8kfqc60jva3e07nqujk2cspekv4mqs9rjdvz")
.unwrap()
}

fn block_hash(i: u32) -> McBlockHash {
McBlockHash::from_str(&format!(
"b00000000000000000000000000000000000000000000000000000000000000{i}"
))
.unwrap()
}

pub fn genesis_hash() -> McBlockHash {
block_hash(0)
}

fn make_source(pool: PgPool) -> NativeTokenManagementDataSourceImpl {
NativeTokenManagementDataSourceImpl { pool, metrics_opt: None }
}

#[sqlx::test(migrations = "./testdata/native-token/migrations")]
async fn defaults_to_zero_when_there_are_no_transfers(pool: PgPool) {
let source = make_source(pool);
let after_block = None;
let to_block = genesis_hash();
let result = source
.get_total_native_token_transfer(
after_block,
to_block,
native_token_policy_id(),
native_token_asset_name(),
illiquid_supply_address(),
)
.await
.unwrap();

assert_eq!(result.0, 0)
}

#[sqlx::test(migrations = "./testdata/native-token/migrations")]
async fn gets_sum_of_all_transfers_when_queried_up_to_latest_block(pool: PgPool) {
let source = make_source(pool);
let after_block = None;
let to_block = block_hash(5);
let result = source
.get_total_native_token_transfer(
after_block,
to_block,
native_token_policy_id(),
native_token_asset_name(),
illiquid_supply_address(),
)
.await
.unwrap();

assert_eq!(result.0, 11 + 12 + 13 + 14)
}

#[sqlx::test(migrations = "./testdata/native-token/migrations")]
async fn gets_sum_of_transfers_in_range(pool: PgPool) {
let source = make_source(pool);
let after_block = Some(block_hash(1));
let to_block = block_hash(5);
let result = source
.get_total_native_token_transfer(
after_block,
to_block,
native_token_policy_id(),
native_token_asset_name(),
illiquid_supply_address(),
)
.await
.unwrap();

assert_eq!(result.0, 12 + 13 + 14)
}
Loading

0 comments on commit ee79311

Please sign in to comment.