From d1045969b7a7f0b20f1a9f1f6e3e1905bc6a6c1e Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini Date: Fri, 9 Apr 2021 17:17:47 -0300 Subject: [PATCH 1/7] explorer: add in memory stable_storage --- jormungandr/src/explorer/error.rs | 5 + jormungandr/src/explorer/stable_storage.rs | 142 +++++++++++++++++++++ 2 files changed, 147 insertions(+) create mode 100644 jormungandr/src/explorer/stable_storage.rs diff --git a/jormungandr/src/explorer/error.rs b/jormungandr/src/explorer/error.rs index 646fcb8990..83f46bcb6d 100644 --- a/jormungandr/src/explorer/error.rs +++ b/jormungandr/src/explorer/error.rs @@ -1,3 +1,4 @@ +use super::stable_storage::StableIndexError; use crate::blockcfg::HeaderHash; use crate::{blockchain::StorageError, intercom}; use thiserror::Error; @@ -10,6 +11,8 @@ pub enum ExplorerError { AncestorNotFound(HeaderHash), #[error("transaction '{0}' is already indexed")] TransactionAlreadyExists(crate::blockcfg::FragmentId), + #[error("transaction '{0}' not found")] + TransactionNotFound(crate::blockcfg::FragmentId), #[error("tried to index block '{0}' twice")] BlockAlreadyExists(HeaderHash), #[error("block with {0} chain length already exists in explorer branch")] @@ -20,6 +23,8 @@ pub enum ExplorerError { StorageError(#[from] StorageError), #[error("streaming error")] StreamingError(#[from] intercom::Error), + #[error("stable storage error")] + StableIndexError(#[from] StableIndexError), } pub type Result = std::result::Result; diff --git a/jormungandr/src/explorer/stable_storage.rs b/jormungandr/src/explorer/stable_storage.rs new file mode 100644 index 0000000000..143919a71a --- /dev/null +++ b/jormungandr/src/explorer/stable_storage.rs @@ -0,0 +1,142 @@ +use super::indexing::ExplorerAddress; +use super::{EpochData, ExplorerBlock}; +use crate::blockcfg::{ChainLength, Epoch, FragmentId, HeaderHash}; +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; +use thiserror::Error; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use 
tracing::debug; + +#[derive(Error, Debug)] +pub enum StableIndexError { + #[error("block is already indexed in stable explorer storage: {0}")] + BlockAlreadyExists(HeaderHash), + #[error("there is already a block with given chain_length: {0}")] + DuplicatedChainLength(ChainLength), + #[error("transaction is already indexed in stable explorer storage")] + TransactionAlreadyExists, +} + +#[derive(Clone, Default)] +pub struct StableIndexShared(pub Arc>); + +impl StableIndexShared { + pub async fn write<'a>(&'a self) -> RwLockWriteGuard<'a, StableIndex> { + self.0.write().await + } + + pub async fn read<'a>(&'a self) -> RwLockReadGuard<'a, StableIndex> { + self.0.read().await + } +} + +/// in memory non-versioned version of the explorer indexes +/// this is mostly a *naive* version, because the final step would be to have +/// something backed by an on-disk database +/// ideally just reimplementing this would be enough to introduce a database +/// in practice, the api may need to be adapted to use database cursors or some +/// sort of offsets +#[derive(Default)] +pub struct StableIndex { + transactions_by_address: HashMap>, + block_by_chain_length: BTreeMap, + epochs: BTreeMap, + blocks: HashMap, + transaction_to_block: HashMap, +} + +impl StableIndex { + pub fn apply_block(&mut self, block: ExplorerBlock) -> Result<(), StableIndexError> { + debug!("applying block to explorer's stable index {}", block.id()); + + if self + .block_by_chain_length + .insert(block.chain_length, block.id()) + .is_some() + { + return Err(StableIndexError::DuplicatedChainLength(block.chain_length)); + } + + for (hash, tx) in &block.transactions { + let included_addresses: std::collections::HashSet = tx + .outputs() + .iter() + .map(|output| output.address.clone()) + .chain(tx.inputs().iter().map(|input| input.address.clone())) + .collect(); + + for address in included_addresses { + self.transactions_by_address + .entry(address) + .or_insert(vec![]) + .push(*hash) + } + + if 
self.transaction_to_block.insert(*hash, block.id).is_some() { + return Err(StableIndexError::TransactionAlreadyExists); + } + } + + self.epochs + .entry(block.date.epoch) + .and_modify(|epoch_data| { + epoch_data.last_block = block.id; + epoch_data.total_blocks += 1; + }) + .or_insert(EpochData { + first_block: block.id, + last_block: block.id, + total_blocks: 1, + }); + + let id = block.id.clone(); + + if self.blocks.insert(block.id, block).is_some() { + return Err(StableIndexError::BlockAlreadyExists(id)); + } + + Ok(()) + } + + pub fn last_block_length(&self) -> Option { + self.block_by_chain_length + .keys() + .last() + .map(ChainLength::clone) + } + + pub fn get_block(&self, block_id: &HeaderHash) -> Option<&ExplorerBlock> { + self.blocks.get(block_id) + } + + pub fn transactions_by_address( + &self, + address: &ExplorerAddress, + ) -> Option> { + self.transactions_by_address + .get(address) + .map(|inner| inner.iter()) + } + + pub fn get_block_by_chain_length(&self, chain_length: &ChainLength) -> Option<&HeaderHash> { + self.block_by_chain_length.get(chain_length) + } + + pub fn get_epoch_data(&self, epoch: &Epoch) -> Option<&EpochData> { + self.epochs.get(epoch) + } + + pub fn transaction_to_block(&self, fragment_id: &FragmentId) -> Option<&HeaderHash> { + self.transaction_to_block.get(fragment_id) + } + + pub fn get_block_hash_range( + &self, + from: ChainLength, + to: ChainLength, + ) -> impl Iterator + '_ { + self.block_by_chain_length + .range(from..to) + .map(|(length, hash)| (hash.clone(), length.clone())) + } +} From 7479f37dead3c8ca5277c846d89152065607ef83 Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini Date: Fri, 9 Apr 2021 17:19:09 -0300 Subject: [PATCH 2/7] explorer PersistentSequence remove_first the important part here is that we only need to remove from the beginning, because this is a blockchain, and we are going to be undoing blocks from the back. 
--- .../src/explorer/persistent_sequence.rs | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/jormungandr/src/explorer/persistent_sequence.rs b/jormungandr/src/explorer/persistent_sequence.rs index 4295b56cbb..9d11a95fa6 100644 --- a/jormungandr/src/explorer/persistent_sequence.rs +++ b/jormungandr/src/explorer/persistent_sequence.rs @@ -1,12 +1,28 @@ use imhamt::Hamt; +use std::convert::Infallible; use std::{collections::hash_map::DefaultHasher, sync::Arc}; -// Use a Hamt to store a sequence, the indexes can be used for pagination -// XXX: Maybe there is a better data structure for this? +/// Use a Hamt to store a sequence, the indexes can be used for pagination + +// TODO: +// this data structure may be better served by either +// a persistent linked list (although pagination will suffer) +// a persistent btree +// a persistent prefix tree (which would be like the hamt, I think, but without +// hashing the keys before, so we don't lose locality) +// but it is used by different things now, and maybe some would benefit more +// from one of the options than the others + #[derive(Clone)] pub struct PersistentSequence { len: u64, elements: Hamt>, + /// this is the first valid index, as the sequence doesn't need to start from + /// 0, we need this to remove from the beginning, which is useful to undo + /// blocks, because we are always undoing blocks from the back sequentially, + /// in the opposite order they were applied, so we never need to remove from + /// anywhere else + first: Option, } impl PersistentSequence { @@ -14,17 +30,42 @@ impl PersistentSequence { PersistentSequence { len: 0, elements: Hamt::new(), + first: None, } } pub fn append(&self, t: T) -> Self { let len = self.len + 1; + let first = self.first.or_else(|| Some(0)).map(|first| first + 1); + PersistentSequence { len, elements: self.elements.insert(len - 1, Arc::new(t)).unwrap(), + first, } } + pub fn remove_first(&self) -> Option<(Self, Arc)> { +
self.first.and_then(|first| { + let mut deleted = None; + let elements = self + .elements + .update::<_, Infallible>(&first, |elem| Ok(deleted.replace(Arc::clone(elem)))) + .ok()?; + + deleted.map(|deleted| { + ( + PersistentSequence { + elements, + len: self.len - 1, + first: Some(first + 1), + }, + deleted, + ) + }) + }) + } + pub fn get>(&self, i: I) -> Option<&Arc> { self.elements.lookup(&i.into()) } From 82c03270eadfa93328dfe0f2ae43ee91c32170f5 Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini Date: Fri, 9 Apr 2021 23:37:22 -0300 Subject: [PATCH 3/7] explorer: split db in stable and unstable --- jormungandr/src/explorer/graphql/mod.rs | 38 +- jormungandr/src/explorer/indexing.rs | 321 +++++++++------- jormungandr/src/explorer/mod.rs | 471 ++++++++++++++++++------ 3 files changed, 566 insertions(+), 264 deletions(-) diff --git a/jormungandr/src/explorer/graphql/mod.rs b/jormungandr/src/explorer/graphql/mod.rs index 2a1fac5202..bde91de5a8 100644 --- a/jormungandr/src/explorer/graphql/mod.rs +++ b/jormungandr/src/explorer/graphql/mod.rs @@ -22,6 +22,7 @@ use super::indexing::{ BlockProducer, EpochData, ExplorerAddress, ExplorerBlock, ExplorerTransaction, StakePoolData, }; use super::persistent_sequence::PersistentSequence; +use super::BranchQuery; use crate::blockcfg::{self, FragmentId, HeaderHash}; use crate::explorer::indexing::ExplorerVote; use crate::explorer::{ExplorerDb, Settings as ChainSettings}; @@ -37,7 +38,7 @@ use std::sync::Arc; pub(crate) type RestContext = crate::rest::explorer::EContext; pub struct Branch { - state: super::multiverse::Ref, + state: BranchQuery, id: HeaderHash, } @@ -51,7 +52,7 @@ impl Branch { .ok_or_else(|| ErrorKind::NotFound("branch not found".to_string()).into()) } - fn from_id_and_state(id: HeaderHash, state: super::multiverse::Ref) -> Branch { + fn from_id_and_state(id: HeaderHash, state: BranchQuery) -> Branch { Branch { state, id } } } @@ -63,9 +64,7 @@ impl Branch { } pub async fn block(&self) -> Block { - 
Block::from_contents(Arc::clone( - self.state.state().blocks.lookup(&self.id).unwrap(), - )) + Block::from_contents(Arc::clone(&self.state.get_block(&self.id).await.unwrap())) } pub async fn blocks( @@ -77,7 +76,7 @@ impl Branch { ) -> FieldResult, EmptyFields>> { let block0 = 0u32; - let chain_length = self.state.state().blocks.size(); + let chain_length = self.state.last_block(); query( after, @@ -87,9 +86,7 @@ impl Branch { |after, before, first, last| async move { let boundaries = PaginationInterval::Inclusive(InclusivePaginationInterval { lower_bound: block0, - // this try_from cannot fail, as there can't be more than 2^32 - // blocks (because ChainLength is u32) - upper_bound: u32::try_from(chain_length).unwrap(), + upper_bound: u32::from(chain_length), }); let pagination_arguments = ValidatedPaginationArguments { @@ -114,7 +111,7 @@ impl Branch { PaginationInterval::Inclusive(range) => { let a = range.lower_bound.into(); let b = range.upper_bound.checked_add(1).unwrap().into(); - self.state.state().get_block_hash_range(a, b) + self.state.get_block_hash_range(a, b).await } }; @@ -148,7 +145,6 @@ impl Branch { let transactions = self .state - .state() .transactions_by_address(&address) .unwrap_or_else(PersistentSequence::::new); @@ -212,7 +208,7 @@ impl Branch { ) -> FieldResult< Connection, EmptyFields>, > { - let mut vote_plans = self.state.state().get_vote_plans(); + let mut vote_plans = self.state.get_vote_plans(); vote_plans.sort_unstable_by_key(|(id, _data)| id.clone()); @@ -290,7 +286,7 @@ impl Branch { before: Option, after: Option, ) -> FieldResult, EmptyFields>> { - let mut stake_pools = self.state.state().get_stake_pools(); + let mut stake_pools = self.state.get_stake_pools(); // Although it's probably not a big performance concern // There are a few alternatives to not have to sort this @@ -395,17 +391,15 @@ impl Branch { |after, before, first, last| async move { let epoch_lower_bound = self .state - .state() - .blocks - 
.lookup(&epoch_data.first_block) + .get_block(&epoch_data.first_block) + .await .map(|block| u32::from(block.chain_length)) .expect("Epoch lower bound"); let epoch_upper_bound = self .state - .state() - .blocks - .lookup(&epoch_data.last_block) + .get_block(&epoch_data.last_block) + .await .map(|block| u32::from(block.chain_length)) .expect("Epoch upper bound"); @@ -438,11 +432,11 @@ impl Branch { } PaginationInterval::Inclusive(range) => self .state - .state() .get_block_hash_range( (range.lower_bound + epoch_lower_bound).into(), (range.upper_bound + epoch_lower_bound + 1u32).into(), ) + .await .iter() .map(|(hash, index)| (*hash, u32::from(*index) - epoch_lower_bound)) .collect::>(), @@ -1634,7 +1628,7 @@ pub struct Subscription; #[Subscription] impl Subscription { - async fn tip(&self, context: &Context<'_>) -> impl futures::Stream { + async fn tip(&self, context: &Context<'_>) -> impl futures::Stream + '_ { use futures::StreamExt; context .data_unchecked::() @@ -1647,7 +1641,7 @@ impl Subscription { // fine to ignore the error .filter_map(|tip| async move { tip.ok() - .map(|(hash, state)| Branch::from_id_and_state(hash, state)) + .map(|(hash, state)| Branch::from_id_and_state(hash.clone(), state.clone())) }) } } diff --git a/jormungandr/src/explorer/indexing.rs b/jormungandr/src/explorer/indexing.rs index 32c9995b49..2707ead880 100644 --- a/jormungandr/src/explorer/indexing.rs +++ b/jormungandr/src/explorer/indexing.rs @@ -1,4 +1,4 @@ -use super::persistent_sequence::PersistentSequence; +use super::{persistent_sequence::PersistentSequence, stable_storage::StableIndexShared}; use std::collections::hash_map::DefaultHasher; use std::collections::HashMap; @@ -6,7 +6,6 @@ use crate::blockcfg::{Block, BlockDate, ChainLength, Epoch, Fragment, FragmentId use cardano_legacy_address::Addr as OldAddress; use chain_addr::{Address, Discrimination}; use chain_core::property::Block as _; -use chain_core::property::Fragment as _; use chain_impl_mockchain::block::Proof; use 
chain_impl_mockchain::certificate::{ Certificate, ExternalProposalId, PoolId, PoolRegistration, PoolRetirement, VotePlanId, @@ -17,7 +16,9 @@ use chain_impl_mockchain::value::Value; use chain_impl_mockchain::vote::{ Choice, EncryptedVote, Options, PayloadType, ProofOfCorrectVote, Weight, }; +use futures::stream::{self, StreamExt}; use std::{convert::TryInto, sync::Arc}; +use tracing::trace; pub type Hamt = imhamt::Hamt>; @@ -140,6 +141,7 @@ pub struct ExplorerBlockBuildingContext<'a> { pub discrimination: Discrimination, pub prev_transactions: &'a Transactions, pub prev_blocks: &'a Blocks, + pub stable_storage: StableIndexShared, } impl ExplorerBlock { @@ -148,117 +150,146 @@ impl ExplorerBlock { /// and mapping the account inputs to addresses with the given discrimination /// This function relies on the given block to be validated previously, and will panic /// otherwise - pub fn resolve_from(block: &Block, context: ExplorerBlockBuildingContext) -> ExplorerBlock { + pub async fn resolve_from( + block: &Block, + context: ExplorerBlockBuildingContext<'_>, + ) -> ExplorerBlock { let fragments = block.contents.iter(); let id = block.id(); let chain_length = block.chain_length(); - - let transactions: HashMap = fragments.enumerate().fold( - HashMap::::new(), - |mut current_block_txs, (offset, fragment)| { - let fragment_id = fragment.id(); + let transactions: HashMap = { + let mut current_block_txs = HashMap::new(); + for (offset, fragment) in fragments.enumerate() { + let fragment_id = fragment.hash(); let offset: u32 = offset.try_into().unwrap(); let metx = match fragment { Fragment::Transaction(tx) => { let tx = tx.as_slice(); - Some(ExplorerTransaction::from( - &context, - &fragment_id, - &tx, - None, - offset, - ¤t_block_txs, - )) + Some( + ExplorerTransaction::from( + &context, + fragment_id, + &tx, + None, + offset.clone(), + ¤t_block_txs, + ) + .await, + ) } Fragment::OwnerStakeDelegation(tx) => { let tx = tx.as_slice(); - Some(ExplorerTransaction::from( - 
&context, - &fragment_id, - &tx, - Some(Certificate::OwnerStakeDelegation( - tx.payload().into_payload(), - )), - offset, - ¤t_block_txs, - )) + Some( + ExplorerTransaction::from( + &context, + fragment_id, + &tx, + Some(Certificate::OwnerStakeDelegation( + tx.payload().into_payload(), + )), + offset.clone(), + ¤t_block_txs, + ) + .await, + ) } Fragment::StakeDelegation(tx) => { let tx = tx.as_slice(); - Some(ExplorerTransaction::from( - &context, - &fragment_id, - &tx, - Some(Certificate::StakeDelegation(tx.payload().into_payload())), - offset, - ¤t_block_txs, - )) + Some( + ExplorerTransaction::from( + &context, + fragment_id, + &tx, + Some(Certificate::StakeDelegation(tx.payload().into_payload())), + offset.clone(), + ¤t_block_txs, + ) + .await, + ) } Fragment::PoolRegistration(tx) => { let tx = tx.as_slice(); - Some(ExplorerTransaction::from( - &context, - &fragment_id, - &tx, - Some(Certificate::PoolRegistration(tx.payload().into_payload())), - offset, - ¤t_block_txs, - )) + Some( + ExplorerTransaction::from( + &context, + fragment_id.clone(), + &tx, + Some(Certificate::PoolRegistration(tx.payload().into_payload())), + offset.clone(), + ¤t_block_txs, + ) + .await, + ) } Fragment::PoolRetirement(tx) => { let tx = tx.as_slice(); - Some(ExplorerTransaction::from( - &context, - &fragment_id, - &tx, - Some(Certificate::PoolRetirement(tx.payload().into_payload())), - offset, - ¤t_block_txs, - )) + Some( + ExplorerTransaction::from( + &context, + fragment_id.clone(), + &tx, + Some(Certificate::PoolRetirement(tx.payload().into_payload())), + offset.clone(), + ¤t_block_txs, + ) + .await, + ) } Fragment::PoolUpdate(tx) => { let tx = tx.as_slice(); - Some(ExplorerTransaction::from( - &context, - &fragment_id, - &tx, - Some(Certificate::PoolUpdate(tx.payload().into_payload())), - offset, - ¤t_block_txs, - )) + Some( + ExplorerTransaction::from( + &context, + fragment_id.clone(), + &tx, + Some(Certificate::PoolUpdate(tx.payload().into_payload())), + offset.clone(), + 
¤t_block_txs, + ) + .await, + ) } Fragment::VotePlan(tx) => { let tx = tx.as_slice(); - Some(ExplorerTransaction::from( - &context, - &fragment_id, - &tx, - Some(Certificate::VotePlan(tx.payload().into_payload())), - offset, - ¤t_block_txs, - )) + Some( + ExplorerTransaction::from( + &context, + fragment_id.clone(), + &tx, + Some(Certificate::VotePlan(tx.payload().into_payload())), + offset.clone(), + ¤t_block_txs, + ) + .await, + ) } Fragment::VoteCast(tx) => { let tx = tx.as_slice(); - Some(ExplorerTransaction::from( - &context, - &fragment_id, - &tx, - Some(Certificate::VoteCast(tx.payload().into_payload())), - offset, - ¤t_block_txs, - )) + Some( + ExplorerTransaction::from( + &context, + fragment_id.clone(), + &tx, + Some(Certificate::VoteCast(tx.payload().into_payload())), + offset.clone(), + ¤t_block_txs, + ) + .await, + ) } Fragment::VoteTally(tx) => { let tx = tx.as_slice(); - Some(ExplorerTransaction::from( - &context, - &fragment_id, - &tx, - Some(Certificate::VoteTally(tx.payload().into_payload())), - offset, - ¤t_block_txs, - )) + Some( + ExplorerTransaction::from( + &context, + fragment_id.clone(), + &tx, + Some(Certificate::VoteTally(tx.payload().into_payload())), + offset.clone(), + ¤t_block_txs, + ) + .await, + ) } Fragment::OldUtxoDeclaration(decl) => { let outputs = decl @@ -283,9 +314,10 @@ impl ExplorerBlock { if let Some(etx) = metx { current_block_txs.insert(fragment_id, etx); } - current_block_txs - }, - ); + } + + current_block_txs + }; let producer = match block.header.proof() { Proof::GenesisPraos(_proof) => { @@ -350,9 +382,9 @@ impl ExplorerTransaction { // TODO: The signature of this got too long, using a builder may be a good idea // It's called only from one place, though, so it is not that bothersome - pub fn from<'transaction, 'context, T>( - context: &'context ExplorerBlockBuildingContext<'context>, - id: &FragmentId, + pub async fn from<'transaction, T>( + context: &ExplorerBlockBuildingContext<'transaction>, + id: FragmentId, 
tx: &TransactionSlice<'transaction, T>, certificate: Option, offset_in_block: u32, @@ -369,61 +401,78 @@ impl ExplorerTransaction { }) .collect(); - let new_inputs = inputs - .map(|i| i.to_enum()) - .zip(witnesses) - .filter_map(|input_with_witness| match input_with_witness { - (InputEnum::AccountInput(id, value), Witness::Account(_)) => { - let kind = chain_addr::Kind::Account( - id.to_single_account() - .expect("the input to be validated") - .into(), - ); - let address = ExplorerAddress::New(Address(context.discrimination, kind)); - Some(ExplorerInput { address, value }) - } - (InputEnum::AccountInput(id, value), Witness::Multisig(_)) => { - let kind = chain_addr::Kind::Multisig( - id.to_multi_account() - .as_ref() - .try_into() - .expect("multisig identifier size doesn't match address kind"), - ); - let address = ExplorerAddress::New(Address(context.discrimination, kind)); - Some(ExplorerInput { address, value }) - } - (InputEnum::UtxoInput(utxo_pointer), _witness) => { - let tx = utxo_pointer.transaction_id; - let index = utxo_pointer.output_index; - - let output = context - .prev_transactions - .lookup(&tx) - .and_then(|block_id| { - context - .prev_blocks - .lookup(&block_id) - .map(|block| &block.transactions[&tx].outputs[index as usize]) - }) - .or_else(|| { - transactions_in_current_block - .get(&tx) - .map(|fragment| &fragment.outputs[index as usize]) - }) + let new_inputs = stream::iter(inputs.map(|i| i.to_enum()).zip(witnesses)) + .filter_map(|input_with_witness| async { + match input_with_witness { + (InputEnum::AccountInput(id, value), Witness::Account(_)) => { + let kind = chain_addr::Kind::Account( + id.to_single_account() + .expect("the input to be validated") + .into(), + ); + let address = ExplorerAddress::New(Address(context.discrimination, kind)); + Some(ExplorerInput { address, value }) + } + (InputEnum::AccountInput(id, value), Witness::Multisig(_)) => { + let kind = chain_addr::Kind::Multisig( + id.to_multi_account() + .as_ref() + 
.try_into() + .expect("multisig identifier size doesn't match address kind"), + ); + let address = ExplorerAddress::New(Address(context.discrimination, kind)); + Some(ExplorerInput { address, value }) + } + (InputEnum::UtxoInput(utxo_pointer), _witness) => { + let tx = utxo_pointer.transaction_id; + let index = utxo_pointer.output_index; + + trace!("found utxo inupt, processing, {}", &tx); + + let output = match context + .prev_transactions + .lookup(&tx) + .and_then(|block_id| { + context + .prev_blocks + .lookup(&block_id) + .map(|block| &block.transactions[&tx].outputs[index as usize]) + }) + .or_else(|| { + transactions_in_current_block + .get(&tx) + .map(|fragment| &fragment.outputs[index as usize]) + }) { + Some(x) => Some(x.clone()), + None => { + // TODO: maybe this function should return an error + // if any of this things panics, the error is + // most likely a programmer error, at least now + // that the stable storage is in memory, but in + // the future this could be caused by a compromised database + let storage = context.stable_storage.read().await; + let block_id = storage.transaction_to_block(&tx).unwrap(); + let block = storage.get_block(block_id).unwrap(); + let tx = block.transactions.get(&tx).unwrap(); + + Some(tx.outputs[index as usize].clone()) + } + } .expect("transaction not found for utxo input"); - Some(ExplorerInput { - address: output.address.clone(), - value: output.value, - }) + Some(ExplorerInput { + address: output.address.clone(), + value: output.value, + }) + } + _ => None, } - _ => None, }) .collect(); ExplorerTransaction { - id: *id, - inputs: new_inputs, + id, + inputs: new_inputs.await, outputs: new_outputs, certificate, offset_in_block, diff --git a/jormungandr/src/explorer/mod.rs b/jormungandr/src/explorer/mod.rs index 5504b5f9ce..7b9b016a02 100644 --- a/jormungandr/src/explorer/mod.rs +++ b/jormungandr/src/explorer/mod.rs @@ -3,6 +3,7 @@ pub mod graphql; mod indexing; mod multiverse; mod persistent_sequence; +mod 
stable_storage; use self::error::{ExplorerError as Error, Result}; use self::graphql::EContext; @@ -12,7 +13,7 @@ use self::indexing::{ StakePoolData, Transactions, VotePlans, }; use self::persistent_sequence::PersistentSequence; -use tracing::{span, Level}; +use tracing::{debug, span, Level}; use tracing_futures::Instrument; use crate::blockcfg::{ @@ -30,11 +31,10 @@ use chain_impl_mockchain::certificate::{Certificate, PoolId, VotePlanId}; use chain_impl_mockchain::fee::LinearFee; use futures::prelude::*; use multiverse::Multiverse; +use stable_storage::StableIndexShared; +use std::collections::VecDeque; use std::convert::Infallible; -use std::sync::{ - atomic::{AtomicU32, Ordering}, - Arc, -}; +use std::sync::Arc; use tokio::sync::{broadcast, Mutex, RwLock}; #[derive(Clone)] @@ -58,13 +58,8 @@ pub struct ExplorerDb { pub blockchain_config: BlockchainConfig, blockchain: Blockchain, blockchain_tip: blockchain::Tip, - stable_store: StableIndex, tip_broadcast: tokio::sync::broadcast::Sender<(HeaderHash, multiverse::Ref)>, -} - -#[derive(Clone)] -pub struct StableIndex { - confirmed_block_chain_length: Arc, + stable_storage: StableIndexShared, } #[derive(Clone)] @@ -133,7 +128,7 @@ impl Explorer { let mut guard = tip_candidate.lock().await; if guard.map(|hash| hash == block.header.id()).unwrap_or(false) { let hash = guard.take().unwrap(); - explorer_db.set_tip(hash).await; + explorer_db.set_tip(hash).await?; } Ok(()) @@ -149,7 +144,7 @@ impl Explorer { info.spawn_fallible::<_, Error>( "apply tip to explorer", async move { - let successful = explorer_db.set_tip(hash).await; + let successful = explorer_db.set_tip(hash).await?; if !successful { let mut guard = tip_candidate.lock().await; @@ -194,14 +189,18 @@ impl ExplorerDb { .expect("the Initial fragment to be present in the genesis block"), ); + let stable_storage = StableIndexShared::default(); + let block = ExplorerBlock::resolve_from( &block0, indexing::ExplorerBlockBuildingContext { discrimination: 
blockchain_config.discrimination, prev_transactions: &Transactions::new(), prev_blocks: &Blocks::new(), + stable_storage: stable_storage.clone(), }, - ); + ) + .await; let blocks = apply_block_to_blocks(Blocks::new(), &block)?; let epochs = apply_block_to_epochs(Epochs::new(), &block); @@ -246,16 +245,19 @@ impl ExplorerDb { blockchain_config, blockchain: blockchain.clone(), blockchain_tip, - stable_store: StableIndex { - confirmed_block_chain_length: Arc::new(AtomicU32::default()), - }, + stable_storage, tip_broadcast: tx, }; let db = stream .map_err(Error::from) .try_fold(bootstraped_db, |db, block| async move { + let block_id = block.id(); db.apply_block(block).await?; + // TODO: this only works because the StableIndex is in memory + // otherwise, this would try to apply blocks there... + // there are multiple solutions, but can change later + db.set_tip(block_id).await?; Ok(db) }) .await?; @@ -287,6 +289,13 @@ impl ExplorerDb { /// This doesn't perform any validation on the given block and the previous state, it /// is assumed that the Block is valid async fn apply_block(&self, block: Block) -> Result { + debug!( + id=%block.id(), + chain_length=%block.chain_length(), + parent=%block.header.block_parent_hash(), + "applying block to explorer's in-memory storage", + ); + let previous_block = block.header.block_parent_hash(); let chain_length = block.header.chain_length(); let block_id = block.header.hash(); @@ -314,22 +323,38 @@ impl ExplorerDb { discrimination, prev_transactions: &transactions, prev_blocks: &blocks, + stable_storage: self.stable_storage.clone(), }, - ); + ) + .await; + let (stake_pool_data, stake_pool_blocks) = apply_block_to_stake_pools(stake_pool_data, stake_pool_blocks, &explorer_block); - let state_ref = multiverse - .insert( - chain_length, - block.parent_id(), - block_id, - State { - transactions: apply_block_to_transactions(transactions, &explorer_block)?, - blocks: apply_block_to_blocks(blocks, &explorer_block)?, - addresses: 
apply_block_to_addresses(addresses, &explorer_block), - epochs: apply_block_to_epochs(epochs, &explorer_block), - chain_lengths: apply_block_to_chain_lengths(chain_lengths, &explorer_block)?, + let mut blocks = apply_block_to_blocks(blocks, &explorer_block)?; + let mut addresses = apply_block_to_addresses(addresses, &explorer_block); + let mut transactions = apply_block_to_transactions(transactions, &explorer_block)?; + let mut chain_lengths = apply_block_to_chain_lengths(chain_lengths, &explorer_block)?; + let mut epochs = apply_block_to_epochs(epochs, &explorer_block); + + let process_state = + |blocks_to_invert: Option>>| -> Result { + for block_to_invert in blocks_to_invert.iter().flatten() { + blocks = unapply_block_to_blocks(blocks, block_to_invert.as_ref())?; + addresses = unapply_block_to_addresses(addresses, block_to_invert.as_ref()); + transactions = + unapply_block_to_transactions(transactions, block_to_invert.as_ref())?; + chain_lengths = + unapply_block_to_chain_lengths(chain_lengths, block_to_invert.as_ref())?; + epochs = unapply_block_to_epochs(epochs, block_to_invert.as_ref()); + } + + Ok(State { + transactions, + blocks, + addresses, + epochs, + chain_lengths, stake_pool_data, stake_pool_blocks, vote_plans: apply_block_to_vote_plans( @@ -337,52 +362,65 @@ impl ExplorerDb { &self.blockchain_tip, &explorer_block, ), - }, - ) + }) + }; + + let state_ref = multiverse + .insert(chain_length, block.parent_id(), block_id, process_state) .await; - Ok(state_ref) + state_ref } pub async fn get_block(&self, block_id: &HeaderHash) -> Option> { - for (_hash, state_ref) in self.multiverse.tips().await.iter() { + for (_, _hash, state_ref) in self.multiverse.tips().await.iter() { if let Some(b) = state_ref.state().blocks.lookup(&block_id) { return Some(Arc::clone(b)); } } - None + self.stable_storage + .read() + .await + .get_block(block_id) + .map(|block_ref| Arc::new(block_ref.clone())) } - pub(self) async fn set_tip(&self, hash: HeaderHash) -> bool { + 
pub(self) async fn set_tip(&self, hash: HeaderHash) -> Result { // the tip changes which means now a block is confirmed (at least after // the initial epoch_stability_depth blocks). - let state_ref = if let Some(state_ref) = self.multiverse.get_ref(&hash).await { state_ref } else { - return false; + return Ok(false); }; - let state = state_ref.state(); - let block = Arc::clone(state.blocks.lookup(&hash).unwrap()); + let block = { + let state = state_ref.state(); + Arc::clone(state.blocks.lookup(&hash).unwrap()) + }; if let Some(confirmed_block_chain_length) = block .chain_length() .nth_ancestor(self.blockchain_config.epoch_stability_depth) { - debug_assert!( - ChainLength::from( - self.stable_store - .confirmed_block_chain_length - .load(Ordering::Acquire) - ) <= block.chain_length() - ); - - self.stable_store - .confirmed_block_chain_length - .store(confirmed_block_chain_length.into(), Ordering::Release); + let hash = state_ref + .state() + .chain_lengths + .lookup(&confirmed_block_chain_length) + .unwrap(); + let stable_block = Arc::clone(state_ref.state().blocks.lookup(&hash).unwrap()); + + self.stable_storage + .write() + .await + .apply_block((*stable_block).clone())?; + + self.multiverse.confirm_block(stable_block).await; + + // TODO: actually, maybe running gc with every tip change is not ideal? + // maybe it's better to run it every X time or after N blocks self.multiverse .gc(self.blockchain_config.epoch_stability_depth) .await; @@ -390,73 +428,113 @@ impl ExplorerDb { let mut guard = self.longest_chain_tip.0.write().await; + debug!("setting explorer tip to: {}", hash); + *guard = hash; let _ = self.tip_broadcast.send((hash, state_ref)); - true + Ok(true) } pub(self) async fn get_block_with_branches( &self, block_id: &HeaderHash, - ) -> Option<(Arc, Vec<(HeaderHash, multiverse::Ref)>)> { + ) -> Option<(Arc, Vec<(HeaderHash, BranchQuery)>)> { let mut block = None; let mut tips = Vec::new(); - for (hash, state_ref) in self.multiverse.tips().await.drain(..) 
{ + for (last_block, hash, state_ref) in self.multiverse.tips().await.drain(..) { if let Some(b) = state_ref.state().blocks.lookup(&block_id) { block = block.or_else(|| Some(Arc::clone(b))); - tips.push((hash, state_ref)); + tips.push(( + hash, + BranchQuery { + state_ref, + stable_storage: self.stable_storage.clone(), + last_block, + }, + )); } } - block.map(|b| (b, tips)) + if block.is_some() { + block.map(|b| (b, tips)) + } else { + if let Some(block) = self.stable_storage.read().await.get_block(block_id) { + // a confirmed block is technically in all branches + // TODO: maybe it's better to have an enum for the result here + Some(( + Arc::new(block.clone()), + self.multiverse + .tips() + .await + .drain(..) + .map(|(last_block, hash, state_ref)| { + ( + hash, + BranchQuery { + state_ref, + stable_storage: self.stable_storage.clone(), + last_block, + }, + ) + }) + .collect(), + )) + } else { + None + } + } } pub async fn get_epoch(&self, epoch: Epoch) -> Option { let tips = self.multiverse.tips().await; - let (_, state_ref) = &tips[0]; + let (_, _, state_ref) = &tips[0]; - state_ref + let from_multiverse = state_ref .state() .epochs .lookup(&epoch) - .map(|e| e.as_ref().clone()) - } + .map(|e| e.as_ref().clone()); - pub async fn is_block_confirmed(&self, block_id: &HeaderHash) -> bool { - let current_branch = self - .multiverse - .get_ref(&self.longest_chain_tip.get_block_id().await) - .await - .unwrap(); - - if let Some(block) = current_branch.state().blocks.lookup(&block_id) { - let confirmed_block_chain_length: ChainLength = self - .stable_store - .confirmed_block_chain_length - .load(Ordering::Acquire) - .into(); - block.chain_length <= confirmed_block_chain_length + if from_multiverse.is_some() { + from_multiverse } else { - false + self.stable_storage + .read() + .await + .get_epoch_data(&epoch) + .map(|data| data.clone()) } } + pub async fn is_block_confirmed(&self, block_id: &HeaderHash) -> bool { + 
self.stable_storage.read().await.get_block(block_id).is_some() + } + pub async fn find_blocks_by_chain_length(&self, chain_length: ChainLength) -> Vec { let mut hashes = Vec::new(); - for (_hash, state_ref) in self.multiverse.tips().await.iter() { + for (_, _hash, state_ref) in self.multiverse.tips().await.iter() { if let Some(hash) = state_ref.state().chain_lengths.lookup(&chain_length) { hashes.push(**hash); } } - hashes.sort_unstable(); - hashes.dedup(); + if hashes.is_empty() { + self.stable_storage + .read() + .await + .get_block_by_chain_length(&chain_length) + .map(|hash| vec![*hash]) + .unwrap_or_default() + } else { + hashes.sort_unstable(); + hashes.dedup(); - hashes + hashes + } } pub async fn find_blocks_by_transaction(&self, transaction_id: &FragmentId) -> Vec { @@ -465,7 +543,7 @@ impl ExplorerDb { .tips() .await .iter() - .filter_map(|(_tip_hash, state_ref)| { + .filter_map(|(_, _tip_hash, state_ref)| { state_ref .state() .transactions @@ -474,10 +552,19 @@ impl ExplorerDb { }) .collect(); - txs.sort_unstable(); - txs.dedup(); + if txs.is_empty() { + self.stable_storage + .read() + .await + .transaction_to_block(transaction_id) + .map(|id| vec![*id]) + .unwrap_or_default() + } else { + txs.sort_unstable(); + txs.dedup(); - txs + txs + } } pub async fn get_stake_pool_blocks( @@ -498,7 +585,7 @@ impl ExplorerDb { .tips() .await .iter() - .filter_map(|(_hash, state_ref)| state_ref.state().stake_pool_blocks.lookup(&pool)) + .filter_map(|(_, _hash, state_ref)| state_ref.state().stake_pool_blocks.lookup(&pool)) .max_by_key(|seq| seq.len()) .map(Arc::clone) } @@ -506,7 +593,7 @@ impl ExplorerDb { pub async fn get_stake_pool_data(&self, pool: &PoolId) -> Option> { let pool = pool.clone(); - for (_hash, state_ref) in self.multiverse.tips().await.iter() { + for (_, _hash, state_ref) in self.multiverse.tips().await.iter() { if let Some(b) = state_ref.state().stake_pool_data.lookup(&pool) { return Some(Arc::clone(b)); } @@ -519,7 +606,7 @@ impl ExplorerDb { 
&self, vote_plan_id: &VotePlanId, ) -> Option> { - for (_hash, state_ref) in self.multiverse.tips().await.iter() { + for (_, _hash, state_ref) in self.multiverse.tips().await.iter() { if let Some(b) = state_ref.state().vote_plans.lookup(&vote_plan_id) { return Some(Arc::clone(b)); } @@ -528,17 +615,47 @@ impl ExplorerDb { None } - pub(self) async fn get_branch(&self, hash: &HeaderHash) -> Option { - self.multiverse.get_ref(hash).await + pub(self) async fn get_branch(&self, hash: &HeaderHash) -> Option { + let state_ref = self.multiverse.get_ref(hash).await?; + let last_block = state_ref.state().blocks.lookup(hash).unwrap().chain_length; + + Some(BranchQuery { + state_ref, + stable_storage: self.stable_storage.clone(), + last_block, + }) } - pub(self) async fn get_tip(&self) -> (HeaderHash, multiverse::Ref) { + pub(self) async fn get_tip(&self) -> (HeaderHash, BranchQuery) { let hash = self.longest_chain_tip.get_block_id().await; - (hash, self.multiverse.get_ref(&hash).await.unwrap()) + let state_ref = self.multiverse.get_ref(&hash).await.unwrap(); + let last_block = state_ref.state().blocks.lookup(&hash).unwrap().chain_length; + ( + hash, + BranchQuery { + state_ref: state_ref, + stable_storage: self.stable_storage.clone(), + last_block, + }, + ) } - pub(self) async fn get_branches(&self) -> Vec<(HeaderHash, multiverse::Ref)> { - self.multiverse.tips().await + pub(self) async fn get_branches(&self) -> Vec<(HeaderHash, BranchQuery)> { + self.multiverse + .tips() + .await + .iter() + .map(|(last_block, hash, state_ref)| { + ( + *hash, + BranchQuery { + state_ref: state_ref.clone(), + stable_storage: self.stable_storage.clone(), + last_block: *last_block, + }, + ) + }) + .collect() } fn blockchain(&self) -> &Blockchain { @@ -549,11 +666,26 @@ impl ExplorerDb { &self, ) -> impl Stream< Item = std::result::Result< - (HeaderHash, multiverse::Ref), + (HeaderHash, BranchQuery), tokio_stream::wrappers::errors::BroadcastStreamRecvError, >, > { - 
tokio_stream::wrappers::BroadcastStream::new(self.tip_broadcast.subscribe()) + let stable_store = self.stable_storage.clone(); + tokio_stream::wrappers::BroadcastStream::new(self.tip_broadcast.subscribe()).map( + move |item| { + item.map(|(hash, state_ref)| { + let last_block = state_ref.state().blocks.lookup(&hash).unwrap().chain_length; + ( + hash, + BranchQuery { + state_ref, + stable_storage: stable_store.clone(), + last_block, + }, + ) + }) + }, + ) } } @@ -573,6 +705,21 @@ fn apply_block_to_transactions( Ok(transactions) } +fn unapply_block_to_transactions( + mut transactions: Transactions, + block: &ExplorerBlock, +) -> Result { + let ids = block.transactions.values().map(|tx| tx.id()); + + for id in ids { + transactions = transactions + .remove(&id) + .map_err(|_| Error::TransactionNotFound(id))?; + } + + Ok(transactions) +} + fn apply_block_to_blocks(blocks: Blocks, block: &ExplorerBlock) -> Result { let block_id = block.id(); blocks @@ -580,6 +727,13 @@ fn apply_block_to_blocks(blocks: Blocks, block: &ExplorerBlock) -> Result Result { + let block_id = block.id(); + blocks + .remove(&block_id) + .map_err(|_| Error::BlockNotFound(block_id)) +} + fn apply_block_to_addresses(mut addresses: Addresses, block: &ExplorerBlock) -> Addresses { let transactions = block.transactions.values(); @@ -610,6 +764,36 @@ fn apply_block_to_addresses(mut addresses: Addresses, block: &ExplorerBlock) -> addresses } +fn unapply_block_to_addresses(mut addresses: Addresses, block: &ExplorerBlock) -> Addresses { + let transactions = block.transactions.values(); + + for tx in transactions { + let id = tx.id(); + + // A Hashset is used for preventing duplicates when the address is both an + // input and an output in the given transaction + + let included_addresses: std::collections::HashSet = tx + .outputs() + .iter() + .map(|output| output.address.clone()) + .chain(tx.inputs().iter().map(|input| input.address.clone())) + .collect(); + + for address in included_addresses { + addresses 
= addresses + .update::<_, Infallible>(&address, |set| { + Ok(set.remove_first().map(|(seq, removed)| { + assert_eq!(*removed, id); + Arc::new(seq) + })) + }) + .unwrap() + } + } + addresses +} + fn apply_block_to_epochs(epochs: Epochs, block: &ExplorerBlock) -> Epochs { let epoch_id = block.date().epoch; let block_id = block.id(); @@ -631,6 +815,19 @@ fn apply_block_to_epochs(epochs: Epochs, block: &ExplorerBlock) -> Epochs { ) } +fn unapply_block_to_epochs(epochs: Epochs, block: &ExplorerBlock) -> Epochs { + let epoch_id = block.date().epoch; + let block_id = block.id(); + + let epoch_data = epochs.lookup(&epoch_id).unwrap(); + + if epoch_data.last_block == block_id { + epochs.remove(&epoch_id).unwrap() + } else { + epochs + } +} + fn apply_block_to_chain_lengths( chain_lengths: ChainLengths, block: &ExplorerBlock, @@ -645,6 +842,16 @@ fn apply_block_to_chain_lengths( }) } +fn unapply_block_to_chain_lengths( + chain_lengths: ChainLengths, + block: &ExplorerBlock, +) -> Result { + let new_block_chain_length = block.chain_length(); + chain_lengths + .remove(&new_block_chain_length) + .map_err(|_| Error::BlockNotFound(block.id())) +} + fn apply_block_to_stake_pools( data: StakePool, blocks: StakePoolBlocks, @@ -902,16 +1109,51 @@ impl Tip { } } -impl State { +/// wrapper used to contextualize queries within a particular branch +/// this tries to search first in memory (the state_ref), in case that fails, it +/// tries with the stable index (but it is not a cache, because the two datasets +/// are disjoint) +#[derive(Clone)] +pub struct BranchQuery { + state_ref: multiverse::Ref, + stable_storage: StableIndexShared, + // TODO: this could be embedded/cached in the state, it's a + // performance/memory tradeoff, analyze later + last_block: ChainLength, +} + +impl BranchQuery { + pub async fn get_block(&self, block_id: &HeaderHash) -> Option> { + self.state_ref + .state() + .blocks + .lookup(&block_id) + .cloned() + .or(self + .stable_storage + .read() + .await + 
.get_block(block_id) + .map(|block| Arc::new(block.clone()))) + } + + pub fn last_block(&self) -> ChainLength { + self.last_block + } + pub fn get_vote_plans(&self) -> Vec<(VotePlanId, Arc)> { - self.vote_plans + self.state_ref + .state() + .vote_plans .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect() } pub fn get_stake_pools(&self) -> Vec<(PoolId, Arc)> { - self.stake_pool_data + self.state_ref + .state() + .stake_pool_data .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect() @@ -921,28 +1163,45 @@ impl State { &self, address: &ExplorerAddress, ) -> Option> { - self.addresses + self.state_ref + .state() + .addresses .lookup(address) .map(|txs| PersistentSequence::clone(txs)) } - // Get the hashes of all blocks in the range [from, to) - // the ChainLength is returned to for easy of use in the case where - // `to` is greater than the max - pub fn get_block_hash_range( + /// Get the hashes of all blocks in the range [from, to) + /// the ChainLength is returned too, for ease of use in the case where + /// `to` is greater than the max + pub async fn get_block_hash_range( &self, from: ChainLength, to: ChainLength, ) -> Vec<(HeaderHash, ChainLength)> { - let from = u32::from(from); - let to = u32::from(to); + let a = u32::from(from); + let b = u32::from(to); - (from..to) + let mut unstable: Vec<_> = (a..b) .filter_map(|i| { - self.chain_lengths + self.state_ref + .state() + .chain_lengths .lookup(&i.into()) .map(|b| (*b.as_ref(), i.into())) }) - .collect() + .collect(); + + let stable_upper_bound = unstable.get(0).map(|(_, l)| *l).unwrap_or(to); + let missing_in_unstable = stable_upper_bound != from; + + if missing_in_unstable { + let stable_store = self.stable_storage.read().await; + + let blocks = stable_store.get_block_hash_range(from, stable_upper_bound); + + blocks.chain(unstable.drain(..)).collect() + } else { + unstable + } } } From 8fc05382d0e21f894a76e8372691e19034bc6c19 Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini Date: Fri, 9 Apr 2021 19:17:05
-0300 Subject: [PATCH 4/7] explorer: add ChainLength to State --- jormungandr/src/explorer/mod.rs | 97 ++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 45 deletions(-) diff --git a/jormungandr/src/explorer/mod.rs b/jormungandr/src/explorer/mod.rs index 7b9b016a02..87adc8e209 100644 --- a/jormungandr/src/explorer/mod.rs +++ b/jormungandr/src/explorer/mod.rs @@ -32,7 +32,6 @@ use chain_impl_mockchain::fee::LinearFee; use futures::prelude::*; use multiverse::Multiverse; use stable_storage::StableIndexShared; -use std::collections::VecDeque; use std::convert::Infallible; use std::sync::Arc; use tokio::sync::{broadcast, Mutex, RwLock}; @@ -85,6 +84,7 @@ pub(self) struct State { stake_pool_data: StakePool, stake_pool_blocks: StakePoolBlocks, vote_plans: VotePlans, + chain_length: ChainLength, } #[derive(Clone)] @@ -220,6 +220,7 @@ impl ExplorerDb { stake_pool_data, stake_pool_blocks, vote_plans, + chain_length: block0.chain_length(), }; let block0_id = block0.id(); @@ -306,6 +307,7 @@ impl ExplorerDb { .get_ref(&previous_block) .await .ok_or_else(|| Error::AncestorNotFound(block.id()))?; + let State { transactions, blocks, @@ -315,6 +317,7 @@ impl ExplorerDb { stake_pool_data, stake_pool_blocks, vote_plans, + .. 
} = previous_state.state().clone(); let explorer_block = ExplorerBlock::resolve_from( @@ -337,19 +340,35 @@ impl ExplorerDb { let mut chain_lengths = apply_block_to_chain_lengths(chain_lengths, &explorer_block)?; let mut epochs = apply_block_to_epochs(epochs, &explorer_block); - let process_state = - |blocks_to_invert: Option>>| -> Result { - for block_to_invert in blocks_to_invert.iter().flatten() { - blocks = unapply_block_to_blocks(blocks, block_to_invert.as_ref())?; - addresses = unapply_block_to_addresses(addresses, block_to_invert.as_ref()); - transactions = - unapply_block_to_transactions(transactions, block_to_invert.as_ref())?; - chain_lengths = - unapply_block_to_chain_lengths(chain_lengths, block_to_invert.as_ref())?; - epochs = unapply_block_to_epochs(epochs, block_to_invert.as_ref()); - } + if let Some(confirmed_block_chain_length) = block + .chain_length() + .nth_ancestor(self.blockchain_config.epoch_stability_depth) + { + let block_to_undo = Arc::clone( + chain_lengths + .lookup(&confirmed_block_chain_length) + .and_then(|hash| blocks.lookup(&hash)) + .unwrap(), + ); + + blocks = unapply_block_to_blocks(blocks, block_to_undo.as_ref())?; + addresses = unapply_block_to_addresses(addresses, block_to_undo.as_ref()); + transactions = unapply_block_to_transactions(transactions, block_to_undo.as_ref())?; + chain_lengths = unapply_block_to_chain_lengths(chain_lengths, block_to_undo.as_ref())?; + epochs = unapply_block_to_epochs(epochs, block_to_undo.as_ref()); + + // IN THEORY we need to be sure here that the block that we undid is + // indexed in the stable index. 
Otherwise, a query may miss + // something if it comes in the middle and uses the last state + // IN PRACTICE it's really unlikely with the current implementation + }; - Ok(State { + let state_ref = multiverse + .insert( + chain_length, + block.parent_id(), + block_id, + State { transactions, blocks, addresses, @@ -362,18 +381,16 @@ impl ExplorerDb { &self.blockchain_tip, &explorer_block, ), - }) - }; - - let state_ref = multiverse - .insert(chain_length, block.parent_id(), block_id, process_state) + chain_length, + }, + ) .await; - state_ref + Ok(state_ref) } pub async fn get_block(&self, block_id: &HeaderHash) -> Option> { - for (_, _hash, state_ref) in self.multiverse.tips().await.iter() { + for (_hash, state_ref) in self.multiverse.tips().await.iter() { if let Some(b) = state_ref.state().blocks.lookup(&block_id) { return Some(Arc::clone(b)); } @@ -417,8 +434,6 @@ impl ExplorerDb { .await .apply_block((*stable_block).clone())?; - self.multiverse.confirm_block(stable_block).await; - // TODO: actually, maybe running gc with every tip change is not ideal? // maybe it's better to run it every X time or after N blocks self.multiverse @@ -444,7 +459,7 @@ impl ExplorerDb { let mut block = None; let mut tips = Vec::new(); - for (last_block, hash, state_ref) in self.multiverse.tips().await.drain(..) { + for (hash, state_ref) in self.multiverse.tips().await.drain(..) { if let Some(b) = state_ref.state().blocks.lookup(&block_id) { block = block.or_else(|| Some(Arc::clone(b))); tips.push(( @@ -452,7 +467,6 @@ impl ExplorerDb { BranchQuery { state_ref, stable_storage: self.stable_storage.clone(), - last_block, }, )); } @@ -470,13 +484,12 @@ impl ExplorerDb { .tips() .await .drain(..) 
- .map(|(last_block, hash, state_ref)| { + .map(|(hash, state_ref)| { ( hash, BranchQuery { state_ref, stable_storage: self.stable_storage.clone(), - last_block, }, ) }) @@ -490,7 +503,7 @@ impl ExplorerDb { pub async fn get_epoch(&self, epoch: Epoch) -> Option { let tips = self.multiverse.tips().await; - let (_, _, state_ref) = &tips[0]; + let (_, state_ref) = &tips[0]; let from_multiverse = state_ref .state() @@ -510,13 +523,17 @@ impl ExplorerDb { } pub async fn is_block_confirmed(&self, block_id: &HeaderHash) -> bool { - self.stable_storage.read().await.get_block(block_id).is_some() + self.stable_storage + .read() + .await + .get_block(block_id) + .is_some() } pub async fn find_blocks_by_chain_length(&self, chain_length: ChainLength) -> Vec { let mut hashes = Vec::new(); - for (_, _hash, state_ref) in self.multiverse.tips().await.iter() { + for (_hash, state_ref) in self.multiverse.tips().await.iter() { if let Some(hash) = state_ref.state().chain_lengths.lookup(&chain_length) { hashes.push(**hash); } @@ -543,7 +560,7 @@ impl ExplorerDb { .tips() .await .iter() - .filter_map(|(_, _tip_hash, state_ref)| { + .filter_map(|(_tip_hash, state_ref)| { state_ref .state() .transactions @@ -585,7 +602,7 @@ impl ExplorerDb { .tips() .await .iter() - .filter_map(|(_, _hash, state_ref)| state_ref.state().stake_pool_blocks.lookup(&pool)) + .filter_map(|(_hash, state_ref)| state_ref.state().stake_pool_blocks.lookup(&pool)) .max_by_key(|seq| seq.len()) .map(Arc::clone) } @@ -593,7 +610,7 @@ impl ExplorerDb { pub async fn get_stake_pool_data(&self, pool: &PoolId) -> Option> { let pool = pool.clone(); - for (_, _hash, state_ref) in self.multiverse.tips().await.iter() { + for (_hash, state_ref) in self.multiverse.tips().await.iter() { if let Some(b) = state_ref.state().stake_pool_data.lookup(&pool) { return Some(Arc::clone(b)); } @@ -606,7 +623,7 @@ impl ExplorerDb { &self, vote_plan_id: &VotePlanId, ) -> Option> { - for (_, _hash, state_ref) in self.multiverse.tips().await.iter() 
{ + for (_hash, state_ref) in self.multiverse.tips().await.iter() { if let Some(b) = state_ref.state().vote_plans.lookup(&vote_plan_id) { return Some(Arc::clone(b)); } @@ -617,25 +634,21 @@ impl ExplorerDb { pub(self) async fn get_branch(&self, hash: &HeaderHash) -> Option { let state_ref = self.multiverse.get_ref(hash).await?; - let last_block = state_ref.state().blocks.lookup(hash).unwrap().chain_length; Some(BranchQuery { state_ref, stable_storage: self.stable_storage.clone(), - last_block, }) } pub(self) async fn get_tip(&self) -> (HeaderHash, BranchQuery) { let hash = self.longest_chain_tip.get_block_id().await; let state_ref = self.multiverse.get_ref(&hash).await.unwrap(); - let last_block = state_ref.state().blocks.lookup(&hash).unwrap().chain_length; ( hash, BranchQuery { state_ref: state_ref, stable_storage: self.stable_storage.clone(), - last_block, }, ) } @@ -645,13 +658,12 @@ impl ExplorerDb { .tips() .await .iter() - .map(|(last_block, hash, state_ref)| { + .map(|(hash, state_ref)| { ( *hash, BranchQuery { state_ref: state_ref.clone(), stable_storage: self.stable_storage.clone(), - last_block: *last_block, }, ) }) @@ -674,13 +686,11 @@ impl ExplorerDb { tokio_stream::wrappers::BroadcastStream::new(self.tip_broadcast.subscribe()).map( move |item| { item.map(|(hash, state_ref)| { - let last_block = state_ref.state().blocks.lookup(&hash).unwrap().chain_length; ( hash, BranchQuery { state_ref, stable_storage: stable_store.clone(), - last_block, }, ) }) @@ -1117,9 +1127,6 @@ impl Tip { pub struct BranchQuery { state_ref: multiverse::Ref, stable_storage: StableIndexShared, - // TODO: this could be embedded/cached in the state, it's a - // performance/memory tradeoff, analyze later - last_block: ChainLength, } impl BranchQuery { @@ -1138,7 +1145,7 @@ impl BranchQuery { } pub fn last_block(&self) -> ChainLength { - self.last_block + self.state_ref.state().chain_length } pub fn get_vote_plans(&self) -> Vec<(VotePlanId, Arc)> { From 
044cfd08b5ccdc35e61dcbcd7a0b19ee0367a7a8 Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini Date: Fri, 9 Apr 2021 20:11:45 -0300 Subject: [PATCH 5/7] move block to stable in apply_block doing it in set_tip doesn't work, because we do that *after* applying the block, and then we won't have the stable_block in the tip's branch also, prevents some concurrency issues --- jormungandr/src/explorer/mod.rs | 55 ++++++++++++--------------------- 1 file changed, 20 insertions(+), 35 deletions(-) diff --git a/jormungandr/src/explorer/mod.rs b/jormungandr/src/explorer/mod.rs index 87adc8e209..bd0111fecc 100644 --- a/jormungandr/src/explorer/mod.rs +++ b/jormungandr/src/explorer/mod.rs @@ -294,7 +294,7 @@ impl ExplorerDb { id=%block.id(), chain_length=%block.chain_length(), parent=%block.header.block_parent_hash(), - "applying block to explorer's in-memory storage", + "applying block to in-memory storage", ); let previous_block = block.header.block_parent_hash(); @@ -344,6 +344,11 @@ impl ExplorerDb { .chain_length() .nth_ancestor(self.blockchain_config.epoch_stability_depth) { + debug!( + "undoing block with chain_length {}", + confirmed_block_chain_length + ); + let block_to_undo = Arc::clone( chain_lengths .lookup(&confirmed_block_chain_length) @@ -351,16 +356,19 @@ impl ExplorerDb { .unwrap(), ); + // ignore the error because right now it can only fail if the + // block is already there + let _ = self + .stable_storage + .write() + .await + .apply_block((*block_to_undo).clone())?; + blocks = unapply_block_to_blocks(blocks, block_to_undo.as_ref())?; addresses = unapply_block_to_addresses(addresses, block_to_undo.as_ref()); transactions = unapply_block_to_transactions(transactions, block_to_undo.as_ref())?; chain_lengths = unapply_block_to_chain_lengths(chain_lengths, block_to_undo.as_ref())?; epochs = unapply_block_to_epochs(epochs, block_to_undo.as_ref()); - - // IN THEORY we need to be sure here that the block that we undid is - // indexed in the stable index. 
Otherwise, a query may miss - // something if it comes in the middle and uses the last state - // IN PRACTICE it's really unlikely with the current implementation }; let state_ref = multiverse @@ -412,38 +420,15 @@ impl ExplorerDb { return Ok(false); }; - let block = { - let state = state_ref.state(); - Arc::clone(state.blocks.lookup(&hash).unwrap()) - }; - - if let Some(confirmed_block_chain_length) = block - .chain_length() - .nth_ancestor(self.blockchain_config.epoch_stability_depth) - { - let hash = state_ref - .state() - .chain_lengths - .lookup(&confirmed_block_chain_length) - .unwrap(); - - let stable_block = Arc::clone(state_ref.state().blocks.lookup(&hash).unwrap()); - - self.stable_storage - .write() - .await - .apply_block((*stable_block).clone())?; - - // TODO: actually, maybe running gc with every tip change is not ideal? - // maybe it's better to run it every X time or after N blocks - self.multiverse - .gc(self.blockchain_config.epoch_stability_depth) - .await; - } + // TODO: actually, maybe running gc with every tip change is not ideal? 
+ // maybe it's better to run it every X time or after N blocks + self.multiverse + .gc(self.blockchain_config.epoch_stability_depth) + .await; let mut guard = self.longest_chain_tip.0.write().await; - debug!("setting explorer tip to: {}", hash); + debug!("setting tip to: {}", hash); *guard = hash; From aba9a9f2f85ae4d16249a6ea223956c403667f5b Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini Date: Fri, 9 Apr 2021 17:24:29 -0300 Subject: [PATCH 6/7] explorer: add more extensive sanity test mostly to account for the fact that at some point data won't be entirely in memory, so we need to take epoch_stability_depth and utxo inputs into account --- .../src/jormungandr/explorer/mod.rs | 131 +++++++++++++++++- .../src/testing/node/time.rs | 22 +++ .../src/wallet/mod.rs | 21 ++- .../src/wallet/utxo.rs | 26 +++- 4 files changed, 193 insertions(+), 7 deletions(-) diff --git a/testing/jormungandr-integration-tests/src/jormungandr/explorer/mod.rs b/testing/jormungandr-integration-tests/src/jormungandr/explorer/mod.rs index 20c96a9ad7..2abfec62ea 100644 --- a/testing/jormungandr-integration-tests/src/jormungandr/explorer/mod.rs +++ b/testing/jormungandr-integration-tests/src/jormungandr/explorer/mod.rs @@ -1,11 +1,12 @@ use crate::common::{ jcli::JCli, jormungandr::ConfigurationBuilder, startup, transaction_utils::TransactionHash, }; -use chain_impl_mockchain::fragment::FragmentId; use chain_impl_mockchain::key::Hash; +use chain_impl_mockchain::{block::ChainLength, fragment::Fragment}; +use chain_impl_mockchain::{fragment::FragmentId, transaction::UtxoPointer}; use jormungandr_lib::interfaces::ActiveSlotCoefficient; -use jormungandr_testing_utils::stake_pool::StakePool; use jormungandr_testing_utils::testing::node::Explorer; +use jormungandr_testing_utils::{stake_pool::StakePool, testing::node::time}; use jortestkit::process::Wait; use std::str::FromStr; use std::time::Duration; @@ -93,6 +94,104 @@ pub fn explorer_sanity_test() { epoch(&explorer); } +#[test] +pub fn 
old_blocks_are_in_explorer() { + let jcli: JCli = Default::default(); + let mut faucet = startup::create_new_account_address(); + let receiver = startup::create_new_account_address(); + let mut utxo_receiver = startup::create_new_utxo_address(); + + let epoch_stability_depth = 3; + let mut config = ConfigurationBuilder::new(); + config + .with_epoch_stability_depth(epoch_stability_depth) + .with_explorer(); + + let (jormungandr, initial_stake_pools) = + startup::start_stake_pool(&[faucet.clone()], &[], &mut config).unwrap(); + + let transaction = faucet + .transaction_to( + &jormungandr.genesis_block_hash(), + &jormungandr.fees(), + utxo_receiver.address(), + 1_000.into(), + ) + .unwrap(); + + let output_value = match transaction { + Fragment::Transaction(ref tx) => { + let output = tx.as_slice().outputs().iter().next().unwrap(); + + output.value + } + _ => todo!(), + }; + + let encoded_transaction = transaction.encode(); + + let wait = Wait::new(Duration::from_secs(3), 20); + let fragment_id = jcli + .fragment_sender(&jormungandr) + .send(&encoded_transaction) + .assert_in_block_with_wait(&wait); + + println!("explorer: utxo transaction {}", fragment_id); + + let mut explorer = jormungandr.explorer(); + + let last_block = explorer.last_block().unwrap().data.unwrap().tip.block; + let current_tip_chain_length: u32 = last_block.chain_length.parse().unwrap(); + let current_block_id = last_block.id; + + time::wait_n_blocks( + ChainLength::from(current_tip_chain_length), + epoch_stability_depth + 5, + &mut explorer, + ); + + utxo_receiver.add_utxo(UtxoPointer { + transaction_id: transaction.hash(), + output_index: 0, + value: output_value, + }); + + let utxo_tx = utxo_receiver + .transaction_to( + &jormungandr.genesis_block_hash(), + &jormungandr.fees(), + utxo_receiver.address(), + 1000.into(), + ) + .unwrap() + .encode(); + + let _fragment_id = jcli + .fragment_sender(&jormungandr) + .send(&utxo_tx) + .assert_in_block_with_wait(&wait); + + let block = explorer + 
.blocks_at_chain_length(current_tip_chain_length) + .unwrap(); + + let data = block.data.unwrap(); + assert_eq!(data.blocks_by_chain_length.len(), 1, "block not found"); + + assert_eq!( + current_block_id, data.blocks_by_chain_length[0].id, + "unexpected block id was found" + ); + + println!("{}", jormungandr.logger.get_log_content()); + + transaction_by_id(&explorer, fragment_id.into()); + blocks(&explorer, jormungandr.logger.get_created_blocks_hashes()); + stake_pools(&explorer, &initial_stake_pools); + stake_pool(&explorer, &initial_stake_pools); + blocks_in_epoch(&explorer, jormungandr.logger.get_created_blocks_hashes()); +} + fn transaction_by_id(explorer: &Explorer, fragment_id: FragmentId) { let explorer_transaction = explorer .transaction(fragment_id.into()) @@ -179,3 +278,31 @@ fn epoch(explorer: &Explorer) { assert_eq!(epoch.data.unwrap().epoch.id, "1", "can't find epoch"); } + +fn blocks_in_epoch(explorer: &Explorer, blocks_from_logs: Vec) { + let epoch = explorer.epoch(0, 100).unwrap(); + let explorer_blocks = epoch + .data + .unwrap() + .tip + .blocks_by_epoch + .unwrap() + .edges + .unwrap() + .iter() + .skip(1) + .map(|x| Hash::from_str(&x.as_ref().unwrap().node.id).unwrap()) + .collect::>(); + + let mut common_blocks = blocks_from_logs.clone(); + common_blocks.retain(|x| !explorer_blocks.contains(x)); + + // we can have at most one non duplicated block + // due to explorer delay to logs content + assert!( + common_blocks.len() <= 1, + "blocks differents: Explorer {:?} vs Logs {:?}", + explorer_blocks, + blocks_from_logs + ); +} diff --git a/testing/jormungandr-testing-utils/src/testing/node/time.rs b/testing/jormungandr-testing-utils/src/testing/node/time.rs index e64f1fed23..ae6b087959 100644 --- a/testing/jormungandr-testing-utils/src/testing/node/time.rs +++ b/testing/jormungandr-testing-utils/src/testing/node/time.rs @@ -1,4 +1,5 @@ use crate::testing::node::explorer::Explorer; +use chain_impl_mockchain::block::ChainLength; use
jormungandr_lib::interfaces::BlockDate; pub fn wait_for_epoch(epoch_id: u64, mut explorer: Explorer) { @@ -39,3 +40,24 @@ pub fn wait_for_date(target_block_date: BlockDate, mut explorer: Explorer) { std::thread::sleep(std::time::Duration::from_secs(1)); } } + +pub fn wait_n_blocks(start: ChainLength, n: u32, explorer: &Explorer) { + loop { + let current = explorer + .last_block() + .unwrap() + .data + .unwrap() + .tip + .block + .chain_length; + + let current: u32 = current.parse().unwrap(); + + if u32::from(start) + n <= current { + return; + } + + std::thread::sleep(std::time::Duration::from_secs(2)); + } +} diff --git a/testing/jormungandr-testing-utils/src/wallet/mod.rs b/testing/jormungandr-testing-utils/src/wallet/mod.rs index cae14ad998..762ece1fba 100644 --- a/testing/jormungandr-testing-utils/src/wallet/mod.rs +++ b/testing/jormungandr-testing-utils/src/wallet/mod.rs @@ -19,14 +19,14 @@ use chain_impl_mockchain::{ testing::data::{AddressData, AddressDataValue, Wallet as WalletLib}, transaction::{ InputOutputBuilder, Payload, PayloadSlice, TransactionBindingAuthDataPhantom, - TransactionSignDataHash, Witness, + TransactionSignDataHash, UtxoPointer, Witness, }, value::Value as ValueLib, vote::{Choice, CommitteeId}, }; use jormungandr_lib::{ crypto::{account::Identifier as AccountIdentifier, hash::Hash, key::Identifier}, - interfaces::{Address, CommitteeIdDef, Initial, InitialUTxO, Value}, + interfaces::{Address, CommitteeIdDef, Initial, InitialUTxO, UTxOInfo, Value}, }; use chain_addr::Discrimination; @@ -233,7 +233,7 @@ impl Wallet { pub fn add_input_with_value(&self, value: Value) -> Input { match self { Wallet::Account(account) => account.add_input_with_value(value), - Wallet::UTxO(_utxo) => unimplemented!(), + Wallet::UTxO(utxo) => utxo.add_input_with_value(value), Wallet::Delegation(_delegation) => unimplemented!(), } } @@ -430,6 +430,21 @@ impl Wallet { self.address().1.public_key().unwrap().clone(), )) } + + pub fn add_utxo(&mut self, utxo_pointer: 
UtxoPointer) { + match self { + Wallet::UTxO(utxo) => { + let info = UTxOInfo::new( + utxo_pointer.transaction_id.into(), + utxo_pointer.output_index, + utxo.address(), + utxo_pointer.value.into(), + ); + utxo.add_utxo(info); + } + _ => todo!("can not add utxo to non-utxo wallet"), + } + } } impl From for WalletLib { diff --git a/testing/jormungandr-testing-utils/src/wallet/utxo.rs b/testing/jormungandr-testing-utils/src/wallet/utxo.rs index 88c904b26f..873efd13ec 100644 --- a/testing/jormungandr-testing-utils/src/wallet/utxo.rs +++ b/testing/jormungandr-testing-utils/src/wallet/utxo.rs @@ -1,11 +1,11 @@ use chain_addr::Discrimination; -use chain_impl_mockchain::transaction::{TransactionSignDataHash, Witness}; +use chain_impl_mockchain::transaction::{Input, TransactionSignDataHash, UtxoPointer, Witness}; use jormungandr_lib::{ crypto::{ hash::Hash, key::{self, Identifier}, }, - interfaces::{Address, UTxOInfo}, + interfaces::{Address, UTxOInfo, Value}, }; use rand_chacha::ChaChaRng; use rand_core::{CryptoRng, RngCore, SeedableRng}; @@ -94,4 +94,26 @@ impl Wallet { self.last_signing_key().as_ref().sign(d) }) } + + pub fn add_input_with_value(&self, value: Value) -> Input { + if let Some((_, info)) = self + .utxos + .iter() + .find(|(_, info)| info.associated_fund() >= &value) + { + let utxo = UtxoPointer { + transaction_id: info.transaction_id().into_hash(), + output_index: info.index_in_transaction(), + value: value.into(), + }; + + Input::from_utxo(utxo) + } else { + todo!("no utxo found to cover for {}", value); + } + } + + pub fn add_utxo(&mut self, utxo: UTxOInfo) { + self.utxos.push((0, utxo)); + } } From 161a57cf520857d2b4ee9842407aaa459cb21dad Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini Date: Fri, 9 Apr 2021 20:53:01 -0300 Subject: [PATCH 7/7] remove useless debugging trace --- jormungandr/src/explorer/indexing.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/jormungandr/src/explorer/indexing.rs b/jormungandr/src/explorer/indexing.rs index 
2707ead880..abd61cc28f 100644 --- a/jormungandr/src/explorer/indexing.rs +++ b/jormungandr/src/explorer/indexing.rs @@ -18,7 +18,6 @@ use chain_impl_mockchain::vote::{ }; use futures::stream::{self, StreamExt}; use std::{convert::TryInto, sync::Arc}; -use tracing::trace; pub type Hamt = imhamt::Hamt>; @@ -427,8 +426,6 @@ impl ExplorerTransaction { let tx = utxo_pointer.transaction_id; let index = utxo_pointer.output_index; - trace!("found utxo inupt, processing, {}", &tx); - let output = match context .prev_transactions .lookup(&tx)