Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explorer add gc and confirmed block notion #2962

Merged
merged 9 commits into from
Feb 4, 2021
7 changes: 2 additions & 5 deletions jormungandr/src/blockchain/multiverse.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::blockcfg::{ChainLength, HeaderHash, Ledger, Multiverse as MultiverseData};
use crate::blockcfg::{ChainLength, HeaderHash, Multiverse as MultiverseData};
use chain_impl_mockchain::multiverse;
use std::sync::Arc;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -43,12 +43,9 @@ impl<T: Clone> Multiverse<T> {
}
}

impl Multiverse<Ledger> {
impl<T> Multiverse<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh? I was pretty sure I already did that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, probably you did it the same with the actual multiverse in chain-libs

/// run the garbage collection of the multiverse
///
/// TODO: this function is only working for the `Ledger` at the moment
/// we need to generalize the `chain_impl_mockchain` to handle
/// the garbage collection for any `T`
pub async fn gc(&self, depth: u32) {
let mut guard = self.inner.write().await;
guard.gc(depth)
Expand Down
12 changes: 12 additions & 0 deletions jormungandr/src/explorer/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ impl Block {
});
Ok(treasury)
}

pub async fn is_confirmed(&self, context: &Context) -> bool {
context.db.is_block_confirmed(&self.hash).await
}
}

