Skip to content

Commit

Permalink
Merge pull request #2962 from input-output-hk/explorer-add-gc-and-con…
Browse files Browse the repository at this point in the history
…firmed-block-notion

Explorer add gc and confirmed block notion
  • Loading branch information
ecioppettini authored Feb 4, 2021
2 parents e2dc4f8 + b11edda commit 2d9ad4c
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 74 deletions.
7 changes: 2 additions & 5 deletions jormungandr/src/blockchain/multiverse.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::blockcfg::{ChainLength, HeaderHash, Ledger, Multiverse as MultiverseData};
use crate::blockcfg::{ChainLength, HeaderHash, Multiverse as MultiverseData};
use chain_impl_mockchain::multiverse;
use std::sync::Arc;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -43,12 +43,9 @@ impl<T: Clone> Multiverse<T> {
}
}

impl Multiverse<Ledger> {
impl<T> Multiverse<T> {
/// run the garbage collection of the multiverse
///
/// TODO: this function is only working for the `Ledger` at the moment
/// we need to generalize the `chain_impl_mockchain` to handle
/// the garbage collection for any `T`
pub async fn gc(&self, depth: u32) {
let mut guard = self.inner.write().await;
guard.gc(depth)
Expand Down
12 changes: 12 additions & 0 deletions jormungandr/src/explorer/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ impl Block {
});
Ok(treasury)
}

pub async fn is_confirmed(&self, context: &Context) -> bool {
context.db.is_block_confirmed(&self.hash).await
}
}

