diff --git a/src/db/cache/db_cache.rs b/src/db/cache/db_cache.rs index 86dc5ff..6c48508 100644 --- a/src/db/cache/db_cache.rs +++ b/src/db/cache/db_cache.rs @@ -37,7 +37,7 @@ impl DbCache { /// Insert all data into the DB and clear cache. pub async fn flush(&mut self, db_tx: &mut Transaction<'_>, ctx: &Context) { - try_info!(ctx, "Flushing cache"); + try_info!(ctx, "Flushing DB cache..."); if self.runes.len() > 0 { try_debug!(ctx, "Flushing {} runes", self.runes.len()); let _ = pg_insert_runes(&self.runes, db_tx, ctx).await; diff --git a/src/db/cache/index_cache.rs b/src/db/cache/index_cache.rs index 8c5b8a6..35a33e7 100644 --- a/src/db/cache/index_cache.rs +++ b/src/db/cache/index_cache.rs @@ -1,8 +1,4 @@ -use std::{ - collections::{HashMap, VecDeque}, - num::NonZeroUsize, - str::FromStr, -}; +use std::{collections::HashMap, num::NonZeroUsize, str::FromStr}; use bitcoin::Network; use chainhook_sdk::{ @@ -16,13 +12,13 @@ use tokio_postgres::{Client, Transaction}; use crate::{ config::Config, db::{ + cache::utils::input_rune_balances_from_tx_inputs, models::{ db_balance_change::DbBalanceChange, db_ledger_entry::DbLedgerEntry, db_ledger_operation::DbLedgerOperation, db_rune::DbRune, db_supply_change::DbSupplyChange, }, - pg_get_input_rune_balances, pg_get_max_rune_number, pg_get_rune_by_id, - pg_get_rune_total_mints, + pg_get_max_rune_number, pg_get_rune_by_id, pg_get_rune_total_mints, }, try_debug, try_info, try_warn, }; @@ -30,6 +26,7 @@ use crate::{ use super::{ db_cache::DbCache, transaction_cache::{InputRuneBalance, TransactionCache}, + utils::move_tx_output_cache_to_output_cache, }; /// Holds rune data across multiple blocks for faster computations. Processes rune events as they happen during transactions and @@ -44,6 +41,9 @@ pub struct IndexCache { rune_total_mints_cache: LruCache, /// LRU cache for outputs with rune balances. output_cache: LruCache<(String, u32), HashMap>>, + /// Same as above but only for the current transaction. We use a `HashMap` instead of an LRU cache to make sure we keep all + /// outputs in memory while we index this transaction. Must be cleared every time a new transaction is processed. + tx_output_cache: HashMap<(String, u32), HashMap>>, /// Holds a single transaction's rune cache. Must be cleared every time a new transaction is processed. tx_cache: TransactionCache, /// Keeps rows that have not yet been inserted in the DB. @@ -60,6 +60,7 @@ impl IndexCache { rune_cache: LruCache::new(cap), rune_total_mints_cache: LruCache::new(cap), output_cache: LruCache::new(cap), + tx_output_cache: HashMap::new(), tx_cache: TransactionCache::new(network, &"".to_string(), 1, 0, &"".to_string(), 0), db_cache: DbCache::new(), } @@ -86,12 +87,14 @@ impl IndexCache { tx_id, timestamp, ); + self.tx_output_cache.clear(); } /// Finalizes the current transaction index cache. pub fn end_transaction(&mut self, _db_tx: &mut Transaction<'_>, ctx: &Context) { let entries = self.tx_cache.allocate_remaining_balances(ctx); self.add_ledger_entries_to_db_cache(&entries); + move_tx_output_cache_to_output_cache(&mut self.tx_output_cache, &mut self.output_cache); } pub async fn apply_runestone( @@ -103,7 +106,15 @@ impl IndexCache { ctx: &Context, ) { try_debug!(ctx, "{:?} {}", runestone, self.tx_cache.location); - self.scan_tx_input_rune_balance(tx_inputs, db_tx, ctx).await; + let input_balances = input_rune_balances_from_tx_inputs( + tx_inputs, + &self.tx_output_cache, + &mut self.output_cache, + db_tx, + ctx, + ) + .await; + self.tx_cache.set_input_rune_balances(input_balances, ctx); self.tx_cache .apply_runestone_pointer(runestone, tx_outputs, ctx); } @@ -116,7 +127,15 @@ impl IndexCache { ctx: &Context, ) { try_debug!(ctx, "{:?} {}", cenotaph, self.tx_cache.location); - self.scan_tx_input_rune_balance(tx_inputs, db_tx, ctx).await; + let input_balances = input_rune_balances_from_tx_inputs( + tx_inputs, + &self.tx_output_cache, + &mut self.output_cache, + db_tx, + ctx, + ) + .await; + self.tx_cache.set_input_rune_balances(input_balances, ctx); let entries = self.tx_cache.apply_cenotaph_input_burn(cenotaph); self.add_ledger_entries_to_db_cache(&entries); } @@ -298,53 +317,6 @@ impl IndexCache { return Some(total); } - /// Takes all transaction inputs and transform them into rune balances to be allocated. - async fn scan_tx_input_rune_balance( - &mut self, - tx_inputs: &Vec, - db_tx: &mut Transaction<'_>, - ctx: &Context, - ) { - // Maps input index to all of its rune balances. Useful in order to keep rune inputs in order. - let mut indexed_input_runes = HashMap::new(); - - // Look in memory cache. - let mut cache_misses = vec![]; - for (i, input) in tx_inputs.iter().enumerate() { - let tx_id = input.previous_output.txid.hash[2..].to_string(); - let vout = input.previous_output.vout; - if let Some(map) = self.output_cache.get(&(tx_id.clone(), vout)) { - indexed_input_runes.insert(i as u32, map.clone()); - } else { - cache_misses.push((i as u32, tx_id, vout)); - } - } - - // Look for cache misses in database. - if cache_misses.len() > 0 { - // self.db_cache.flush(db_tx, ctx).await; - let output_balances = pg_get_input_rune_balances(cache_misses, db_tx, ctx).await; - indexed_input_runes.extend(output_balances); - } - - let mut final_input_runes: HashMap> = HashMap::new(); - let mut input_keys: Vec = indexed_input_runes.keys().copied().collect(); - input_keys.sort(); - for key in input_keys.iter() { - let input_value = indexed_input_runes.get(key).unwrap(); - for (rune_id, vec) in input_value.iter() { - if let Some(rune) = final_input_runes.get_mut(rune_id) { - rune.extend(vec.clone()); - } else { - final_input_runes.insert(*rune_id, VecDeque::from(vec.clone())); - } - } - } - - self.tx_cache - .set_input_rune_balances(final_input_runes, ctx); - } - /// Take ledger entries returned by the `TransactionCache` and add them to the `DbCache`. Update global balances and counters /// as well. fn add_ledger_entries_to_db_cache(&mut self, entries: &Vec) { @@ -435,25 +407,23 @@ impl IndexCache { address, entry.amount.unwrap(), )); - } - - // Add to output LRU cache if it's received balance. - let k = (entry.tx_id.clone(), entry.output.unwrap().0); - let rune_id = RuneId::from_str(entry.rune_id.as_str()).unwrap(); - let balance = InputRuneBalance { - address: entry.address.clone(), - amount: entry.amount.unwrap().0, - }; - if let Some(v) = self.output_cache.get_mut(&k) { - if let Some(rune_balance) = v.get_mut(&rune_id) { - rune_balance.push(balance); - } else { - v.insert(rune_id, vec![balance]); - } - } else { - let mut v = HashMap::new(); - v.insert(rune_id, vec![balance]); - self.output_cache.push(k, v); + // Add to current transaction's output cache if it's received balance. + let k = (entry.tx_id.clone(), entry.output.unwrap().0); + let rune_id = RuneId::from_str(entry.rune_id.as_str()).unwrap(); + let balance = InputRuneBalance { + address: entry.address.clone(), + amount: entry.amount.unwrap().0, + }; + let mut default = HashMap::new(); + default.insert(rune_id, vec![balance.clone()]); + self.tx_output_cache + .entry(k) + .and_modify(|i| { + i.entry(rune_id) + .and_modify(|v| v.push(balance.clone())) + .or_insert(vec![balance]); + }) + .or_insert(default); } } } diff --git a/src/db/cache/mod.rs b/src/db/cache/mod.rs index ad6482c..d09a35a 100644 --- a/src/db/cache/mod.rs +++ b/src/db/cache/mod.rs @@ -2,3 +2,4 @@ pub mod db_cache; pub mod index_cache; pub mod transaction_cache; pub mod transaction_location; +pub mod utils; diff --git a/src/db/cache/utils.rs b/src/db/cache/utils.rs new file mode 100644 index 0000000..6add664 --- /dev/null +++ b/src/db/cache/utils.rs @@ -0,0 +1,134 @@ +use std::collections::{HashMap, VecDeque}; + +use chainhook_sdk::{types::bitcoin::TxIn, utils::Context}; +use lru::LruCache; +use ordinals::RuneId; +use tokio_postgres::Transaction; + +use crate::db::pg_get_input_rune_balances; + +use super::transaction_cache::InputRuneBalance; + +/// Takes all transaction inputs and transforms them into rune balances to be allocated for operations. Looks inside an output LRU +/// cache and the DB when there are cache misses. +pub async fn input_rune_balances_from_tx_inputs( + tx_inputs: &Vec, + tx_output_cache: &HashMap<(String, u32), HashMap>>, + output_cache: &mut LruCache<(String, u32), HashMap>>, + db_tx: &mut Transaction<'_>, + ctx: &Context, +) -> HashMap> { + // Maps input index to all of its rune balances. Useful in order to keep rune inputs in order. + let mut indexed_input_runes = HashMap::new(); + let mut cache_misses = vec![]; + + // Look in both current transaction output cache and in long term LRU cache. + for (i, input) in tx_inputs.iter().enumerate() { + let tx_id = input.previous_output.txid.hash[2..].to_string(); + let vout = input.previous_output.vout; + let k = (tx_id.clone(), vout); + if let Some(map) = tx_output_cache.get(&k) { + indexed_input_runes.insert(i as u32, map.clone()); + } else if let Some(map) = output_cache.get(&k) { + indexed_input_runes.insert(i as u32, map.clone()); + } else { + cache_misses.push((i as u32, tx_id, vout)); + } + } + // Look for cache misses in database. We don't need to `flush` the DB cache here because we've already looked in the current + // transaction's output cache. + if cache_misses.len() > 0 { + let output_balances = pg_get_input_rune_balances(cache_misses, db_tx, ctx).await; + indexed_input_runes.extend(output_balances); + } + + let mut final_input_runes: HashMap> = HashMap::new(); + let mut input_keys: Vec = indexed_input_runes.keys().copied().collect(); + input_keys.sort(); + for key in input_keys.iter() { + let input_value = indexed_input_runes.get(key).unwrap(); + for (rune_id, vec) in input_value.iter() { + if let Some(rune) = final_input_runes.get_mut(rune_id) { + rune.extend(vec.clone()); + } else { + final_input_runes.insert(*rune_id, VecDeque::from(vec.clone())); + } + } + } + final_input_runes +} + +/// Moves data from the current transaction's output cache to the long-term LRU output cache. Clears the tx output cache when +/// done. +pub fn move_tx_output_cache_to_output_cache( + tx_output_cache: &mut HashMap<(String, u32), HashMap>>, + output_cache: &mut LruCache<(String, u32), HashMap>>, +) { + for (k, tx_output_map) in tx_output_cache.iter() { + if let Some(v) = output_cache.get_mut(&k) { + for (rune_id, balances) in tx_output_map.iter() { + if let Some(rune_balance) = v.get_mut(&rune_id) { + rune_balance.extend(balances.clone()); + } else { + v.insert(*rune_id, balances.clone()); + } + } + } else { + output_cache.push(k.clone(), tx_output_map.clone()); + } + } + tx_output_cache.clear(); +} + +#[cfg(test)] +mod test { + // use std::{collections::HashMap, num::NonZeroUsize, str::FromStr}; + + // use chainhook_sdk::{ + // types::{ + // bitcoin::{OutPoint, TxIn}, + // TransactionIdentifier, + // }, + // utils::Context, + // }; + // use lru::LruCache; + // use ordinals::RuneId; + + // use crate::db::cache::transaction_cache::InputRuneBalance; + + // #[test] + // fn from_output_cache() { + // let tx_inputs = vec![TxIn { + // previous_output: OutPoint { + // txid: TransactionIdentifier { + // hash: "aea76e5ef8135851d0387074cf7672013779e4506e56122e0e698e12ede62681" + // .to_string(), + // }, + // vout: 2, + // value: 100, + // block_height: 848300, + // }, + // script_sig: "".to_string(), + // sequence: 1, + // witness: vec![], + // }]; + // let mut value = HashMap::new(); + // value.insert( + // RuneId::from_str("840000:1").unwrap(), + // vec![InputRuneBalance { + // address: Some("1EDYZPvGqKzZYp6DoTtcgXwvSAkA9d9UKU".to_string()), + // amount: 10000, + // }], + // ); + // let mut output_cache: LruCache<(String, u32), HashMap>> = + // LruCache::new(NonZeroUsize::new(2).unwrap()); + // output_cache.put( + // ( + // "aea76e5ef8135851d0387074cf7672013779e4506e56122e0e698e12ede62681".to_string(), + // 2, + // ), + // value, + // ); + // let ctx = Context::empty(); + // } +} diff --git a/src/db/index.rs b/src/db/index.rs index 7f17596..f578b7e 100644 --- a/src/db/index.rs +++ b/src/db/index.rs @@ -1,15 +1,8 @@ -use std::str::FromStr; - use bitcoin::absolute::LockTime; use bitcoin::transaction::TxOut; use bitcoin::Network; -use bitcoin::OutPoint; use bitcoin::ScriptBuf; -use bitcoin::Sequence; use bitcoin::Transaction; -use bitcoin::TxIn; -use bitcoin::Txid; -use bitcoin::Witness; use chainhook_sdk::types::BitcoinTransactionData; use chainhook_sdk::{types::BitcoinBlockData, utils::Context}; use ordinals::Artifact; @@ -39,20 +32,8 @@ fn bitcoin_tx_from_chainhook_tx( Transaction { version: 2, lock_time: LockTime::from_time(block.timestamp).unwrap(), - input: tx - .metadata - .inputs - .iter() - .map(|input| TxIn { - previous_output: OutPoint { - txid: Txid::from_str(&input.previous_output.txid.hash[2..]).unwrap(), - vout: input.previous_output.vout, - }, - script_sig: ScriptBuf::from_bytes(hex::decode(&input.script_sig[2..]).unwrap()), - sequence: Sequence(input.sequence), - witness: Witness::new(), // We don't need this for runes - }) - .collect(), + // Inputs don't matter for Runestone parsing. + input: vec![], output: tx .metadata .outputs