Skip to content

Commit

Permalink
fix: cached output balance retrieval (#16)
Browse files Browse the repository at this point in the history
* fix: optimize tx output cache

* chore: style
  • Loading branch information
rafaelcr authored Jul 5, 2024
1 parent 3c93671 commit b87b921
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 97 deletions.
2 changes: 1 addition & 1 deletion src/db/cache/db_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl DbCache {

/// Insert all data into the DB and clear cache.
pub async fn flush(&mut self, db_tx: &mut Transaction<'_>, ctx: &Context) {
try_info!(ctx, "Flushing cache");
try_info!(ctx, "Flushing DB cache...");
if self.runes.len() > 0 {
try_debug!(ctx, "Flushing {} runes", self.runes.len());
let _ = pg_insert_runes(&self.runes, db_tx, ctx).await;
Expand Down
120 changes: 45 additions & 75 deletions src/db/cache/index_cache.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::{
collections::{HashMap, VecDeque},
num::NonZeroUsize,
str::FromStr,
};
use std::{collections::HashMap, num::NonZeroUsize, str::FromStr};

use bitcoin::Network;
use chainhook_sdk::{
Expand All @@ -16,20 +12,21 @@ use tokio_postgres::{Client, Transaction};
use crate::{
config::Config,
db::{
cache::utils::input_rune_balances_from_tx_inputs,
models::{
db_balance_change::DbBalanceChange, db_ledger_entry::DbLedgerEntry,
db_ledger_operation::DbLedgerOperation, db_rune::DbRune,
db_supply_change::DbSupplyChange,
},
pg_get_input_rune_balances, pg_get_max_rune_number, pg_get_rune_by_id,
pg_get_rune_total_mints,
pg_get_max_rune_number, pg_get_rune_by_id, pg_get_rune_total_mints,
},
try_debug, try_info, try_warn,
};

use super::{
db_cache::DbCache,
transaction_cache::{InputRuneBalance, TransactionCache},
utils::move_tx_output_cache_to_output_cache,
};

/// Holds rune data across multiple blocks for faster computations. Processes rune events as they happen during transactions and
Expand All @@ -44,6 +41,9 @@ pub struct IndexCache {
rune_total_mints_cache: LruCache<RuneId, u128>,
/// LRU cache for outputs with rune balances.
output_cache: LruCache<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
/// Same as above but only for the current transaction. We use a `HashMap` instead of an LRU cache to make sure we keep all
/// outputs in memory while we index this transaction. Must be cleared every time a new transaction is processed.
tx_output_cache: HashMap<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
/// Holds a single transaction's rune cache. Must be cleared every time a new transaction is processed.
tx_cache: TransactionCache,
/// Keeps rows that have not yet been inserted in the DB.
Expand All @@ -60,6 +60,7 @@ impl IndexCache {
rune_cache: LruCache::new(cap),
rune_total_mints_cache: LruCache::new(cap),
output_cache: LruCache::new(cap),
tx_output_cache: HashMap::new(),
tx_cache: TransactionCache::new(network, &"".to_string(), 1, 0, &"".to_string(), 0),
db_cache: DbCache::new(),
}
Expand All @@ -86,12 +87,14 @@ impl IndexCache {
tx_id,
timestamp,
);
self.tx_output_cache.clear();
}

/// Finalizes the current transaction index cache.
pub fn end_transaction(&mut self, _db_tx: &mut Transaction<'_>, ctx: &Context) {
let entries = self.tx_cache.allocate_remaining_balances(ctx);
self.add_ledger_entries_to_db_cache(&entries);
move_tx_output_cache_to_output_cache(&mut self.tx_output_cache, &mut self.output_cache);
}

pub async fn apply_runestone(
Expand All @@ -103,7 +106,15 @@ impl IndexCache {
ctx: &Context,
) {
try_debug!(ctx, "{:?} {}", runestone, self.tx_cache.location);
self.scan_tx_input_rune_balance(tx_inputs, db_tx, ctx).await;
let input_balances = input_rune_balances_from_tx_inputs(
tx_inputs,
&self.tx_output_cache,
&mut self.output_cache,
db_tx,
ctx,
)
.await;
self.tx_cache.set_input_rune_balances(input_balances, ctx);
self.tx_cache
.apply_runestone_pointer(runestone, tx_outputs, ctx);
}
Expand All @@ -116,7 +127,15 @@ impl IndexCache {
ctx: &Context,
) {
try_debug!(ctx, "{:?} {}", cenotaph, self.tx_cache.location);
self.scan_tx_input_rune_balance(tx_inputs, db_tx, ctx).await;
let input_balances = input_rune_balances_from_tx_inputs(
tx_inputs,
&self.tx_output_cache,
&mut self.output_cache,
db_tx,
ctx,
)
.await;
self.tx_cache.set_input_rune_balances(input_balances, ctx);
let entries = self.tx_cache.apply_cenotaph_input_burn(cenotaph);
self.add_ledger_entries_to_db_cache(&entries);
}
Expand Down Expand Up @@ -298,53 +317,6 @@ impl IndexCache {
return Some(total);
}

/// Takes all transaction inputs and transform them into rune balances to be allocated.
async fn scan_tx_input_rune_balance(
&mut self,
tx_inputs: &Vec<TxIn>,
db_tx: &mut Transaction<'_>,
ctx: &Context,
) {
// Maps input index to all of its rune balances. Useful in order to keep rune inputs in order.
let mut indexed_input_runes = HashMap::new();

// Look in memory cache.
let mut cache_misses = vec![];
for (i, input) in tx_inputs.iter().enumerate() {
let tx_id = input.previous_output.txid.hash[2..].to_string();
let vout = input.previous_output.vout;
if let Some(map) = self.output_cache.get(&(tx_id.clone(), vout)) {
indexed_input_runes.insert(i as u32, map.clone());
} else {
cache_misses.push((i as u32, tx_id, vout));
}
}

// Look for cache misses in database.
if cache_misses.len() > 0 {
// self.db_cache.flush(db_tx, ctx).await;
let output_balances = pg_get_input_rune_balances(cache_misses, db_tx, ctx).await;
indexed_input_runes.extend(output_balances);
}

let mut final_input_runes: HashMap<RuneId, VecDeque<InputRuneBalance>> = HashMap::new();
let mut input_keys: Vec<u32> = indexed_input_runes.keys().copied().collect();
input_keys.sort();
for key in input_keys.iter() {
let input_value = indexed_input_runes.get(key).unwrap();
for (rune_id, vec) in input_value.iter() {
if let Some(rune) = final_input_runes.get_mut(rune_id) {
rune.extend(vec.clone());
} else {
final_input_runes.insert(*rune_id, VecDeque::from(vec.clone()));
}
}
}

self.tx_cache
.set_input_rune_balances(final_input_runes, ctx);
}

/// Take ledger entries returned by the `TransactionCache` and add them to the `DbCache`. Update global balances and counters
/// as well.
fn add_ledger_entries_to_db_cache(&mut self, entries: &Vec<DbLedgerEntry>) {
Expand Down Expand Up @@ -435,25 +407,23 @@ impl IndexCache {
address,
entry.amount.unwrap(),
));
}

// Add to output LRU cache if it's received balance.
let k = (entry.tx_id.clone(), entry.output.unwrap().0);
let rune_id = RuneId::from_str(entry.rune_id.as_str()).unwrap();
let balance = InputRuneBalance {
address: entry.address.clone(),
amount: entry.amount.unwrap().0,
};
if let Some(v) = self.output_cache.get_mut(&k) {
if let Some(rune_balance) = v.get_mut(&rune_id) {
rune_balance.push(balance);
} else {
v.insert(rune_id, vec![balance]);
}
} else {
let mut v = HashMap::new();
v.insert(rune_id, vec![balance]);
self.output_cache.push(k, v);
// Add to current transaction's output cache if it's received balance.
let k = (entry.tx_id.clone(), entry.output.unwrap().0);
let rune_id = RuneId::from_str(entry.rune_id.as_str()).unwrap();
let balance = InputRuneBalance {
address: entry.address.clone(),
amount: entry.amount.unwrap().0,
};
let mut default = HashMap::new();
default.insert(rune_id, vec![balance.clone()]);
self.tx_output_cache
.entry(k)
.and_modify(|i| {
i.entry(rune_id)
.and_modify(|v| v.push(balance.clone()))
.or_insert(vec![balance]);
})
.or_insert(default);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/db/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod db_cache;
pub mod index_cache;
pub mod transaction_cache;
pub mod transaction_location;
pub mod utils;
134 changes: 134 additions & 0 deletions src/db/cache/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::collections::{HashMap, VecDeque};

use chainhook_sdk::{types::bitcoin::TxIn, utils::Context};
use lru::LruCache;
use ordinals::RuneId;
use tokio_postgres::Transaction;

use crate::db::pg_get_input_rune_balances;

use super::transaction_cache::InputRuneBalance;

/// Takes all transaction inputs and transforms them into rune balances to be allocated for operations. Looks inside an output LRU
/// cache and the DB when there are cache misses.
pub async fn input_rune_balances_from_tx_inputs(
tx_inputs: &Vec<TxIn>,
tx_output_cache: &HashMap<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
output_cache: &mut LruCache<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
db_tx: &mut Transaction<'_>,
ctx: &Context,
) -> HashMap<RuneId, VecDeque<InputRuneBalance>> {
// Maps input index to all of its rune balances. Useful in order to keep rune inputs in order.
let mut indexed_input_runes = HashMap::new();
let mut cache_misses = vec![];

// Look in both current transaction output cache and in long term LRU cache.
for (i, input) in tx_inputs.iter().enumerate() {
let tx_id = input.previous_output.txid.hash[2..].to_string();
let vout = input.previous_output.vout;
let k = (tx_id.clone(), vout);
if let Some(map) = tx_output_cache.get(&k) {
indexed_input_runes.insert(i as u32, map.clone());
} else if let Some(map) = output_cache.get(&k) {
indexed_input_runes.insert(i as u32, map.clone());
} else {
cache_misses.push((i as u32, tx_id, vout));
}
}
// Look for cache misses in database. We don't need to `flush` the DB cache here because we've already looked in the current
// transaction's output cache.
if cache_misses.len() > 0 {
let output_balances = pg_get_input_rune_balances(cache_misses, db_tx, ctx).await;
indexed_input_runes.extend(output_balances);
}

let mut final_input_runes: HashMap<RuneId, VecDeque<InputRuneBalance>> = HashMap::new();
let mut input_keys: Vec<u32> = indexed_input_runes.keys().copied().collect();
input_keys.sort();
for key in input_keys.iter() {
let input_value = indexed_input_runes.get(key).unwrap();
for (rune_id, vec) in input_value.iter() {
if let Some(rune) = final_input_runes.get_mut(rune_id) {
rune.extend(vec.clone());
} else {
final_input_runes.insert(*rune_id, VecDeque::from(vec.clone()));
}
}
}
final_input_runes
}

/// Moves data from the current transaction's output cache to the long-term LRU output cache. Clears the tx output cache when
/// done.
pub fn move_tx_output_cache_to_output_cache(
tx_output_cache: &mut HashMap<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
output_cache: &mut LruCache<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
) {
for (k, tx_output_map) in tx_output_cache.iter() {
if let Some(v) = output_cache.get_mut(&k) {
for (rune_id, balances) in tx_output_map.iter() {
if let Some(rune_balance) = v.get_mut(&rune_id) {
rune_balance.extend(balances.clone());
} else {
v.insert(*rune_id, balances.clone());
}
}
} else {
output_cache.push(k.clone(), tx_output_map.clone());
}
}
tx_output_cache.clear();
}

#[cfg(test)]
mod test {
// use std::{collections::HashMap, num::NonZeroUsize, str::FromStr};

// use chainhook_sdk::{
// types::{
// bitcoin::{OutPoint, TxIn},
// TransactionIdentifier,
// },
// utils::Context,
// };
// use lru::LruCache;
// use ordinals::RuneId;

// use crate::db::cache::transaction_cache::InputRuneBalance;

// #[test]
// fn from_output_cache() {
// let tx_inputs = vec![TxIn {
// previous_output: OutPoint {
// txid: TransactionIdentifier {
// hash: "aea76e5ef8135851d0387074cf7672013779e4506e56122e0e698e12ede62681"
// .to_string(),
// },
// vout: 2,
// value: 100,
// block_height: 848300,
// },
// script_sig: "".to_string(),
// sequence: 1,
// witness: vec![],
// }];
// let mut value = HashMap::new();
// value.insert(
// RuneId::from_str("840000:1").unwrap(),
// vec![InputRuneBalance {
// address: Some("1EDYZPvGqKzZYp6DoTtcgXwvSAkA9d9UKU".to_string()),
// amount: 10000,
// }],
// );
// let mut output_cache: LruCache<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>> =
// LruCache::new(NonZeroUsize::new(2).unwrap());
// output_cache.put(
// (
// "aea76e5ef8135851d0387074cf7672013779e4506e56122e0e698e12ede62681".to_string(),
// 2,
// ),
// value,
// );
// let ctx = Context::empty();
// }
}
23 changes: 2 additions & 21 deletions src/db/index.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
use std::str::FromStr;

use bitcoin::absolute::LockTime;
use bitcoin::transaction::TxOut;
use bitcoin::Network;
use bitcoin::OutPoint;
use bitcoin::ScriptBuf;
use bitcoin::Sequence;
use bitcoin::Transaction;
use bitcoin::TxIn;
use bitcoin::Txid;
use bitcoin::Witness;
use chainhook_sdk::types::BitcoinTransactionData;
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
use ordinals::Artifact;
Expand Down Expand Up @@ -39,20 +32,8 @@ fn bitcoin_tx_from_chainhook_tx(
Transaction {
version: 2,
lock_time: LockTime::from_time(block.timestamp).unwrap(),
input: tx
.metadata
.inputs
.iter()
.map(|input| TxIn {
previous_output: OutPoint {
txid: Txid::from_str(&input.previous_output.txid.hash[2..]).unwrap(),
vout: input.previous_output.vout,
},
script_sig: ScriptBuf::from_bytes(hex::decode(&input.script_sig[2..]).unwrap()),
sequence: Sequence(input.sequence),
witness: Witness::new(), // We don't need this for runes
})
.collect(),
// Inputs don't matter for Runestone parsing.
input: vec![],
output: tx
.metadata
.outputs
Expand Down

0 comments on commit b87b921

Please sign in to comment.