diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 87e2eacdf50d7..7aae08598d537 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -699,8 +699,24 @@ impl Core { received_quorum_rounds: Vec, accepted_quorum_rounds: Vec, ) { - info!("Received quorum round per authority in ancestor state manager set to: {received_quorum_rounds:?}"); - info!("Accepted quorum round per authority in ancestor state manager set to: {accepted_quorum_rounds:?}"); + info!( + "Received quorum round per authority in ancestor state manager set to: {}", + self.context + .committee + .authorities() + .zip(received_quorum_rounds.iter()) + .map(|((i, _), rounds)| format!("{i}: {rounds:?}")) + .join(", ") + ); + info!( + "Accepted quorum round per authority in ancestor state manager set to: {}", + self.context + .committee + .authorities() + .zip(accepted_quorum_rounds.iter()) + .map(|((i, _), rounds)| format!("{i}: {rounds:?}")) + .join(", ") + ); self.ancestor_state_manager .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds); info!("Propagation round delay set to: {delay}"); @@ -831,10 +847,8 @@ impl Core { clock_round: Round, smart_select: bool, ) -> Vec { - let _s = self - .context - .metrics - .node_metrics + let node_metrics = &self.context.metrics.node_metrics; + let _s = node_metrics .scope_processing_time .with_label_values(&["Core::smart_ancestors_to_propose"]) .start_timer(); @@ -904,7 +918,7 @@ impl Core { } if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) { - self.context.metrics.node_metrics.smart_selection_wait.inc(); + node_metrics.smart_selection_wait.inc(); debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake()); return vec![]; } @@ -924,32 +938,76 @@ impl Core { debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}"); 
parent_round_quorum.add(ancestor.author(), &self.context.committee); ancestors_to_propose.push(ancestor); - self.context - .metrics - .node_metrics + node_metrics .included_excluded_proposal_ancestors_count_by_authority - .with_label_values(&[block_hostname, "strong"]) + .with_label_values(&[block_hostname, "timeout"]) .inc(); } else { excluded_ancestors.push((score, ancestor)); } } - assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."); - + // Include partially propagated blocks from excluded authorities, to help propagate the blocks + // across the network with less latency impact. + // TODO: use a separate mechanism to propagate excluded ancestor blocks and remove this logic. for (score, ancestor) in excluded_ancestors.iter() { let excluded_author = ancestor.author(); let block_hostname = &self.context.committee.authority(excluded_author).hostname; + // A quorum of validators reported to have accepted blocks from the excluded_author up to the low quorum round. + let mut accepted_low_quorum_round = self + .ancestor_state_manager + .accepted_quorum_round_per_authority[excluded_author] + .0; + // If the accepted quorum round of this ancestor is greater than or equal + // to the clock round then we want to make sure to set it to clock_round - 1 + // as that is the max round the new block can include as an ancestor. 
+ accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round); + + let last_included_round = self.last_included_ancestors[excluded_author] + .map(|block_ref| block_ref.round) + .unwrap_or(GENESIS_ROUND); + if last_included_round >= accepted_low_quorum_round { + trace!( + "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {} >= accepted low quorum round {}", + ancestor.reference(), last_included_round, accepted_low_quorum_round, + ); + node_metrics + .excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname]) + .inc(); + continue; + } - trace!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}"); - self.context - .metrics - .node_metrics - .excluded_proposal_ancestors_count_by_authority - .with_label_values(&[block_hostname]) + // Include the ancestor block as it has been seen & accepted by a strong quorum. + let ancestor = if ancestor.round() == accepted_low_quorum_round { + ancestor.clone() + } else { + // Only cached blocks need to be propagated. Committed and GC'ed blocks do not need to be propagated. 
+ let Some(ancestor) = self.dag_state.read().get_last_cached_block_in_range( + excluded_author, + last_included_round + 1, + accepted_low_quorum_round + 1, + ) else { + trace!("Excluded low score ancestor {} with score {score} to propose for round {clock_round}: no suitable block found", ancestor.reference()); + node_metrics + .excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname]) + .inc(); + continue; + }; + ancestor + }; + self.last_included_ancestors[excluded_author] = Some(ancestor.reference()); + ancestors_to_propose.push(ancestor.clone()); + trace!("Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}", ancestor.reference()); + node_metrics + .included_excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname, "quorum"]) .inc(); } + assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."); + info!( "Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}", ancestors_to_propose.len(), diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index b4981b2ae2e66..4c93cc59d701f 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -502,10 +502,38 @@ impl DagState { blocks } - /// Returns the last block proposed per authority with `round < end_round`. + /// Retrieves the last cached block within the range `[start_round, end_round)` from a given authority. + /// NOTE: end_round must be greater than GENESIS_ROUND, otherwise the method panics. 
+    pub(crate) fn get_last_cached_block_in_range(
+        &self,
+        authority: AuthorityIndex,
+        start_round: Round,
+        end_round: Round,
+    ) -> Option {
+        if end_round == GENESIS_ROUND { // `end_round` is the exclusive upper bound of the search (see `Excluded` below).
+            panic!(
+                "Attempted to retrieve blocks earlier than the genesis round which is impossible"
+            );
+        }
+        // Take the last ref yielded by the bounded range over this authority's recent refs; `?` returns None early when the range yields nothing.
+        let block_ref = self.recent_refs_by_authority[authority]
+            .range(( // NOTE(review): if this collection is a BTreeSet, `range` panics when the start bound sorts after the end bound - confirm callers pass start_round < end_round.
+                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
+                Excluded(BlockRef::new(
+                    end_round,
+                    AuthorityIndex::MIN,
+                    BlockDigest::MIN,
+                )),
+            ))
+            .last()?;
+        // Look the selected ref up in `recent_blocks` and hand back a clone; the result is None when there is no entry for it.
+        self.recent_blocks.get(block_ref).cloned()
+    }
+
+    /// Returns the last block proposed per authority with `evicted round < round < end_round`. /// The method is guaranteed to return results only when the `end_round` is not earlier of the - /// available cached data for each authority, otherwise the method will panic - it's the caller's - /// responsibility to ensure that is not requesting filtering for earlier rounds . + /// available cached data for each authority (evicted round + 1), otherwise the method will panic. + /// It's the caller's responsibility to ensure that it is not requesting earlier rounds. /// In case of equivocation for an authority's last slot only one block will be returned (the last in order). 
pub(crate) fn get_last_cached_block_per_authority( &self, @@ -1957,7 +1985,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_get_cached_last_block_per_authority(#[values(0, 1)] gc_depth: u32) { + async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) { // GIVEN const CACHED_ROUNDS: Round = 2; let (mut context, _) = Context::new_for_test(4); @@ -2010,14 +2038,46 @@ mod test { // WHEN search for the latest blocks let end_round = 4; + let expected_rounds = vec![0, 1, 2, 3]; + + // THEN let last_blocks = dag_state.get_last_cached_block_per_authority(end_round); + assert_eq!( + last_blocks.iter().map(|b| b.round()).collect::>(), + expected_rounds + ); + + // THEN + for (i, expected_round) in expected_rounds.iter().enumerate() { + let round = dag_state + .get_last_cached_block_in_range( + context.committee.to_authority_index(i).unwrap(), + 0, + end_round, + ) + .map(|b| b.round()) + .unwrap_or_default(); + assert_eq!(round, *expected_round, "Authority {i}"); + } + + // WHEN starting from round 2 + let start_round = 2; + let expected_rounds = [0, 0, 2, 3]; // THEN - assert_eq!(last_blocks[0].round(), 0); - assert_eq!(last_blocks[1].round(), 1); - assert_eq!(last_blocks[2].round(), 2); - assert_eq!(last_blocks[3].round(), 3); + for (i, expected_round) in expected_rounds.iter().enumerate() { + let round = dag_state + .get_last_cached_block_in_range( + context.committee.to_authority_index(i).unwrap(), + start_round, + end_round, + ) + .map(|b| b.round()) + .unwrap_or_default(); + assert_eq!(round, *expected_round, "Authority {i}"); + } + // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger // a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND. 
// @@ -2027,13 +2087,27 @@ mod test { // AND we request before round 3 let end_round = 3; + let expected_rounds = vec![0, 1, 2, 2]; + + // THEN let last_blocks = dag_state.get_last_cached_block_per_authority(end_round); + assert_eq!( + last_blocks.iter().map(|b| b.round()).collect::>(), + expected_rounds + ); // THEN - assert_eq!(last_blocks[0].round(), 0); - assert_eq!(last_blocks[1].round(), 1); - assert_eq!(last_blocks[2].round(), 2); - assert_eq!(last_blocks[3].round(), 2); + for (i, expected_round) in expected_rounds.iter().enumerate() { + let round = dag_state + .get_last_cached_block_in_range( + context.committee.to_authority_index(i).unwrap(), + 0, + end_round, + ) + .map(|b| b.round()) + .unwrap_or_default(); + assert_eq!(round, *expected_round, "Authority {i}"); + } } #[tokio::test]