struct BftLeader {
Expand Down Expand Up @@ -698,6 +702,14 @@ impl Status {
latest_block(context).await.map(|b| Block::from(&b))
}

pub async fn epoch_stability_depth(&self, context: &Context) -> String {
context
.db
.blockchain_config
.epoch_stability_depth
.to_string()
}

pub fn fee_settings(&self, context: &Context) -> FeeSettings {
let chain_impl_mockchain::fee::LinearFee {
constant,
Expand Down
202 changes: 134 additions & 68 deletions jormungandr/src/explorer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ use chain_impl_mockchain::fee::LinearFee;
use chain_impl_mockchain::multiverse;
use futures::prelude::*;
use std::convert::Infallible;
use std::sync::Arc;
use tokio::sync::RwLock;
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use tokio::sync::{Mutex, RwLock};

#[derive(Clone)]
pub struct Explorer {
Expand All @@ -53,6 +56,12 @@ pub struct ExplorerDB {
pub blockchain_config: BlockchainConfig,
blockchain: Blockchain,
blockchain_tip: blockchain::Tip,
stable_store: StableIndex,
}

#[derive(Clone)]
pub struct StableIndex {
confirmed_block_chain_length: Arc<AtomicU32>,
}

#[derive(Clone)]
Expand All @@ -62,6 +71,7 @@ pub struct BlockchainConfig {
discrimination: Discrimination,
consensus_version: ConsensusVersion,
fees: LinearFee,
epoch_stability_depth: u32,
}

/// Inmutable data structure used to represent the explorer's state at a given Block
Expand Down Expand Up @@ -104,35 +114,42 @@ impl Explorer {
}
}

pub async fn start(&mut self, info: TokioServiceInfo, messages: MessageQueue<ExplorerMsg>) {
pub async fn start(&self, info: TokioServiceInfo, messages: MessageQueue<ExplorerMsg>) {
let tip_candidate: Arc<Mutex<Option<HeaderHash>>> = Arc::new(Mutex::new(None));

messages
.for_each(|input| async {
.for_each(|input| {
let explorer_db = self.db.clone();
let tip_candidate = Arc::clone(&tip_candidate);
match input {
ExplorerMsg::NewBlock(block) => {
let explorer_db = self.db.clone();
let logger = info.logger().clone();
info.spawn_fallible("apply block", async move {
explorer_db
.apply_block(block)
.map(move |result| match result {
// XXX: There is no garbage collection now, so the GCRoot is not used
Ok(_gc_root) => Ok(()),
Err(err) => {
error!(logger, "Explorer error: {}", err);
Err(())
}
})
.await
info.spawn_fallible::<_, Error>("apply block to explorer", async move {
let _state_ref = explorer_db.apply_block(block.clone()).await?;

let mut guard = tip_candidate.lock().await;
if guard.map(|hash| hash == block.header.id()).unwrap_or(false) {
let hash = guard.take().unwrap();
explorer_db.set_tip(hash).await;
}

Ok(())
});
}
ExplorerMsg::NewTip(hash) => {
let explorer_db = self.db.clone();
info.spawn(
"apply block",
async move { explorer_db.set_tip(hash).await },
);
info.spawn_fallible::<_, Error>("apply block to explorer", async move {
let successful = explorer_db.set_tip(hash).await;

if !successful {
let mut guard = tip_candidate.lock().await;
guard.replace(hash);
}

Ok(())
});
}
}
};

futures::future::ready(())
})
.await;
}
Expand Down Expand Up @@ -197,24 +214,27 @@ impl ExplorerDB {

let block0_id = block0.id();

let bootstraped_db = ExplorerDB {
multiverse,
longest_chain_tip: Tip::new(block0.header.id()),
blockchain_config,
blockchain: blockchain.clone(),
blockchain_tip,
};

let maybe_head = blockchain.storage().get_tag(MAIN_BRANCH_TAG)?;
let stream = match maybe_head {
Some(head) => blockchain.storage().stream_from_to(block0_id, head)?,
let (stream, hash) = match maybe_head {
Some(head) => (blockchain.storage().stream_from_to(block0_id, head)?, head),
None => {
return Err(Error::from(ErrorKind::BootstrapError(
"Couldn't read the HEAD tag from storage".to_owned(),
)))
}
};

let bootstraped_db = ExplorerDB {
multiverse,
longest_chain_tip: Tip::new(hash),
blockchain_config,
blockchain: blockchain.clone(),
blockchain_tip,
stable_store: StableIndex {
confirmed_block_chain_length: Arc::new(AtomicU32::default()),
},
};

let db = stream
.map_err(Error::from)
.try_fold(bootstraped_db, |db, block| async move {
Expand Down Expand Up @@ -311,11 +331,6 @@ impl ExplorerDB {
Ok(state_ref)
}

pub async fn set_tip(&self, hash: HeaderHash) {
let mut guard = self.longest_chain_tip.0.write().await;
*guard = hash;
}

pub async fn get_latest_block_hash(&self) -> HeaderHash {
self.longest_chain_tip.get_block_id().await
}
Expand All @@ -328,6 +343,45 @@ impl ExplorerDB {
.await
}

pub(self) async fn set_tip(&self, hash: HeaderHash) -> bool {
// the tip changes which means now a block is confirmed (at least after
// the initial epoch_stability_depth blocks).

let block = if let Some(state_ref) = self.multiverse.get_ref(hash).await {
let state = state_ref.state();
Arc::clone(state.blocks.lookup(&hash).unwrap())
} else {
return false;
};

if let Some(confirmed_block_chain_length) = block
.chain_length()
.nth_ancestor(self.blockchain_config.epoch_stability_depth)
{
debug_assert!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just assert!? Might be difficult to track problems in production.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I just thought debug_assert was paranoid enough, and I wanted to write the invariant in some way at least. I don't really mind using assert anyway, but if this actually happens it is most likely a programmer error.

ChainLength::from(
self.stable_store
.confirmed_block_chain_length
.load(Ordering::Acquire)
) <= block.chain_length()
);

self.stable_store
.confirmed_block_chain_length
.store(confirmed_block_chain_length.into(), Ordering::Release);

self.multiverse
.gc(self.blockchain_config.epoch_stability_depth)
.await;
}

let mut guard = self.longest_chain_tip.0.write().await;

*guard = hash;

true
}

pub async fn get_epoch(&self, epoch: Epoch) -> Option<EpochData> {
self.with_latest_state(move |state| state.epochs.lookup(&epoch).map(|e| e.as_ref().clone()))
.await
Expand Down Expand Up @@ -397,6 +451,19 @@ impl ExplorerDB {
.await
}

pub async fn is_block_confirmed(&self, block_id: &HeaderHash) -> bool {
if let Some(block) = self.get_block(block_id).await {
let confirmed_block_chain_length: ChainLength = self
.stable_store
.confirmed_block_chain_length
.load(Ordering::Acquire)
.into();
block.chain_length <= confirmed_block_chain_length
} else {
false
}
}

pub async fn get_stake_pool_blocks(
&self,
pool: &PoolId,
Expand Down Expand Up @@ -771,37 +838,36 @@ fn apply_block_to_vote_plans(

impl BlockchainConfig {
fn from_config_params(params: &ConfigParams) -> BlockchainConfig {
let discrimination = params
.iter()
.filter_map(|param| match param {
ConfigParam::Discrimination(discrimination) => Some(discrimination),
_ => None,
})
.next()
.expect("the discrimination to be present");

let consensus_version = params
.iter()
.filter_map(|param| match param {
ConfigParam::ConsensusVersion(version) => Some(version),
_ => None,
})
.next()
.expect("consensus version to be present");

let fees = params
.iter()
.filter_map(|param| match param {
ConfigParam::LinearFee(fee) => Some(fee),
_ => None,
})
.next()
.expect("fee is not in config params");
let mut discrimination: Option<Discrimination> = None;
let mut consensus_version: Option<ConsensusVersion> = None;
let mut fees: Option<LinearFee> = None;
let mut epoch_stability_depth: Option<u32> = None;

for p in params.iter() {
match p {
ConfigParam::Discrimination(d) => {
discrimination.replace(*d);
}
ConfigParam::ConsensusVersion(v) => {
consensus_version.replace(*v);
}
ConfigParam::LinearFee(fee) => {
fees.replace(*fee);
}
ConfigParam::EpochStabilityDepth(d) => {
epoch_stability_depth.replace(*d);
}
_ => (),
}
}

BlockchainConfig {
discrimination: *discrimination,
consensus_version: *consensus_version,
fees: *fees,
discrimination: discrimination.expect("discrimination not found in initial params"),
consensus_version: consensus_version
.expect("consensus version not found in initial params"),
fees: fees.expect("fees not found in initial params"),
epoch_stability_depth: epoch_stability_depth
.expect("epoch stability depth not found in initial params"),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion jormungandr/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
.explorer_db
.expect("explorer db to be bootstrapped");

let mut explorer = explorer::Explorer::new(explorer_db);
let explorer = explorer::Explorer::new(explorer_db);

// Context to give to the rest api
let context = explorer.clone();
Expand Down