From be68d61211733e46d1cac4bdf01fef946494b980 Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Thu, 9 Jan 2025 18:34:14 +0400 Subject: [PATCH 01/11] fix(collation-generation): resolve mismatch between descriptor and commitments core index --- polkadot/node/collation-generation/src/lib.rs | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index b371017a8289..2c5d2e687552 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -344,7 +344,8 @@ impl CollationGenerationSubsystem { Box::pin(async move { let transposed_claim_queue = transpose_claim_queue(claim_queue.0); - for core_index in cores_to_build_on { + let mut prev_core_selector_index = cores_to_build_on.len(); + for core_index in &cores_to_build_on { let collator_fn = match task_config.collator.as_ref() { Some(x) => x, None => return, @@ -363,6 +364,46 @@ impl CollationGenerationSubsystem { }, }; + // Determine the core index for the descriptor. + + // Use the core_selector method from CandidateCommitments to extract + // CoreSelector. + let mut commitments = CandidateCommitments::default(); + commitments.upward_messages = collation.upward_messages.clone(); + + let descriptor_core_index = match commitments.core_selector() { + // If a valid CoreSelector is found, calculate the core index based on it. + Ok(Some((core_selector, _cq_offset))) => { + let core_selector_index = + core_selector.0 as usize % cores_to_build_on.len(); + + if cores_to_build_on.len() > 1 && + prev_core_selector_index == core_selector_index + { + // Malicious behavior detected: The parachain is assigned to + // multiple cores but repeatedly selects the same core. This is + // unexpected and the process should stop here. 
+ gum::error!(target: LOG_TARGET, "Parachain repeatedly selected the same core index"); + return; + } + + prev_core_selector_index = core_selector_index; + + let Some(commitments_core_index) = + cores_to_build_on.get(core_selector_index) + else { + // This should not happen, as the core_selector index is modulo the + // length of cores_to_build_on and we ensure that cores_to_build_on + // is not empty. + continue; + }; + + *commitments_core_index + }, + // Fallback to the sequential core_index if no valid CoreSelector is found. + _ => *core_index, + }; + let parent_head = collation.head_data.clone(); if let Err(err) = construct_and_distribute_receipt( PreparedCollation { @@ -372,7 +413,7 @@ impl CollationGenerationSubsystem { validation_data: validation_data.clone(), validation_code_hash, n_validators, - core_index, + core_index: descriptor_core_index, session_index, }, task_config.key.clone(), From 2ea32709a95fb6e433c4255a86b88d3e884b9110 Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Tue, 14 Jan 2025 17:07:48 +0400 Subject: [PATCH 02/11] fix(collation-generation): correct logic for using claim queue offset --- polkadot/node/collation-generation/src/lib.rs | 193 +++++++++++++----- 1 file changed, 140 insertions(+), 53 deletions(-) diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index 2c5d2e687552..f37b2e49221c 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -53,7 +53,7 @@ use polkadot_primitives::{ node_features::FeatureIndex, vstaging::{ transpose_claim_queue, CandidateDescriptorV2, CandidateReceiptV2 as CandidateReceipt, - CommittedCandidateReceiptV2, TransposedClaimQueue, + ClaimQueueOffset, CommittedCandidateReceiptV2, TransposedClaimQueue, }, CandidateCommitments, CandidateDescriptor, CollatorPair, CoreIndex, Hash, Id as ParaId, NodeFeatures, OccupiedCoreAssumption, PersistedValidationData, SessionIndex, @@ -276,13 +276,15 @@ 
impl CollationGenerationSubsystem { let claim_queue = ClaimQueueSnapshot::from(request_claim_queue(relay_parent, ctx.sender()).await.await??); - let cores_to_build_on = claim_queue - .iter_claims_at_depth(0) - .filter_map(|(core_idx, para_id)| (para_id == config.para_id).then_some(core_idx)) + let assigned_cores = claim_queue + .iter_all_claims() + .filter_map(|(core_idx, para_ids)| { + para_ids.iter().any(|&para_id| para_id == config.para_id).then_some(*core_idx) + }) .collect::>(); - // Nothing to do if no core assigned to us. - if cores_to_build_on.is_empty() { + // Nothing to do if no core is assigned to us at any depth. + if assigned_cores.is_empty() { return Ok(()) } @@ -342,66 +344,80 @@ impl CollationGenerationSubsystem { ctx.spawn( "chained-collation-builder", Box::pin(async move { + // Build the first collation in advance to find the claim queue offset and retrieve + // the list of cores to submit collations on. + let collator_fn = match task_config.collator.as_ref() { + Some(x) => x, + None => return, + }; + + let (mut collation, mut result_sender) = + match collator_fn(relay_parent, &validation_data).await { + Some(collation) => collation.into_inner(), + None => { + gum::debug!( + target: LOG_TARGET, + ?para_id, + "collator returned no collation on collate", + ); + return + }, + }; + + // Use the core_selector method from CandidateCommitments to extract CoreSelector + // and ClaimQueueOffset. + let mut commitments = CandidateCommitments::default(); + commitments.upward_messages = collation.upward_messages.clone(); + + let (mut core_selector, init_cq_offset) = match commitments.core_selector() { + Ok(Some((sel, off))) => (Some(sel), off), + Ok(None) => (None, ClaimQueueOffset(0)), + Err(err) => { + gum::debug!( + target: LOG_TARGET, + ?para_id, + "error processing UMP signals: {}", + err + ); + return + }, + }; + + // Identify the cores to build collations on using the given claim queue offset. 
+ let cores_to_build_on = claim_queue + .iter_claims_at_depth(init_cq_offset.0 as usize) + .filter_map(|(core_idx, para_id)| { + (para_id == task_config.para_id).then_some(core_idx) + }) + .collect::>(); + + // Track used core selector indexes not to submit collations on the same core. + let mut used_core_selector_indexes = vec![false; cores_to_build_on.len()]; + let transposed_claim_queue = transpose_claim_queue(claim_queue.0); - let mut prev_core_selector_index = cores_to_build_on.len(); for core_index in &cores_to_build_on { - let collator_fn = match task_config.collator.as_ref() { - Some(x) => x, - None => return, - }; + let descriptor_core_index = match core_selector { + Some(ref core_selector) => { + // Calculate the core selector index and ensure it hasn't been used. + let core_selector_index = + core_selector.0 as usize % cores_to_build_on.len(); - let (collation, result_sender) = - match collator_fn(relay_parent, &validation_data).await { - Some(collation) => collation.into_inner(), - None => { + if used_core_selector_indexes[core_selector_index] { gum::debug!( target: LOG_TARGET, ?para_id, - "collator returned no collation on collate", + "parachain repeatedly selected the same core index", ); return - }, - }; - - // Determine the core index for the descriptor. - - // Use the core_selector method from CandidateCommitments to extract - // CoreSelector. - let mut commitments = CandidateCommitments::default(); - commitments.upward_messages = collation.upward_messages.clone(); - - let descriptor_core_index = match commitments.core_selector() { - // If a valid CoreSelector is found, calculate the core index based on it. - Ok(Some((core_selector, _cq_offset))) => { - let core_selector_index = - core_selector.0 as usize % cores_to_build_on.len(); - - if cores_to_build_on.len() > 1 && - prev_core_selector_index == core_selector_index - { - // Malicious behavior detected: The parachain is assigned to - // multiple cores but repeatedly selects the same core. 
This is - // unexpected and the process should stop here. - gum::error!(target: LOG_TARGET, "Parachain repeatedly selected the same core index"); - return; } + used_core_selector_indexes[core_selector_index] = true; - prev_core_selector_index = core_selector_index; - - let Some(commitments_core_index) = - cores_to_build_on.get(core_selector_index) - else { - // This should not happen, as the core_selector index is modulo the - // length of cores_to_build_on and we ensure that cores_to_build_on - // is not empty. - continue; - }; - - *commitments_core_index + let commitments_core_index = cores_to_build_on[core_selector_index]; + commitments_core_index }, // Fallback to the sequential core_index if no valid CoreSelector is found. - _ => *core_index, + None => *core_index, }; let parent_head = collation.head_data.clone(); @@ -433,9 +449,80 @@ impl CollationGenerationSubsystem { return } + // No need to build more collations if we have already built and submitted + // cores_to_build_on.len() collations. + if core_index == cores_to_build_on.last().unwrap() { + return + } + // Chain the collations. All else stays the same as we build the chained // collation on same relay parent. validation_data.parent_head = parent_head; + + // Prepare the next collation. + (collation, result_sender) = + match collator_fn(relay_parent, &validation_data).await { + Some(collation) => collation.into_inner(), + None => { + gum::debug!( + target: LOG_TARGET, + ?para_id, + "collator returned no collation on collate", + ); + return + }, + }; + + // Determine the new core selector and claim queue offset values. 
+ commitments.upward_messages = collation.upward_messages.clone(); + let (new_core_selector, new_cq_offset) = match commitments.core_selector() { + Ok(Some((sel, off))) => (Some(sel), off), + Ok(None) => (None, ClaimQueueOffset(0)), + Err(err) => { + gum::debug!( + target: LOG_TARGET, + ?para_id, + "error processing UMP signals: {}", + err + ); + return + }, + }; + + // We generally assume that the claim queue offset will not change often, + // but if it does at any point, it is fine to skip that relay chain block + // and readjust the cores later. + if new_cq_offset.0 != init_cq_offset.0 { + gum::debug!( + target: LOG_TARGET, + ?para_id, + "claim queue offset changed between blocks (initial: {}, current: {})", + init_cq_offset.0, + new_cq_offset.0 + ); + return + } + + // We either use core selectors to choose cores for building blocks or iterate + // over all cores sequentially if no core selector is provided. A mixed approach + // is not acceptable. + if core_selector.is_none() && new_core_selector.is_some() { + gum::debug!( + target: LOG_TARGET, + ?para_id, + "previous block(s) had no core selector, but the current block has one", + ); + return + } else if core_selector.is_some() && new_core_selector.is_none() { + gum::debug!( + target: LOG_TARGET, + ?para_id, + "previous block(s) had a core selector, but the current block does not", + ); + return + } else { + core_selector = new_core_selector; + } } }), )?; From 81e9cb2c242495d4a1d931bcca7da7aa9c0b2eb9 Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Wed, 15 Jan 2025 15:41:44 +0400 Subject: [PATCH 03/11] refactor(collation-generation): simplify the implementation --- polkadot/node/collation-generation/src/lib.rs | 193 ++++++------------ 1 file changed, 59 insertions(+), 134 deletions(-) diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index f37b2e49221c..630926f3a312 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ 
b/polkadot/node/collation-generation/src/lib.rs @@ -61,7 +61,7 @@ use polkadot_primitives::{ }; use schnellru::{ByLength, LruMap}; use sp_core::crypto::Pair; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; mod error; @@ -281,7 +281,7 @@ impl CollationGenerationSubsystem { .filter_map(|(core_idx, para_ids)| { para_ids.iter().any(|&para_id| para_id == config.para_id).then_some(*core_idx) }) - .collect::>(); + .collect::>(); // Nothing to do if no core is assigned to us at any depth. if assigned_cores.is_empty() { @@ -344,82 +344,78 @@ impl CollationGenerationSubsystem { ctx.spawn( "chained-collation-builder", Box::pin(async move { - // Build the first collation in advance to find the claim queue offset and retrieve - // the list of cores to submit collations on. - let collator_fn = match task_config.collator.as_ref() { - Some(x) => x, - None => return, - }; - - let (mut collation, mut result_sender) = - match collator_fn(relay_parent, &validation_data).await { - Some(collation) => collation.into_inner(), - None => { + let transposed_claim_queue = transpose_claim_queue(claim_queue.0.clone()); + + // Track used core indexes not to submit collations on the same core. + let mut used_cores = HashSet::new(); + + for i in 0..assigned_cores.len() { + // et the collation. + let collator_fn = match task_config.collator.as_ref() { + Some(x) => x, + None => return, + }; + + let (collation, result_sender) = + match collator_fn(relay_parent, &validation_data).await { + Some(collation) => collation.into_inner(), + None => { + gum::debug!( + target: LOG_TARGET, + ?para_id, + "collator returned no collation on collate", + ); + return + }, + }; + + // Use the core_selector method from CandidateCommitments to extract + // CoreSelector and ClaimQueueOffset. 
+ let mut commitments = CandidateCommitments::default(); + commitments.upward_messages = collation.upward_messages.clone(); + + let (core_selector, cq_offset) = match commitments.core_selector() { + Ok(Some((sel, off))) => (Some(sel), off), + Ok(None) => (None, ClaimQueueOffset(0)), + Err(err) => { gum::debug!( target: LOG_TARGET, ?para_id, - "collator returned no collation on collate", + "error processing UMP signals: {}", + err ); return }, }; - // Use the core_selector method from CandidateCommitments to extract CoreSelector - // and ClaimQueueOffset. - let mut commitments = CandidateCommitments::default(); - commitments.upward_messages = collation.upward_messages.clone(); + // Identify the cores to build collations on using the given claim queue offset. + let cores_to_build_on = claim_queue + .iter_claims_at_depth(cq_offset.0 as usize) + .filter_map(|(core_idx, para_id)| { + (para_id == task_config.para_id).then_some(core_idx) + }) + .collect::>(); + + let index = match core_selector { + // Use the CoreSelector's index if provided. + Some(core_selector) => core_selector.0 as usize, + // Fallback to the sequential index if no CoreSelector is provided. + None => i, + }; + let descriptor_core_index = cores_to_build_on[index % cores_to_build_on.len()]; - let (mut core_selector, init_cq_offset) = match commitments.core_selector() { - Ok(Some((sel, off))) => (Some(sel), off), - Ok(None) => (None, ClaimQueueOffset(0)), - Err(err) => { + // Ensure the core index has not been used before. + if used_cores.contains(&descriptor_core_index.0) { gum::debug!( target: LOG_TARGET, ?para_id, - "error processing UMP signals: {}", - err + "parachain repeatedly selected the same core index", ); return - }, - }; - - // Identify the cores to build collations on using the given claim queue offset. 
- let cores_to_build_on = claim_queue - .iter_claims_at_depth(init_cq_offset.0 as usize) - .filter_map(|(core_idx, para_id)| { - (para_id == task_config.para_id).then_some(core_idx) - }) - .collect::>(); - - // Track used core selector indexes not to submit collations on the same core. - let mut used_core_selector_indexes = vec![false; cores_to_build_on.len()]; - - let transposed_claim_queue = transpose_claim_queue(claim_queue.0); - - for core_index in &cores_to_build_on { - let descriptor_core_index = match core_selector { - Some(ref core_selector) => { - // Calculate the core selector index and ensure it hasn't been used. - let core_selector_index = - core_selector.0 as usize % cores_to_build_on.len(); - - if used_core_selector_indexes[core_selector_index] { - gum::debug!( - target: LOG_TARGET, - ?para_id, - "parachain repeatedly selected the same core index", - ); - return - } - used_core_selector_indexes[core_selector_index] = true; - - let commitments_core_index = cores_to_build_on[core_selector_index]; - commitments_core_index - }, - // Fallback to the sequential core_index if no valid CoreSelector is found. - None => *core_index, - }; + } + used_cores.insert(descriptor_core_index.0); + // Distribute the collation. let parent_head = collation.head_data.clone(); if let Err(err) = construct_and_distribute_receipt( PreparedCollation { @@ -449,80 +445,9 @@ impl CollationGenerationSubsystem { return } - // No need to build more collations if we have already built and submitted - // cores_to_build_on.len() collations. - if core_index == cores_to_build_on.last().unwrap() { - return - } - // Chain the collations. All else stays the same as we build the chained // collation on same relay parent. validation_data.parent_head = parent_head; - - // Prepare the next collation. 
- (collation, result_sender) = - match collator_fn(relay_parent, &validation_data).await { - Some(collation) => collation.into_inner(), - None => { - gum::debug!( - target: LOG_TARGET, - ?para_id, - "collator returned no collation on collate", - ); - return - }, - }; - - // Determine the new core selector and claim queue offset values. - commitments.upward_messages = collation.upward_messages.clone(); - let (new_core_selector, new_cq_offset) = match commitments.core_selector() { - Ok(Some((sel, off))) => (Some(sel), off), - Ok(None) => (None, ClaimQueueOffset(0)), - Err(err) => { - gum::debug!( - target: LOG_TARGET, - ?para_id, - "error processing UMP signals: {}", - err - ); - return - }, - }; - - // We generally assume that the claim queue offset will not change often, - // but if it does at any point, it is fine to skip that relay chain block - // and readjust the cores later. - if new_cq_offset.0 != init_cq_offset.0 { - gum::debug!( - target: LOG_TARGET, - ?para_id, - "claim queue offset changed between blocks (initial: {}, current: {})", - init_cq_offset.0, - new_cq_offset.0 - ); - return - } - - // We either use core selectors to choose cores for building blocks or iterate - // over all cores sequentially if no core selector is provided. A mixed approach - // is not acceptable. 
- if core_selector.is_none() && new_core_selector.is_some() { - gum::debug!( - target: LOG_TARGET, - ?para_id, - "previous block(s) had no core selector, but the current block has one", - ); - return - } else if core_selector.is_some() && new_core_selector.is_none() { - gum::debug!( - target: LOG_TARGET, - ?para_id, - "previous block(s) had a core selector, but the current block does not", - ); - return - } else { - core_selector = new_core_selector; - } } }), )?; From 97fbb85cec9c567c2d0c08aa326c0e75e78ad904 Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Wed, 15 Jan 2025 15:45:20 +0400 Subject: [PATCH 04/11] fix(collation-generation): correct typo in comments --- polkadot/node/collation-generation/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index 630926f3a312..9f110df48002 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -350,7 +350,7 @@ impl CollationGenerationSubsystem { let mut used_cores = HashSet::new(); for i in 0..assigned_cores.len() { - // et the collation. + // Get the collation. 
let collator_fn = match task_config.collator.as_ref() { Some(x) => x, None => return, From 00bc30a811bb8e69fd770aa7b16609c3d85552e5 Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Wed, 15 Jan 2025 21:09:24 +0400 Subject: [PATCH 05/11] test(collation-generation): add unit tests --- polkadot/node/collation-generation/src/lib.rs | 10 ++ .../node/collation-generation/src/tests.rs | 153 +++++++++++++++--- 2 files changed, 144 insertions(+), 19 deletions(-) diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index 9f110df48002..4ed09ea330ca 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -396,6 +396,16 @@ impl CollationGenerationSubsystem { }) .collect::>(); + if cores_to_build_on.is_empty() { + gum::debug!( + target: LOG_TARGET, + ?para_id, + "no core is assigned to para at depth {}", + cq_offset.0, + ); + return + } + let index = match core_selector { // Use the CoreSelector's index if provided. 
Some(core_selector) => core_selector.0 as usize, diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs index f81c14cdf8f9..646128e236b0 100644 --- a/polkadot/node/collation-generation/src/tests.rs +++ b/polkadot/node/collation-generation/src/tests.rs @@ -16,11 +16,10 @@ use super::*; use assert_matches::assert_matches; -use futures::{ - task::{Context as FuturesContext, Poll}, - Future, StreamExt, +use futures::{self, Future, StreamExt}; +use polkadot_node_primitives::{ + BlockData, Collation, CollationResult, CollatorFn, MaybeCompressedPoV, PoV, }; -use polkadot_node_primitives::{BlockData, Collation, CollationResult, MaybeCompressedPoV, PoV}; use polkadot_node_subsystem::{ messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest}, ActivatedLeaf, @@ -28,14 +27,16 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_primitives::{ - node_features, vstaging::CandidateDescriptorVersion, CollatorPair, PersistedValidationData, + node_features, + vstaging::{CandidateDescriptorVersion, CoreSelector, UMPSignal, UMP_SEPARATOR}, + CollatorPair, PersistedValidationData, }; use polkadot_primitives_test_helpers::dummy_head_data; use rstest::rstest; use sp_keyring::sr25519::Keyring as Sr25519Keyring; use std::{ collections::{BTreeMap, VecDeque}, - pin::Pin, + sync::Mutex, }; type VirtualOverseer = TestSubsystemContextHandle; @@ -79,17 +80,49 @@ fn test_collation() -> Collation { } } -struct TestCollator; - -impl Future for TestCollator { - type Output = Option; +struct State { + core_selector_index: Option, + cq_offset: u8, +} - fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll { - Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None })) +impl State { + fn new(core_selector_index: Option, cq_offset: u8) -> Self { + Self { core_selector_index, cq_offset } } } -impl 
Unpin for TestCollator {} +struct TestCollator { + state: Arc>, +} + +impl TestCollator { + fn new(core_selector_index: Option, cq_offset: u8) -> Self { + Self { state: Arc::new(Mutex::new(State::new(core_selector_index, cq_offset))) } + } + + pub fn create_collation_function(&self) -> CollatorFn { + let state = Arc::clone(&self.state); + + Box::new(move |_relay_parent: Hash, _validation_data: &PersistedValidationData| { + let mut state_guard = state.lock().unwrap(); + let mut collation = test_collation(); + + if let Some(index) = state_guard.core_selector_index { + collation.upward_messages.force_push(UMP_SEPARATOR); + collation.upward_messages.force_push( + UMPSignal::SelectCore( + CoreSelector(index), + ClaimQueueOffset(state_guard.cq_offset), + ) + .encode(), + ); + state_guard.core_selector_index = Some(index + 1); + } + + async move { Some(CollationResult { collation, result_sender: None }) }.boxed() + }) + } +} const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000); @@ -101,10 +134,15 @@ async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages { .expect(&format!("{:?} is long enough to receive messages", TIMEOUT)) } -fn test_config>(para_id: Id) -> CollationGenerationConfig { +fn test_config>( + para_id: Id, + core_selector_index: Option, + cq_offset: u8, +) -> CollationGenerationConfig { + let test_collator = TestCollator::new(core_selector_index, cq_offset); CollationGenerationConfig { key: CollatorPair::generate().0, - collator: Some(Box::new(|_: Hash, _vd: &PersistedValidationData| TestCollator.boxed())), + collator: Some(test_collator.create_collation_function()), para_id: para_id.into(), } } @@ -219,7 +257,7 @@ fn distribute_collation_only_for_assigned_para_id_at_offset_0() { .collect::>(); test_harness(|mut virtual_overseer| async move { - helpers::initialize_collator(&mut virtual_overseer, para_id).await; + helpers::initialize_collator(&mut virtual_overseer, para_id, None, 0).await; helpers::activate_new_head(&mut 
virtual_overseer, activated_hash).await; helpers::handle_runtime_calls_on_new_head_activation( &mut virtual_overseer, @@ -259,7 +297,7 @@ fn distribute_collation_with_elastic_scaling(#[case] total_cores: u32) { .collect::>(); test_harness(|mut virtual_overseer| async move { - helpers::initialize_collator(&mut virtual_overseer, para_id).await; + helpers::initialize_collator(&mut virtual_overseer, para_id, None, 0).await; helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; helpers::handle_runtime_calls_on_new_head_activation( &mut virtual_overseer, @@ -281,6 +319,74 @@ fn distribute_collation_with_elastic_scaling(#[case] total_cores: u32) { }); } +// Tests when submission core indexes need to be selected using the core selectors provided in the +// UMP signals. The core selector index is an increasing number that can start with a non-negative +// value (even greater than the core index), but the collation generation protocol uses the +// remainder to select the core. UMP signals may also contain a claim queue offset, based on which +// we need to select the assigned core indexes for the para from that offset in the claim queue. +#[rstest] +#[case(0, 0, 0)] +#[case(1, 0, 0)] +#[case(1, 5, 0)] +#[case(2, 0, 1)] +#[case(4, 2, 2)] +fn distribute_collation_with_core_selectors( + #[case] total_cores: u32, + // The core selector index that will be obtained from the first collation. + #[case] init_core_selector_index: u8, + // Claim queue offset where the assigned cores will be stored. + #[case] cq_offset: u8, +) { + let activated_hash: Hash = [1; 32].into(); + let para_id = ParaId::from(5); + let other_para_id = ParaId::from(10); + + let claim_queue = (0..total_cores) + .into_iter() + .map(|idx| { + // Set all cores assigned to para_id 5 at the cq_offset depth. 
+ let mut vec = VecDeque::from(vec![other_para_id; cq_offset as usize]); + vec.push_back(para_id); + (CoreIndex(idx), vec) + }) + .collect::>(); + + test_harness(|mut virtual_overseer| async move { + helpers::initialize_collator( + &mut virtual_overseer, + para_id, + Some(init_core_selector_index), + cq_offset, + ) + .await; + helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; + helpers::handle_runtime_calls_on_new_head_activation( + &mut virtual_overseer, + activated_hash, + claim_queue, + NodeFeatures::EMPTY, + ) + .await; + + let mut cores_assigned = (0..total_cores).collect::>(); + if total_cores > 1 && init_core_selector_index > 0 { + // We need to rotate the list of cores because the first core selector index was + // non-zero, which should change the sequence of submissions. However, collations should + // still be submitted on all cores. + cores_assigned.rotate_left((init_core_selector_index as u32 % total_cores) as usize); + } + helpers::handle_cores_processing_for_a_leaf( + &mut virtual_overseer, + activated_hash, + para_id, + cores_assigned, + ) + .await; + + virtual_overseer + }); +} + #[rstest] #[case(true)] #[case(false)] @@ -405,10 +511,19 @@ mod helpers { use std::collections::{BTreeMap, VecDeque}; // Sends `Initialize` with a collator config - pub async fn initialize_collator(virtual_overseer: &mut VirtualOverseer, para_id: ParaId) { + pub async fn initialize_collator( + virtual_overseer: &mut VirtualOverseer, + para_id: ParaId, + core_selector_index: Option, + cq_offset: u8, + ) { virtual_overseer .send(FromOrchestra::Communication { - msg: CollationGenerationMessage::Initialize(test_config(para_id)), + msg: CollationGenerationMessage::Initialize(test_config( + para_id, + core_selector_index, + cq_offset, + )), }) .await; } From 2ae3f4925db6ea66db088a7e7b0ad9295520f0e6 Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Wed, 15 Jan 2025 21:28:19 +0400 Subject: [PATCH 06/11] docs(prdoc): add prdoc for PR #7104 --- 
prdoc/pr_7104.prdoc | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 prdoc/pr_7104.prdoc diff --git a/prdoc/pr_7104.prdoc b/prdoc/pr_7104.prdoc new file mode 100644 index 000000000000..ea966ed308e0 --- /dev/null +++ b/prdoc/pr_7104.prdoc @@ -0,0 +1,20 @@ +title: "collation-generation: resolve mismatch between descriptor and commitments core index" + +doc: + - audience: Node Dev + description: | + This PR resolves a bug where normal collators failed to generate and submit collations, + resulting in the following error: + + ``` + ERROR tokio-runtime-worker parachain::collation-generation: Failed to construct and + distribute collation: V2 core index check failed: The core index in commitments doesn't + match the one in descriptor. + ``` + + When core selectors are provided in the UMP signals, core indexes will be chosen using them. + The fix ensures that functionality remains unchanged for parachains not using UMP signals. + +crates: + - name: polkadot-node-collation-generation + bump: patch From 1761bfc0d4407d748cdcddd78d493b1061fe37fd Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Mon, 20 Jan 2025 16:15:40 +0400 Subject: [PATCH 07/11] refactor(collation-generation): address feedback from PR comments --- polkadot/node/collation-generation/src/lib.rs | 12 +- .../node/collation-generation/src/tests.rs | 127 +++++++++++++----- 2 files changed, 106 insertions(+), 33 deletions(-) diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index 4ed09ea330ca..a526e679f919 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -281,7 +281,7 @@ impl CollationGenerationSubsystem { .filter_map(|(core_idx, para_ids)| { para_ids.iter().any(|&para_id| para_id == config.para_id).then_some(*core_idx) }) - .collect::>(); + .collect::>(); // Nothing to do if no core is assigned to us at any depth. 
if assigned_cores.is_empty() { @@ -419,11 +419,19 @@ impl CollationGenerationSubsystem { gum::debug!( target: LOG_TARGET, ?para_id, - "parachain repeatedly selected the same core index", + "parachain repeatedly selected the same core index: {}", + descriptor_core_index.0, ); return } + used_cores.insert(descriptor_core_index.0); + gum::trace!( + target: LOG_TARGET, + ?para_id, + "selected core index: {}", + descriptor_core_index.0, + ); // Distribute the collation. let parent_head = collation.head_data.clone(); diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs index 646128e236b0..fafbd1a33653 100644 --- a/polkadot/node/collation-generation/src/tests.rs +++ b/polkadot/node/collation-generation/src/tests.rs @@ -80,14 +80,29 @@ fn test_collation() -> Collation { } } -struct State { - core_selector_index: Option, +struct CoreSelectorData { + // The core selector index. + cs_index: u8, + // The increment value for the core selector index. Normally 1, but can be set to 0 or another + // value for testing scenarios where a parachain repeatedly selects the same core index. + increment_cs_index_by: u8, + // The claim queue offset. 
cq_offset: u8, } +impl CoreSelectorData { + fn new(cs_index: u8, increment_cs_index_by: u8, cq_offset: u8) -> Self { + Self { cs_index, increment_cs_index_by, cq_offset } + } +} + +struct State { + core_selector_data: Option, +} + impl State { - fn new(core_selector_index: Option, cq_offset: u8) -> Self { - Self { core_selector_index, cq_offset } + fn new(core_selector_data: Option) -> Self { + Self { core_selector_data } } } @@ -96,27 +111,27 @@ struct TestCollator { } impl TestCollator { - fn new(core_selector_index: Option, cq_offset: u8) -> Self { - Self { state: Arc::new(Mutex::new(State::new(core_selector_index, cq_offset))) } + fn new(core_selector_data: Option) -> Self { + Self { state: Arc::new(Mutex::new(State::new(core_selector_data))) } } pub fn create_collation_function(&self) -> CollatorFn { let state = Arc::clone(&self.state); Box::new(move |_relay_parent: Hash, _validation_data: &PersistedValidationData| { - let mut state_guard = state.lock().unwrap(); let mut collation = test_collation(); + let mut state_guard = state.lock().unwrap(); - if let Some(index) = state_guard.core_selector_index { + if let Some(core_selector_data) = &mut state_guard.core_selector_data { collation.upward_messages.force_push(UMP_SEPARATOR); collation.upward_messages.force_push( UMPSignal::SelectCore( - CoreSelector(index), - ClaimQueueOffset(state_guard.cq_offset), + CoreSelector(core_selector_data.cs_index), + ClaimQueueOffset(core_selector_data.cq_offset), ) .encode(), ); - state_guard.core_selector_index = Some(index + 1); + core_selector_data.cs_index += core_selector_data.increment_cs_index_by; } async move { Some(CollationResult { collation, result_sender: None }) }.boxed() @@ -136,10 +151,9 @@ async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages { fn test_config>( para_id: Id, - core_selector_index: Option, - cq_offset: u8, + core_selector_data: Option, ) -> CollationGenerationConfig { - let test_collator = TestCollator::new(core_selector_index, 
cq_offset); + let test_collator = TestCollator::new(core_selector_data); CollationGenerationConfig { key: CollatorPair::generate().0, collator: Some(test_collator.create_collation_function()), @@ -257,7 +271,7 @@ fn distribute_collation_only_for_assigned_para_id_at_offset_0() { .collect::>(); test_harness(|mut virtual_overseer| async move { - helpers::initialize_collator(&mut virtual_overseer, para_id, None, 0).await; + helpers::initialize_collator(&mut virtual_overseer, para_id, None).await; helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; helpers::handle_runtime_calls_on_new_head_activation( &mut virtual_overseer, @@ -297,7 +311,7 @@ fn distribute_collation_with_elastic_scaling(#[case] total_cores: u32) { .collect::>(); test_harness(|mut virtual_overseer| async move { - helpers::initialize_collator(&mut virtual_overseer, para_id, None, 0).await; + helpers::initialize_collator(&mut virtual_overseer, para_id, None).await; helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; helpers::handle_runtime_calls_on_new_head_activation( &mut virtual_overseer, @@ -325,21 +339,25 @@ fn distribute_collation_with_elastic_scaling(#[case] total_cores: u32) { // remainder to select the core. UMP signals may also contain a claim queue offset, based on which // we need to select the assigned core indexes for the para from that offset in the claim queue. #[rstest] -#[case(0, 0, 0)] -#[case(1, 0, 0)] -#[case(1, 5, 0)] -#[case(2, 0, 1)] -#[case(4, 2, 2)] +#[case(0, 0, 0, false)] +#[case(1, 0, 0, true)] +#[case(1, 5, 0, false)] +#[case(2, 0, 1, true)] +#[case(4, 2, 2, false)] fn distribute_collation_with_core_selectors( #[case] total_cores: u32, // The core selector index that will be obtained from the first collation. - #[case] init_core_selector_index: u8, + #[case] init_cs_index: u8, // Claim queue offset where the assigned cores will be stored. 
#[case] cq_offset: u8, + // Enables v2 receipts feature, affecting core selector and claim queue handling. + #[case] v2_receipts: bool, ) { let activated_hash: Hash = [1; 32].into(); let para_id = ParaId::from(5); let other_para_id = ParaId::from(10); + let node_features = + if v2_receipts { node_features_with_v2_enabled() } else { NodeFeatures::EMPTY }; let claim_queue = (0..total_cores) .into_iter() @@ -355,8 +373,7 @@ fn distribute_collation_with_core_selectors( helpers::initialize_collator( &mut virtual_overseer, para_id, - Some(init_core_selector_index), - cq_offset, + Some(CoreSelectorData::new(init_cs_index, 1, cq_offset)), ) .await; helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; @@ -364,16 +381,16 @@ fn distribute_collation_with_core_selectors( &mut virtual_overseer, activated_hash, claim_queue, - NodeFeatures::EMPTY, + node_features, ) .await; let mut cores_assigned = (0..total_cores).collect::>(); - if total_cores > 1 && init_core_selector_index > 0 { + if total_cores > 1 && init_cs_index > 0 { // We need to rotate the list of cores because the first core selector index was // non-zero, which should change the sequence of submissions. However, collations should // still be submitted on all cores. - cores_assigned.rotate_left((init_core_selector_index as u32 % total_cores) as usize); + cores_assigned.rotate_left((init_cs_index as u32 % total_cores) as usize); } helpers::handle_cores_processing_for_a_leaf( &mut virtual_overseer, @@ -387,6 +404,56 @@ fn distribute_collation_with_core_selectors( }); } +// Tests the behavior when a parachain repeatedly selects the same core index. +// Ensures that the system handles this behavior correctly while maintaining expected functionality. 
+#[rstest] +#[case(3, 0, vec![0])] +#[case(3, 1, vec![0, 1, 2])] +#[case(3, 2, vec![0, 2, 1])] +#[case(3, 3, vec![0])] +#[case(3, 4, vec![0, 1, 2])] +fn distribute_collation_with_repeated_core_selector_index( + #[case] total_cores: u32, + #[case] increment_cs_index_by: u8, + #[case] expected_selected_cores: Vec, +) { + let activated_hash: Hash = [1; 32].into(); + let para_id = ParaId::from(5); + let node_features = node_features_with_v2_enabled(); + + let claim_queue = (0..total_cores) + .into_iter() + .map(|idx| (CoreIndex(idx), VecDeque::from([para_id]))) + .collect::>(); + + test_harness(|mut virtual_overseer| async move { + helpers::initialize_collator( + &mut virtual_overseer, + para_id, + Some(CoreSelectorData::new(0, increment_cs_index_by, 0)), + ) + .await; + helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; + helpers::handle_runtime_calls_on_new_head_activation( + &mut virtual_overseer, + activated_hash, + claim_queue, + node_features, + ) + .await; + + helpers::handle_cores_processing_for_a_leaf( + &mut virtual_overseer, + activated_hash, + para_id, + expected_selected_cores, + ) + .await; + + virtual_overseer + }); +} + #[rstest] #[case(true)] #[case(false)] @@ -514,15 +581,13 @@ mod helpers { pub async fn initialize_collator( virtual_overseer: &mut VirtualOverseer, para_id: ParaId, - core_selector_index: Option, - cq_offset: u8, + core_selector_data: Option, ) { virtual_overseer .send(FromOrchestra::Communication { msg: CollationGenerationMessage::Initialize(test_config( para_id, - core_selector_index, - cq_offset, + core_selector_data, )), }) .await; From 09797f9f8d28c1ef2570d12c1a8522ef9eb052da Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Mon, 20 Jan 2025 16:17:59 +0400 Subject: [PATCH 08/11] docs(prdoc): update prdoc for PR #7104 --- prdoc/pr_7104.prdoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/prdoc/pr_7104.prdoc b/prdoc/pr_7104.prdoc index ea966ed308e0..d7dd29b19afe 100644 --- 
a/prdoc/pr_7104.prdoc +++ b/prdoc/pr_7104.prdoc @@ -3,7 +3,7 @@ title: "collation-generation: resolve mismatch between descriptor and commitment doc: - audience: Node Dev description: | - This PR resolves a bug where normal collators failed to generate and submit collations, + This PR resolves a bug where collators failed to generate and submit collations, resulting in the following error: ``` @@ -12,8 +12,8 @@ doc: match the one in descriptor. ``` - When core selectors are provided in the UMP signals, core indexes will be chosen using them. - The fix ensures that functionality remains unchanged for parachains not using UMP signals. + This fix ensures the descriptor core index contains the value determined by the core + selector UMP signal when the parachain is using RFC103. crates: - name: polkadot-node-collation-generation From e1902e5d5b18837a0b13229ffac49850419dc5ca Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Tue, 21 Jan 2025 15:53:54 +0400 Subject: [PATCH 09/11] refactor(collation-generation): address feedback from PR comments --- polkadot/node/collation-generation/src/lib.rs | 2 +- prdoc/pr_7104.prdoc | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index a526e679f919..d290b8c8774e 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -416,7 +416,7 @@ impl CollationGenerationSubsystem { // Ensure the core index has not been used before. if used_cores.contains(&descriptor_core_index.0) { - gum::debug!( + gum::warn!( target: LOG_TARGET, ?para_id, "parachain repeatedly selected the same core index: {}", diff --git a/prdoc/pr_7104.prdoc b/prdoc/pr_7104.prdoc index d7dd29b19afe..bd05e2b60e1f 100644 --- a/prdoc/pr_7104.prdoc +++ b/prdoc/pr_7104.prdoc @@ -12,6 +12,9 @@ doc: match the one in descriptor. 
``` + This issue affects only legacy and test collators that still use the collation function. + It is not a problem for lookahead or slot-based collators. + This fix ensures the descriptor core index contains the value determined by the core selector UMP signal when the parachain is using RFC103. From 3612d9b480cc69c0c6cd273a4f7f9e7f0eb5738d Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Tue, 21 Jan 2025 17:10:17 +0400 Subject: [PATCH 10/11] refactor(collation-generation): remove redundant match --- polkadot/node/collation-generation/src/lib.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index d290b8c8774e..3c8a216f5f35 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -374,9 +374,11 @@ impl CollationGenerationSubsystem { let mut commitments = CandidateCommitments::default(); commitments.upward_messages = collation.upward_messages.clone(); - let (core_selector, cq_offset) = match commitments.core_selector() { - Ok(Some((sel, off))) => (Some(sel), off), - Ok(None) => (None, ClaimQueueOffset(0)), + let (cs_index, cq_offset) = match commitments.core_selector() { + // Use the CoreSelector's index if provided. + Ok(Some((sel, off))) => (sel.0 as usize, off), + // Fallback to the sequential index if no CoreSelector is provided. + Ok(None) => (i, ClaimQueueOffset(0)), Err(err) => { gum::debug!( target: LOG_TARGET, @@ -406,13 +408,8 @@ impl CollationGenerationSubsystem { return } - let index = match core_selector { - // Use the CoreSelector's index if provided. - Some(core_selector) => core_selector.0 as usize, - // Fallback to the sequential index if no CoreSelector is provided. 
- None => i, - }; - let descriptor_core_index = cores_to_build_on[index % cores_to_build_on.len()]; + let descriptor_core_index = + cores_to_build_on[cs_index % cores_to_build_on.len()]; // Ensure the core index has not been used before. if used_cores.contains(&descriptor_core_index.0) { From 018fbf07696f0e439405edb98a77dde30ed217e9 Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Tue, 21 Jan 2025 17:18:00 +0400 Subject: [PATCH 11/11] refactor(collation-generation): update naming for CoreSelectorData attributes --- polkadot/node/collation-generation/src/tests.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs index fafbd1a33653..dc1d7b3489c1 100644 --- a/polkadot/node/collation-generation/src/tests.rs +++ b/polkadot/node/collation-generation/src/tests.rs @@ -82,17 +82,17 @@ fn test_collation() -> Collation { struct CoreSelectorData { // The core selector index. - cs_index: u8, + index: u8, // The increment value for the core selector index. Normally 1, but can be set to 0 or another // value for testing scenarios where a parachain repeatedly selects the same core index. - increment_cs_index_by: u8, + increment_index_by: u8, // The claim queue offset. 
cq_offset: u8, } impl CoreSelectorData { - fn new(cs_index: u8, increment_cs_index_by: u8, cq_offset: u8) -> Self { - Self { cs_index, increment_cs_index_by, cq_offset } + fn new(index: u8, increment_index_by: u8, cq_offset: u8) -> Self { + Self { index, increment_index_by, cq_offset } } } @@ -126,12 +126,12 @@ impl TestCollator { collation.upward_messages.force_push(UMP_SEPARATOR); collation.upward_messages.force_push( UMPSignal::SelectCore( - CoreSelector(core_selector_data.cs_index), + CoreSelector(core_selector_data.index), ClaimQueueOffset(core_selector_data.cq_offset), ) .encode(), ); - core_selector_data.cs_index += core_selector_data.increment_cs_index_by; + core_selector_data.index += core_selector_data.increment_index_by; } async move { Some(CollationResult { collation, result_sender: None }) }.boxed()