diff --git a/jormungandr/src/blockchain/multiverse.rs b/jormungandr/src/blockchain/multiverse.rs index d63694a7c3..c829bf61fb 100644 --- a/jormungandr/src/blockchain/multiverse.rs +++ b/jormungandr/src/blockchain/multiverse.rs @@ -1,4 +1,4 @@ -use crate::blockcfg::{ChainLength, HeaderHash, Ledger, Multiverse as MultiverseData}; +use crate::blockcfg::{ChainLength, HeaderHash, Multiverse as MultiverseData}; use chain_impl_mockchain::multiverse; use std::sync::Arc; use tokio::sync::RwLock; @@ -43,12 +43,9 @@ impl Multiverse { } } -impl Multiverse { +impl Multiverse { /// run the garbage collection of the multiverse /// - /// TODO: this function is only working for the `Ledger` at the moment - /// we need to generalize the `chain_impl_mockchain` to handle - /// the garbage collection for any `T` pub async fn gc(&self, depth: u32) { let mut guard = self.inner.write().await; guard.gc(depth) diff --git a/jormungandr/src/explorer/graphql/mod.rs b/jormungandr/src/explorer/graphql/mod.rs index 8cfe2210d3..66fb847ad1 100644 --- a/jormungandr/src/explorer/graphql/mod.rs +++ b/jormungandr/src/explorer/graphql/mod.rs @@ -190,6 +190,10 @@ impl Block { }); Ok(treasury) } + + pub async fn is_confirmed(&self, context: &Context) -> bool { + context.db.is_block_confirmed(&self.hash).await + } } struct BftLeader { @@ -698,6 +702,14 @@ impl Status { latest_block(context).await.map(|b| Block::from(&b)) } + pub async fn epoch_stability_depth(&self, context: &Context) -> String { + context + .db + .blockchain_config + .epoch_stability_depth + .to_string() + } + pub fn fee_settings(&self, context: &Context) -> FeeSettings { let chain_impl_mockchain::fee::LinearFee { constant, diff --git a/jormungandr/src/explorer/mod.rs b/jormungandr/src/explorer/mod.rs index 61fd9d6c29..f2172cc201 100644 --- a/jormungandr/src/explorer/mod.rs +++ b/jormungandr/src/explorer/mod.rs @@ -29,8 +29,11 @@ use chain_impl_mockchain::fee::LinearFee; use chain_impl_mockchain::multiverse; use futures::prelude::*; use std::convert::Infallible; -use std::sync::Arc; -use tokio::sync::RwLock; +use std::sync::{ + atomic::{AtomicU32, Ordering}, + Arc, +}; +use tokio::sync::{Mutex, RwLock}; #[derive(Clone)] pub struct Explorer { @@ -53,6 +56,12 @@ pub struct ExplorerDB { pub blockchain_config: BlockchainConfig, blockchain: Blockchain, blockchain_tip: blockchain::Tip, + stable_store: StableIndex, +} + +#[derive(Clone)] +pub struct StableIndex { + confirmed_block_chain_length: Arc, } #[derive(Clone)] @@ -62,6 +71,7 @@ pub struct BlockchainConfig { discrimination: Discrimination, consensus_version: ConsensusVersion, fees: LinearFee, + epoch_stability_depth: u32, } /// Inmutable data structure used to represent the explorer's state at a given Block @@ -104,35 +114,42 @@ impl Explorer { } } - pub async fn start(&mut self, info: TokioServiceInfo, messages: MessageQueue) { + pub async fn start(&self, info: TokioServiceInfo, messages: MessageQueue) { + let tip_candidate: Arc>> = Arc::new(Mutex::new(None)); + messages - .for_each(|input| async { + .for_each(|input| { + let explorer_db = self.db.clone(); + let tip_candidate = Arc::clone(&tip_candidate); match input { ExplorerMsg::NewBlock(block) => { - let explorer_db = self.db.clone(); - let logger = info.logger().clone(); - info.spawn_fallible("apply block", async move { - explorer_db - .apply_block(block) - .map(move |result| match result { - // XXX: There is no garbage collection now, so the GCRoot is not used - Ok(_gc_root) => Ok(()), - Err(err) => { - error!(logger, "Explorer error: {}", err); - Err(()) - } - }) - .await + info.spawn_fallible::<_, Error>("apply block to explorer", async move { + let _state_ref = explorer_db.apply_block(block.clone()).await?; + + let mut guard = tip_candidate.lock().await; + if guard.map(|hash| hash == block.header.id()).unwrap_or(false) { + let hash = guard.take().unwrap(); + explorer_db.set_tip(hash).await; + } + + Ok(()) }); } ExplorerMsg::NewTip(hash) => { - let explorer_db = self.db.clone(); - info.spawn( - "apply block", - async move { explorer_db.set_tip(hash).await }, - ); + info.spawn_fallible::<_, Error>("apply block to explorer", async move { + let successful = explorer_db.set_tip(hash).await; + + if !successful { + let mut guard = tip_candidate.lock().await; + guard.replace(hash); + } + + Ok(()) + }); } - } + }; + + futures::future::ready(()) }) .await; } @@ -197,17 +214,9 @@ impl ExplorerDB { let block0_id = block0.id(); - let bootstraped_db = ExplorerDB { - multiverse, - longest_chain_tip: Tip::new(block0.header.id()), - blockchain_config, - blockchain: blockchain.clone(), - blockchain_tip, - }; - let maybe_head = blockchain.storage().get_tag(MAIN_BRANCH_TAG)?; - let stream = match maybe_head { - Some(head) => blockchain.storage().stream_from_to(block0_id, head)?, + let (stream, hash) = match maybe_head { + Some(head) => (blockchain.storage().stream_from_to(block0_id, head)?, head), None => { return Err(Error::from(ErrorKind::BootstrapError( "Couldn't read the HEAD tag from storage".to_owned(), @@ -215,6 +224,17 @@ impl ExplorerDB { } }; + let bootstraped_db = ExplorerDB { + multiverse, + longest_chain_tip: Tip::new(hash), + blockchain_config, + blockchain: blockchain.clone(), + blockchain_tip, + stable_store: StableIndex { + confirmed_block_chain_length: Arc::new(AtomicU32::default()), + }, + }; + let db = stream .map_err(Error::from) .try_fold(bootstraped_db, |db, block| async move { @@ -311,11 +331,6 @@ impl ExplorerDB { Ok(state_ref) } - pub async fn set_tip(&self, hash: HeaderHash) { - let mut guard = self.longest_chain_tip.0.write().await; - *guard = hash; - } - pub async fn get_latest_block_hash(&self) -> HeaderHash { self.longest_chain_tip.get_block_id().await } @@ -328,6 +343,45 @@ impl ExplorerDB { .await } + pub(self) async fn set_tip(&self, hash: HeaderHash) -> bool { + // the tip changes which means now a block is confirmed (at least after + // the initial epoch_stability_depth blocks). + + let block = if let Some(state_ref) = self.multiverse.get_ref(hash).await { + let state = state_ref.state(); + Arc::clone(state.blocks.lookup(&hash).unwrap()) + } else { + return false; + }; + + if let Some(confirmed_block_chain_length) = block + .chain_length() + .nth_ancestor(self.blockchain_config.epoch_stability_depth) + { + debug_assert!( + ChainLength::from( + self.stable_store + .confirmed_block_chain_length + .load(Ordering::Acquire) + ) <= block.chain_length() + ); + + self.stable_store + .confirmed_block_chain_length + .store(confirmed_block_chain_length.into(), Ordering::Release); + + self.multiverse + .gc(self.blockchain_config.epoch_stability_depth) + .await; + } + + let mut guard = self.longest_chain_tip.0.write().await; + + *guard = hash; + + true + } + pub async fn get_epoch(&self, epoch: Epoch) -> Option { self.with_latest_state(move |state| state.epochs.lookup(&epoch).map(|e| e.as_ref().clone())) .await @@ -397,6 +451,19 @@ impl ExplorerDB { .await } + pub async fn is_block_confirmed(&self, block_id: &HeaderHash) -> bool { + if let Some(block) = self.get_block(block_id).await { + let confirmed_block_chain_length: ChainLength = self + .stable_store + .confirmed_block_chain_length + .load(Ordering::Acquire) + .into(); + block.chain_length <= confirmed_block_chain_length + } else { + false + } + } + pub async fn get_stake_pool_blocks( &self, pool: &PoolId, @@ -771,37 +838,36 @@ fn apply_block_to_vote_plans( impl BlockchainConfig { fn from_config_params(params: &ConfigParams) -> BlockchainConfig { - let discrimination = params - .iter() - .filter_map(|param| match param { - ConfigParam::Discrimination(discrimination) => Some(discrimination), - _ => None, - }) - .next() - .expect("the discrimination to be present"); - - let consensus_version = params - .iter() - .filter_map(|param| match param { - ConfigParam::ConsensusVersion(version) => Some(version), - _ => None, - }) - .next() - .expect("consensus version to be present"); - - let fees = params - .iter() - .filter_map(|param| match param { - ConfigParam::LinearFee(fee) => Some(fee), - _ => None, - }) - .next() - .expect("fee is not in config params"); + let mut discrimination: Option = None; + let mut consensus_version: Option = None; + let mut fees: Option = None; + let mut epoch_stability_depth: Option = None; + + for p in params.iter() { + match p { + ConfigParam::Discrimination(d) => { + discrimination.replace(*d); + } + ConfigParam::ConsensusVersion(v) => { + consensus_version.replace(*v); + } + ConfigParam::LinearFee(fee) => { + fees.replace(*fee); + } + ConfigParam::EpochStabilityDepth(d) => { + epoch_stability_depth.replace(*d); + } + _ => (), + } + } BlockchainConfig { - discrimination: *discrimination, - consensus_version: *consensus_version, - fees: *fees, + discrimination: discrimination.expect("discrimination not found in initial params"), + consensus_version: consensus_version + .expect("consensus version not found in initial params"), + fees: fees.expect("fees not found in initial params"), + epoch_stability_depth: epoch_stability_depth + .expect("epoch stability depth not found in initial params"), } } } diff --git a/jormungandr/src/main.rs b/jormungandr/src/main.rs index 519f772cc6..185afa2126 100644 --- a/jormungandr/src/main.rs +++ b/jormungandr/src/main.rs @@ -128,7 +128,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E .explorer_db .expect("explorer db to be bootstrapped"); - let mut explorer = explorer::Explorer::new(explorer_db); + let explorer = explorer::Explorer::new(explorer_db); // Context to give to the rest api let context = explorer.clone();