struct BftLeader {
Expand Down Expand Up @@ -698,6 +702,14 @@ impl Status {
latest_block(context).await.map(|b| Block::from(&b))
}

pub async fn epoch_stability_depth(&self, context: &Context) -> String {
context
.db
.blockchain_config
.epoch_stability_depth
.to_string()
}

pub fn fee_settings(&self, context: &Context) -> FeeSettings {
let chain_impl_mockchain::fee::LinearFee {
constant,
Expand Down
202 changes: 134 additions & 68 deletions jormungandr/src/explorer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ use chain_impl_mockchain::fee::LinearFee;
use chain_impl_mockchain::multiverse;
use futures::prelude::*;
use std::convert::Infallible;
use std::sync::Arc;
use tokio::sync::RwLock;
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use tokio::sync::{Mutex, RwLock};

#[derive(Clone)]
pub struct Explorer {
Expand All @@ -53,6 +56,12 @@ pub struct ExplorerDB {
pub blockchain_config: BlockchainConfig,
blockchain: Blockchain,
blockchain_tip: blockchain::Tip,
stable_store: StableIndex,
}

#[derive(Clone)]
pub struct StableIndex {
confirmed_block_chain_length: Arc<AtomicU32>,
}

#[derive(Clone)]
Expand All @@ -62,6 +71,7 @@ pub struct BlockchainConfig {
discrimination: Discrimination,
consensus_version: ConsensusVersion,
fees: LinearFee,
epoch_stability_depth: u32,
}

/// Inmutable data structure used to represent the explorer's state at a given Block
Expand Down Expand Up @@ -104,35 +114,42 @@ impl Explorer {
}
}

pub async fn start(&mut self, info: TokioServiceInfo, messages: MessageQueue<ExplorerMsg>) {
pub async fn start(&self, info: TokioServiceInfo, messages: MessageQueue<ExplorerMsg>) {
let tip_candidate: Arc<Mutex<Option<HeaderHash>>> = Arc::new(Mutex::new(None));

messages
.for_each(|input| async {
.for_each(|input| {
let explorer_db = self.db.clone();
let tip_candidate = Arc::clone(&tip_candidate);
match input {
ExplorerMsg::NewBlock(block) => {
let explorer_db = self.db.clone();
let logger = info.logger().clone();
info.spawn_fallible("apply block", async move {
explorer_db
.apply_block(block)
.map(move |result| match result {
// XXX: There is no garbage collection now, so the GCRoot is not used
Ok(_gc_root) => Ok(()),
Err(err) => {
error!(logger, "Explorer error: {}", err);
Err(())
}
})
.await
info.spawn_fallible::<_, Error>("apply block to explorer", async move {
let _state_ref = explorer_db.apply_block(block.clone()).await?;

let mut guard = tip_candidate.lock().await;
if guard.map(|hash| hash == block.header.id()).unwrap_or(false) {
let hash = guard.take().unwrap();
explorer_db.set_tip(hash).await;
}

Ok(())
});
}
ExplorerMsg::NewTip(hash) => {
let explorer_db = self.db.clone();
info.spawn(
"apply block",
async move { explorer_db.set_tip(hash).await },
);
info.spawn_fallible::<_, Error>("apply block to explorer", async move {
let successful = explorer_db.set_tip(hash).await;

if !successful {
let mut guard = tip_candidate.lock().await;
guard.replace(hash);
}

Ok(())
});
}
}
};

futures::future::ready(())
})
.await;
}
Expand Down Expand Up @@ -197,24 +214,27 @@ impl ExplorerDB {

let block0_id = block0.id();

let bootstraped_db = ExplorerDB {
multiverse,
longest_chain_tip: Tip::new(block0.header.id()),
blockchain_config,
blockchain: blockchain.clone(),
blockchain_tip,
};

let maybe_head = blockchain.storage().get_tag(MAIN_BRANCH_TAG)?;
let stream = match maybe_head {
Some(head) => blockchain.storage().stream_from_to(block0_id, head)?,
let (stream, hash) = match maybe_head {
Some(head) => (blockchain.storage().stream_from_to(block0_id, head)?, head),
None => {
return Err(Error::from(ErrorKind::BootstrapError(
"Couldn't read the HEAD tag from storage".to_owned(),
)))
}
};

let bootstraped_db = ExplorerDB {
multiverse,
longest_chain_tip: Tip::new(hash),
blockchain_config,
blockchain: blockchain.clone(),
blockchain_tip,
stable_store: StableIndex {
confirmed_block_chain_length: Arc::new(AtomicU32::default()),
},
};

let db = stream
.map_err(Error::from)
.try_fold(bootstraped_db, |db, block| async move {
Expand Down Expand Up @@ -311,11 +331,6 @@ impl ExplorerDB {
Ok(state_ref)
}

pub async fn set_tip(&self, hash: HeaderHash) {
let mut guard = self.longest_chain_tip.0.write().await;
*guard = hash;
}

pub async fn get_latest_block_hash(&self) -> HeaderHash {
self.longest_chain_tip.get_block_id().await
}
Expand All @@ -328,6 +343,45 @@ impl ExplorerDB {
.await
}

pub(self) async fn set_tip(&self, hash: HeaderHash) -> bool {
// the tip changes which means now a block is confirmed (at least after
// the initial epoch_stability_depth blocks).

let block = if let Some(state_ref) = self.multiverse.get_ref(hash).await {
let state = state_ref.state();
Arc::clone(state.blocks.lookup(&hash).unwrap())
} else {
return false;
};

if let Some(confirmed_block_chain_length) = block
.chain_length()
.nth_ancestor(self.blockchain_config.epoch_stability_depth)
{
debug_assert!(
ChainLength::from(
self.stable_store
.confirmed_block_chain_length
.load(Ordering::Acquire)
) <= block.chain_length()
);

self.stable_store
.confirmed_block_chain_length
.store(confirmed_block_chain_length.into(), Ordering::Release);

self.multiverse
.gc(self.blockchain_config.epoch_stability_depth)
.await;
}

let mut guard = self.longest_chain_tip.0.write().await;

*guard = hash;

true
}

pub async fn get_epoch(&self, epoch: Epoch) -> Option<EpochData> {
self.with_latest_state(move |state| state.epochs.lookup(&epoch).map(|e| e.as_ref().clone()))
.await
Expand Down Expand Up @@ -397,6 +451,19 @@ impl ExplorerDB {
.await
}

pub async fn is_block_confirmed(&self, block_id: &HeaderHash) -> bool {
if let Some(block) = self.get_block(block_id).await {
let confirmed_block_chain_length: ChainLength = self
.stable_store
.confirmed_block_chain_length
.load(Ordering::Acquire)
.into();
block.chain_length <= confirmed_block_chain_length
} else {
false
}
}

pub async fn get_stake_pool_blocks(
&self,
pool: &PoolId,
Expand Down Expand Up @@ -771,37 +838,36 @@ fn apply_block_to_vote_plans(

impl BlockchainConfig {
fn from_config_params(params: &ConfigParams) -> BlockchainConfig {
let discrimination = params
.iter()
.filter_map(|param| match param {
ConfigParam::Discrimination(discrimination) => Some(discrimination),
_ => None,
})
.next()
.expect("the discrimination to be present");

let consensus_version = params
.iter()
.filter_map(|param| match param {
ConfigParam::ConsensusVersion(version) => Some(version),
_ => None,
})
.next()
.expect("consensus version to be present");

let fees = params
.iter()
.filter_map(|param| match param {
ConfigParam::LinearFee(fee) => Some(fee),
_ => None,
})
.next()
.expect("fee is not in config params");
let mut discrimination: Option<Discrimination> = None;
let mut consensus_version: Option<ConsensusVersion> = None;
let mut fees: Option<LinearFee> = None;
let mut epoch_stability_depth: Option<u32> = None;

for p in params.iter() {
match p {
ConfigParam::Discrimination(d) => {
discrimination.replace(*d);
}
ConfigParam::ConsensusVersion(v) => {
consensus_version.replace(*v);
}
ConfigParam::LinearFee(fee) => {
fees.replace(*fee);
}
ConfigParam::EpochStabilityDepth(d) => {
epoch_stability_depth.replace(*d);
}
_ => (),
}
}

BlockchainConfig {
discrimination: *discrimination,
consensus_version: *consensus_version,
fees: *fees,
discrimination: discrimination.expect("discrimination not found in initial params"),
consensus_version: consensus_version
.expect("consensus version not found in initial params"),
fees: fees.expect("fees not found in initial params"),
epoch_stability_depth: epoch_stability_depth
.expect("epoch stability depth not found in initial params"),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion jormungandr/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
.explorer_db
.expect("explorer db to be bootstrapped");

let mut explorer = explorer::Explorer::new(explorer_db);
let explorer = explorer::Explorer::new(explorer_db);

// Context to give to the rest api
let context = explorer.clone();
Expand Down

0 comments on commit 2d9ad4c

Please sign in to comment.