fix(rust/cardano-chain-follower): Refactor chain follower to use cardano-blockchain-types #115

Draft
wants to merge 12 commits into main
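In short, this PR replaces the follower's home-grown chain primitives with the shared cardano-blockchain-types crate: the TIP_POINT, ORIGIN_POINT and UNKNOWN_POINT constants become associated constants on Point, and raw u64 fork counters become the Fork newtype. The minimal sketch below maps the old names onto the new ones using only identifiers visible in this diff; the exact cardano-blockchain-types API is assumed to match what the diff shows.

use cardano_blockchain_types::{Fork, Point};

fn type_mapping_sketch() {
    // Module-level point constants move onto the `Point` type:
    //   TIP_POINT     -> Point::TIP
    //   ORIGIN_POINT  -> Point::ORIGIN
    //   UNKNOWN_POINT -> Point::UNKNOWN
    let _tip = Point::TIP;
    let _unknown = Point::UNKNOWN;

    // Raw `u64` fork counters become the `Fork` newtype:
    let mut fork: Fork = Fork::from_saturating(2);
    fork.incr(); // replaces `fork_count += 1`
}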
1 change: 1 addition & 0 deletions rust/cardano-chain-follower/Cargo.toml
@@ -21,6 +21,7 @@ mithril-client = { version = "0.10.4", default-features = false, features = [
] }

rbac-registration = { version = "0.0.2", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "v0.0.8" }
cardano-blockchain-types = { git = "https://github.com/input-output-hk/catalyst-libs.git", branch = "feat/cardano-blockchain-types" }

thiserror = "1.0.64"
tokio = { version = "1.40.0", features = [
66 changes: 32 additions & 34 deletions rust/cardano-chain-follower/src/chain_sync.rs
@@ -6,6 +6,7 @@
use std::time::Duration;

use anyhow::Context;
use cardano_blockchain_types::{Fork, MultiEraBlock, Network, Point};
use pallas::{
ledger::traverse::MultiEraHeader,
network::{
@@ -32,15 +33,14 @@ use crate::{
error::{Error, Result},
mithril_snapshot_config::MithrilUpdateMessage,
mithril_snapshot_data::latest_mithril_snapshot_id,
point::{TIP_POINT, UNKNOWN_POINT},
stats, ChainSyncConfig, MultiEraBlock, Network, Point, ORIGIN_POINT,
stats, ChainSyncConfig,
};

/// The maximum number of seconds we wait for a node to connect.
const MAX_NODE_CONNECT_TIME_SECS: u64 = 2;

/// The maximum number of times we wait for a node to connect.
/// Currently set to never give up.
/// Currently set to a maximum of 5 retries.
const MAX_NODE_CONNECT_RETRIES: u64 = 5;

/// Try and connect to a node, in a robust and quick way.
@@ -88,7 +88,7 @@ async fn retry_connect(

/// Purge the live chain, and intersect with TIP.
async fn purge_and_intersect_tip(client: &mut PeerClient, chain: Network) -> Result<Point> {
if let Err(error) = purge_live_chain(chain, &TIP_POINT) {
if let Err(error) = purge_live_chain(chain, &Point::TIP) {
// Shouldn't happen.
error!("failed to purge live chain: {error}");
}
@@ -120,18 +120,18 @@ async fn resync_live_tip(client: &mut PeerClient, chain: Network) -> Result<Poin
Ok(sync_to_point)
}

/// Fetch a single block from the Peer, and Decode it.
/// Fetch a single block from the Peer, and decode it.
async fn fetch_block_from_peer(
peer: &mut PeerClient, chain: Network, point: Point, previous_point: Point, fork_count: u64,
peer: &mut PeerClient, chain: Network, point: Point, previous_point: Point, fork: Fork,
) -> anyhow::Result<MultiEraBlock> {
let block_data = peer
.blockfetch()
.fetch_single(point.clone().into())
.await
.with_context(|| "Fetching block data")?;

debug!("{chain}, {previous_point}, {fork_count}");
let live_block_data = MultiEraBlock::new(chain, block_data, &previous_point, fork_count)?;
debug!("{chain}, {previous_point}, {fork:?}");
let live_block_data = MultiEraBlock::new(chain, block_data, &previous_point, fork)?;

Ok(live_block_data)
}
@@ -141,16 +141,16 @@ async fn fetch_block_from_peer(
/// Fetch the rollback block, and try and insert it into the live-chain.
/// If it's a real rollback, it will purge the chain ahead of the block automatically.
async fn process_rollback_actual(
peer: &mut PeerClient, chain: Network, point: Point, tip: &Tip, fork_count: &mut u64,
peer: &mut PeerClient, chain: Network, point: Point, tip: &Tip, fork: &mut Fork,
) -> anyhow::Result<Point> {
debug!("RollBackward: {:?} {:?}", point, tip);

// Check if the block is in the live chain; if it is, re-add it, which auto-purges the
// rest of the live chain tip and increments the fork count.
if let Some(mut block) = get_live_block(chain, &point, 0, true) {
// Even though we are re-adding the known block, increase the fork count.
block.set_fork(*fork_count);
live_chain_add_block_to_tip(chain, block, fork_count, tip.0.clone().into())?;
block.set_fork(*fork);
live_chain_add_block_to_tip(chain, block, fork, tip.0.clone().into())?;
return Ok(point);
}

@@ -165,7 +165,7 @@ async fn process_rollback_actual(
let previous_point = if let Some(previous_block) = previous_block {
let previous = previous_block.previous();
debug!("Previous block: {:?}", previous);
if previous == ORIGIN_POINT {
if previous == Point::ORIGIN {
latest_mithril_snapshot_id(chain).tip()
} else {
previous
@@ -175,9 +175,8 @@ async fn process_rollback_actual(
latest_mithril_snapshot_id(chain).tip()
};
debug!("Previous point: {:?}", previous_point);
let block =
fetch_block_from_peer(peer, chain, point.clone(), previous_point, *fork_count).await?;
live_chain_add_block_to_tip(chain, block, fork_count, tip.0.clone().into())?;
let block = fetch_block_from_peer(peer, chain, point.clone(), previous_point, *fork).await?;
live_chain_add_block_to_tip(chain, block, fork, tip.0.clone().into())?;

// Next block we receive is a rollback.
Ok(point)
@@ -186,20 +185,20 @@ async fn process_rollback_actual(
/// Process a rollback detected from the peer.
async fn process_rollback(
peer: &mut PeerClient, chain: Network, point: Point, tip: &Tip, previous_point: &Point,
fork_count: &mut u64,
fork: &mut Fork,
) -> anyhow::Result<Point> {
let rollback_slot = point.slot_or_default();
let head_slot = previous_point.slot_or_default();
debug!("Head slot: {}", head_slot);
debug!("Rollback slot: {}", rollback_slot);
debug!("Head slot: {head_slot:?}");
debug!("Rollback slot: {rollback_slot:?}");
let slot_rollback_size = if head_slot > rollback_slot {
head_slot - rollback_slot
} else {
0
};

// We actually do the work here...
let response = process_rollback_actual(peer, chain, point, tip, fork_count).await?;
let response = process_rollback_actual(peer, chain, point, tip, fork).await?;

// We never really know how many blocks are rolled back when advised by the peer, but we
// can work out how many slots. This function wraps the real work, so we can properly
@@ -212,7 +211,7 @@ async fn process_rollback(
/// Process a rollback detected from the peer.
async fn process_next_block(
peer: &mut PeerClient, chain: Network, header: HeaderContent, tip: &Tip,
previous_point: &Point, fork_count: &mut u64,
previous_point: &Point, fork: &mut Fork,
) -> anyhow::Result<Point> {
// Decode the Header of the block so we know what to fetch.
let decoded_header = MultiEraHeader::decode(
@@ -222,7 +221,7 @@ async fn process_next_block(
)
.with_context(|| "Decoding Block Header")?;

let block_point = Point::new(decoded_header.slot(), decoded_header.hash().to_vec());
let block_point = Point::new(decoded_header.slot().into(), decoded_header.hash().into());

debug!("RollForward: {block_point:?} {tip:?}");

@@ -231,20 +230,20 @@ async fn process_next_block(
chain,
block_point.clone(),
previous_point.clone(),
*fork_count,
*fork,
)
.await?;

let block_point = block.point();

// We can't store this block because we don't know the previous one, so the chain
// would break; just use it as the previous point.
if *previous_point == UNKNOWN_POINT {
if *previous_point == Point::UNKNOWN {
// Nothing else we can do with the first block when we don't know the previous
// one. Just return its point.
debug!("Not storing the block, because we did not know the previous point.");
} else {
live_chain_add_block_to_tip(chain, block, fork_count, tip.0.clone().into())?;
live_chain_add_block_to_tip(chain, block, fork, tip.0.clone().into())?;
}

Ok(block_point)
@@ -255,10 +254,10 @@ async fn process_next_block(
///
/// We take ownership of the client because of that.
async fn follow_chain(
peer: &mut PeerClient, chain: Network, fork_count: &mut u64,
peer: &mut PeerClient, chain: Network, fork: &mut Fork,
) -> anyhow::Result<()> {
let mut update_sender = get_chain_update_tx_queue(chain).await;
let mut previous_point = UNKNOWN_POINT;
let mut previous_point = Point::UNKNOWN;

loop {
// debug!("Waiting for data from Cardano Peer Node:");
@@ -286,16 +285,15 @@ async fn follow_chain(
// subtracting current block height and the tip block height.
// IF the TIP is <= the current block height THEN we are at tip.
previous_point =
process_next_block(peer, chain, header, &tip, &previous_point, fork_count)
.await?;
process_next_block(peer, chain, header, &tip, &previous_point, fork).await?;

// This update is just for followers to know to look again at their live chains for
// new data.
notify_follower(chain, &update_sender, &chain_update::Kind::Block);
},
chainsync::NextResponse::RollBackward(point, tip) => {
previous_point =
process_rollback(peer, chain, point.into(), &tip, &previous_point, fork_count)
process_rollback(peer, chain, point.into(), &tip, &previous_point, fork)
.await?;
// This update is just for followers to know to look again at their live chains for
// new data.
@@ -367,8 +365,8 @@ async fn live_sync_backfill(

while let Some(block_data) = peer.blockfetch().recv_while_streaming().await? {
// Backfilled blocks get placed in the oldest fork currently on the live-chain.
let block =
MultiEraBlock::new(cfg.chain, block_data, &previous_point, 1).with_context(|| {
let block = MultiEraBlock::new(cfg.chain, block_data, &previous_point, 1.into())
.with_context(|| {
format!(
"Failed to decode block data. previous: {previous_point:?}, range: {range_msg}"
)
@@ -533,7 +531,7 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilU
// Live Fill data starts at fork 1.
// Immutable data from a mithril snapshot is fork 0.
// Live backfill is always Fork 1.
let mut fork_count: u64 = 2;
let mut fork: Fork = Fork::from_saturating(2);

loop {
// We never have a connection if we end up around the loop, so make a new one.
@@ -551,7 +549,7 @@
}

// Note: This can ONLY return with an error, otherwise it will sync indefinitely.
if let Err(error) = follow_chain(&mut peer, cfg.chain, &mut fork_count).await {
if let Err(error) = follow_chain(&mut peer, cfg.chain, &mut fork).await {
error!(
"Cardano Client {} failed to follow chain: {}: Reconnecting.",
cfg.relay_address, error
@@ -560,7 +558,7 @@
}

// If this returns, we are on a new fork (or assume we are)
fork_count += 1;
fork.incr();
}
}

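The net effect in chain_sync.rs is that the same reconnect loop now threads a Fork value through follow_chain instead of a bare u64. A condensed sketch of that loop's fork handling, based only on the fragments above (peer connection, live-tip resync and error handling are elided, so this is an illustration rather than the actual implementation):

use cardano_blockchain_types::{Fork, Network};

async fn fork_tracking_sketch(_chain: Network) {
    // Fork 0 is immutable (mithril) data and Fork 1 is the live backfill,
    // so live sync starts counting forks at 2.
    let mut fork: Fork = Fork::from_saturating(2);

    loop {
        // ... connect a peer and call `follow_chain(&mut peer, _chain, &mut fork).await` here ...
        // `follow_chain` only returns on error, so reaching this point means we are
        // reconnecting and may be on a new fork:
        fork.incr();
    }
}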
18 changes: 7 additions & 11 deletions rust/cardano-chain-follower/src/chain_sync_config.rs
@@ -6,6 +6,7 @@

use std::sync::LazyLock;

use cardano_blockchain_types::Network;
use dashmap::DashMap;
use strum::IntoEnumIterator;
use tokio::{sync::Mutex, task::JoinHandle};
@@ -15,19 +16,19 @@ use crate::{
chain_sync::chain_sync,
error::{Error, Result},
mithril_snapshot_config::MithrilSnapshotConfig,
network::Network,
stats,
};

/// Default Follower block buffer size.
const DEFAULT_CHAIN_UPDATE_BUFFER_SIZE: usize = 32;

/// How many slots back from TIP is considered Immutable in the absence of a mithril
/// snapshot.
/// The window (in slots) back from TIP that is considered Immutable in the
/// absence of a mithril snapshot.
const DEFAULT_IMMUTABLE_SLOT_WINDOW: u64 = 12 * 60 * 60;

/// Type we use to manage the Sync Task handle map.
type SyncMap = DashMap<Network, Mutex<Option<JoinHandle<()>>>>;

/// Handle to the mithril sync thread. One for each Network ONLY.
static SYNC_JOIN_HANDLE_MAP: LazyLock<SyncMap> = LazyLock::new(|| {
let map = DashMap::new();
@@ -68,7 +69,7 @@ impl ChainSyncConfig {
}
}

/// Sets the relay to use for Chain Sync.
/// Sets the relay address to use for Chain Sync.
///
/// # Arguments
///
@@ -101,12 +102,11 @@ impl ChainSyncConfig {
self
}

/// Sets the the Mithril snapshot Config the `ChainSync` will use.
/// Sets the Mithril snapshot Config the `ChainSync` will use.
///
/// # Arguments
///
/// * `path`: Mithril snapshot path.
/// * `update`: Auto-update this path with the latest mithril snapshot as it changes.
/// * `cfg`: Mithril snapshot configuration.
#[must_use]
pub fn mithril_cfg(mut self, cfg: MithrilSnapshotConfig) -> Self {
self.mithril_cfg = cfg;
@@ -117,10 +117,6 @@ impl ChainSyncConfig {
///
/// Must be done BEFORE the chain can be followed.
///
/// # Arguments
///
/// * `chain`: The chain to follow.
///
/// # Returns
///
/// `Result<()>`: On success.
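For reference, the chain_sync_config.rs changes keep ChainSyncConfig as a builder. A hypothetical usage sketch follows; only the mithril_cfg setter and the cardano-blockchain-types Network import come from this diff, while default_for, relay, run, the Preprod variant and the relay address are assumed names and values, used purely to illustrate how the setters chain.

use cardano_blockchain_types::Network;
use cardano_chain_follower::{ChainSyncConfig, MithrilSnapshotConfig};

async fn start_sync_sketch() -> anyhow::Result<()> {
    // `default_for`, `relay` and `run` are assumed method names; `mithril_cfg`
    // is the setter shown in the diff above.
    let mithril = MithrilSnapshotConfig::default_for(Network::Preprod); // assumed constructor
    ChainSyncConfig::default_for(Network::Preprod) // assumed constructor
        .relay("backbone.cardano.iog.io:3001") // assumed setter ("Sets the relay address")
        .mithril_cfg(mithril) // shown in the diff
        .run() // assumed starter; "Must be done BEFORE the chain can be followed"
        .await?;
    Ok(())
}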