diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index b371017a8289..3c8a216f5f35 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -53,7 +53,7 @@ use polkadot_primitives::{ node_features::FeatureIndex, vstaging::{ transpose_claim_queue, CandidateDescriptorV2, CandidateReceiptV2 as CandidateReceipt, - CommittedCandidateReceiptV2, TransposedClaimQueue, + ClaimQueueOffset, CommittedCandidateReceiptV2, TransposedClaimQueue, }, CandidateCommitments, CandidateDescriptor, CollatorPair, CoreIndex, Hash, Id as ParaId, NodeFeatures, OccupiedCoreAssumption, PersistedValidationData, SessionIndex, @@ -61,7 +61,7 @@ use polkadot_primitives::{ }; use schnellru::{ByLength, LruMap}; use sp_core::crypto::Pair; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; mod error; @@ -276,13 +276,15 @@ impl CollationGenerationSubsystem { let claim_queue = ClaimQueueSnapshot::from(request_claim_queue(relay_parent, ctx.sender()).await.await??); - let cores_to_build_on = claim_queue - .iter_claims_at_depth(0) - .filter_map(|(core_idx, para_id)| (para_id == config.para_id).then_some(core_idx)) + let assigned_cores = claim_queue + .iter_all_claims() + .filter_map(|(core_idx, para_ids)| { + para_ids.iter().any(|¶_id| para_id == config.para_id).then_some(*core_idx) + }) .collect::>(); - // Nothing to do if no core assigned to us. - if cores_to_build_on.is_empty() { + // Nothing to do if no core is assigned to us at any depth. + if assigned_cores.is_empty() { return Ok(()) } @@ -342,9 +344,13 @@ impl CollationGenerationSubsystem { ctx.spawn( "chained-collation-builder", Box::pin(async move { - let transposed_claim_queue = transpose_claim_queue(claim_queue.0); + let transposed_claim_queue = transpose_claim_queue(claim_queue.0.clone()); - for core_index in cores_to_build_on { + // Track used core indexes not to submit collations on the same core. + let mut used_cores = HashSet::new(); + + for i in 0..assigned_cores.len() { + // Get the collation. let collator_fn = match task_config.collator.as_ref() { Some(x) => x, None => return, @@ -363,6 +369,68 @@ impl CollationGenerationSubsystem { }, }; + // Use the core_selector method from CandidateCommitments to extract + // CoreSelector and ClaimQueueOffset. + let mut commitments = CandidateCommitments::default(); + commitments.upward_messages = collation.upward_messages.clone(); + + let (cs_index, cq_offset) = match commitments.core_selector() { + // Use the CoreSelector's index if provided. + Ok(Some((sel, off))) => (sel.0 as usize, off), + // Fallback to the sequential index if no CoreSelector is provided. + Ok(None) => (i, ClaimQueueOffset(0)), + Err(err) => { + gum::debug!( + target: LOG_TARGET, + ?para_id, + "error processing UMP signals: {}", + err + ); + return + }, + }; + + // Identify the cores to build collations on using the given claim queue offset. + let cores_to_build_on = claim_queue + .iter_claims_at_depth(cq_offset.0 as usize) + .filter_map(|(core_idx, para_id)| { + (para_id == task_config.para_id).then_some(core_idx) + }) + .collect::>(); + + if cores_to_build_on.is_empty() { + gum::debug!( + target: LOG_TARGET, + ?para_id, + "no core is assigned to para at depth {}", + cq_offset.0, + ); + return + } + + let descriptor_core_index = + cores_to_build_on[cs_index % cores_to_build_on.len()]; + + // Ensure the core index has not been used before. + if used_cores.contains(&descriptor_core_index.0) { + gum::warn!( + target: LOG_TARGET, + ?para_id, + "parachain repeatedly selected the same core index: {}", + descriptor_core_index.0, + ); + return + } + + used_cores.insert(descriptor_core_index.0); + gum::trace!( + target: LOG_TARGET, + ?para_id, + "selected core index: {}", + descriptor_core_index.0, + ); + + // Distribute the collation. let parent_head = collation.head_data.clone(); if let Err(err) = construct_and_distribute_receipt( PreparedCollation { @@ -372,7 +440,7 @@ impl CollationGenerationSubsystem { validation_data: validation_data.clone(), validation_code_hash, n_validators, - core_index, + core_index: descriptor_core_index, session_index, }, task_config.key.clone(), diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs index f81c14cdf8f9..dc1d7b3489c1 100644 --- a/polkadot/node/collation-generation/src/tests.rs +++ b/polkadot/node/collation-generation/src/tests.rs @@ -16,11 +16,10 @@ use super::*; use assert_matches::assert_matches; -use futures::{ - task::{Context as FuturesContext, Poll}, - Future, StreamExt, +use futures::{self, Future, StreamExt}; +use polkadot_node_primitives::{ + BlockData, Collation, CollationResult, CollatorFn, MaybeCompressedPoV, PoV, }; -use polkadot_node_primitives::{BlockData, Collation, CollationResult, MaybeCompressedPoV, PoV}; use polkadot_node_subsystem::{ messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest}, ActivatedLeaf, @@ -28,14 +27,16 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_primitives::{ - node_features, vstaging::CandidateDescriptorVersion, CollatorPair, PersistedValidationData, + node_features, + vstaging::{CandidateDescriptorVersion, CoreSelector, UMPSignal, UMP_SEPARATOR}, + CollatorPair, PersistedValidationData, }; use polkadot_primitives_test_helpers::dummy_head_data; use rstest::rstest; use sp_keyring::sr25519::Keyring as Sr25519Keyring; use std::{ collections::{BTreeMap, VecDeque}, - pin::Pin, + sync::Mutex, }; type VirtualOverseer = TestSubsystemContextHandle; @@ -79,17 +80,64 @@ fn test_collation() -> Collation { } } -struct TestCollator; +struct CoreSelectorData { + // The core selector index. + index: u8, + // The increment value for the core selector index. Normally 1, but can be set to 0 or another + // value for testing scenarios where a parachain repeatedly selects the same core index. + increment_index_by: u8, + // The claim queue offset. + cq_offset: u8, +} + +impl CoreSelectorData { + fn new(index: u8, increment_index_by: u8, cq_offset: u8) -> Self { + Self { index, increment_index_by, cq_offset } + } +} -impl Future for TestCollator { - type Output = Option; +struct State { + core_selector_data: Option, +} - fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll { - Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None })) +impl State { + fn new(core_selector_data: Option) -> Self { + Self { core_selector_data } } } -impl Unpin for TestCollator {} +struct TestCollator { + state: Arc>, +} + +impl TestCollator { + fn new(core_selector_data: Option) -> Self { + Self { state: Arc::new(Mutex::new(State::new(core_selector_data))) } + } + + pub fn create_collation_function(&self) -> CollatorFn { + let state = Arc::clone(&self.state); + + Box::new(move |_relay_parent: Hash, _validation_data: &PersistedValidationData| { + let mut collation = test_collation(); + let mut state_guard = state.lock().unwrap(); + + if let Some(core_selector_data) = &mut state_guard.core_selector_data { + collation.upward_messages.force_push(UMP_SEPARATOR); + collation.upward_messages.force_push( + UMPSignal::SelectCore( + CoreSelector(core_selector_data.index), + ClaimQueueOffset(core_selector_data.cq_offset), + ) + .encode(), + ); + core_selector_data.index += core_selector_data.increment_index_by; + } + + async move { Some(CollationResult { collation, result_sender: None }) }.boxed() + }) + } +} const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000); @@ -101,10 +149,14 @@ async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages { .expect(&format!("{:?} is long enough to receive messages", TIMEOUT)) } -fn test_config>(para_id: Id) -> CollationGenerationConfig { +fn test_config>( + para_id: Id, + core_selector_data: Option, +) -> CollationGenerationConfig { + let test_collator = TestCollator::new(core_selector_data); CollationGenerationConfig { key: CollatorPair::generate().0, - collator: Some(Box::new(|_: Hash, _vd: &PersistedValidationData| TestCollator.boxed())), + collator: Some(test_collator.create_collation_function()), para_id: para_id.into(), } } @@ -219,7 +271,7 @@ fn distribute_collation_only_for_assigned_para_id_at_offset_0() { .collect::>(); test_harness(|mut virtual_overseer| async move { - helpers::initialize_collator(&mut virtual_overseer, para_id).await; + helpers::initialize_collator(&mut virtual_overseer, para_id, None).await; helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; helpers::handle_runtime_calls_on_new_head_activation( &mut virtual_overseer, @@ -259,7 +311,7 @@ fn distribute_collation_with_elastic_scaling(#[case] total_cores: u32) { .collect::>(); test_harness(|mut virtual_overseer| async move { - helpers::initialize_collator(&mut virtual_overseer, para_id).await; + helpers::initialize_collator(&mut virtual_overseer, para_id, None).await; helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; helpers::handle_runtime_calls_on_new_head_activation( &mut virtual_overseer, @@ -281,6 +333,127 @@ fn distribute_collation_with_elastic_scaling(#[case] total_cores: u32) { }); } +// Tests when submission core indexes need to be selected using the core selectors provided in the +// UMP signals. The core selector index is an increasing number that can start with a non-negative +// value (even greater than the core index), but the collation generation protocol uses the +// remainder to select the core. UMP signals may also contain a claim queue offset, based on which +// we need to select the assigned core indexes for the para from that offset in the claim queue. +#[rstest] +#[case(0, 0, 0, false)] +#[case(1, 0, 0, true)] +#[case(1, 5, 0, false)] +#[case(2, 0, 1, true)] +#[case(4, 2, 2, false)] +fn distribute_collation_with_core_selectors( + #[case] total_cores: u32, + // The core selector index that will be obtained from the first collation. + #[case] init_cs_index: u8, + // Claim queue offset where the assigned cores will be stored. + #[case] cq_offset: u8, + // Enables v2 receipts feature, affecting core selector and claim queue handling. + #[case] v2_receipts: bool, +) { + let activated_hash: Hash = [1; 32].into(); + let para_id = ParaId::from(5); + let other_para_id = ParaId::from(10); + let node_features = + if v2_receipts { node_features_with_v2_enabled() } else { NodeFeatures::EMPTY }; + + let claim_queue = (0..total_cores) + .into_iter() + .map(|idx| { + // Set all cores assigned to para_id 5 at the cq_offset depth. + let mut vec = VecDeque::from(vec![other_para_id; cq_offset as usize]); + vec.push_back(para_id); + (CoreIndex(idx), vec) + }) + .collect::>(); + + test_harness(|mut virtual_overseer| async move { + helpers::initialize_collator( + &mut virtual_overseer, + para_id, + Some(CoreSelectorData::new(init_cs_index, 1, cq_offset)), + ) + .await; + helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; + helpers::handle_runtime_calls_on_new_head_activation( + &mut virtual_overseer, + activated_hash, + claim_queue, + node_features, + ) + .await; + + let mut cores_assigned = (0..total_cores).collect::>(); + if total_cores > 1 && init_cs_index > 0 { + // We need to rotate the list of cores because the first core selector index was + // non-zero, which should change the sequence of submissions. However, collations should + // still be submitted on all cores. + cores_assigned.rotate_left((init_cs_index as u32 % total_cores) as usize); + } + helpers::handle_cores_processing_for_a_leaf( + &mut virtual_overseer, + activated_hash, + para_id, + cores_assigned, + ) + .await; + + virtual_overseer + }); +} + +// Tests the behavior when a parachain repeatedly selects the same core index. +// Ensures that the system handles this behavior correctly while maintaining expected functionality. +#[rstest] +#[case(3, 0, vec![0])] +#[case(3, 1, vec![0, 1, 2])] +#[case(3, 2, vec![0, 2, 1])] +#[case(3, 3, vec![0])] +#[case(3, 4, vec![0, 1, 2])] +fn distribute_collation_with_repeated_core_selector_index( + #[case] total_cores: u32, + #[case] increment_cs_index_by: u8, + #[case] expected_selected_cores: Vec, +) { + let activated_hash: Hash = [1; 32].into(); + let para_id = ParaId::from(5); + let node_features = node_features_with_v2_enabled(); + + let claim_queue = (0..total_cores) + .into_iter() + .map(|idx| (CoreIndex(idx), VecDeque::from([para_id]))) + .collect::>(); + + test_harness(|mut virtual_overseer| async move { + helpers::initialize_collator( + &mut virtual_overseer, + para_id, + Some(CoreSelectorData::new(0, increment_cs_index_by, 0)), + ) + .await; + helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; + helpers::handle_runtime_calls_on_new_head_activation( + &mut virtual_overseer, + activated_hash, + claim_queue, + node_features, + ) + .await; + + helpers::handle_cores_processing_for_a_leaf( + &mut virtual_overseer, + activated_hash, + para_id, + expected_selected_cores, + ) + .await; + + virtual_overseer + }); +} + #[rstest] #[case(true)] #[case(false)] @@ -405,10 +578,17 @@ mod helpers { use std::collections::{BTreeMap, VecDeque}; // Sends `Initialize` with a collator config - pub async fn initialize_collator(virtual_overseer: &mut VirtualOverseer, para_id: ParaId) { + pub async fn initialize_collator( + virtual_overseer: &mut VirtualOverseer, + para_id: ParaId, + core_selector_data: Option, + ) { virtual_overseer .send(FromOrchestra::Communication { - msg: CollationGenerationMessage::Initialize(test_config(para_id)), + msg: CollationGenerationMessage::Initialize(test_config( + para_id, + core_selector_data, + )), }) .await; } diff --git a/prdoc/pr_7104.prdoc b/prdoc/pr_7104.prdoc new file mode 100644 index 000000000000..bd05e2b60e1f --- /dev/null +++ b/prdoc/pr_7104.prdoc @@ -0,0 +1,23 @@ +title: "collation-generation: resolve mismatch between descriptor and commitments core index" + +doc: + - audience: Node Dev + description: | + This PR resolves a bug where collators failed to generate and submit collations, + resulting in the following error: + + ``` + ERROR tokio-runtime-worker parachain::collation-generation: Failed to construct and + distribute collation: V2 core index check failed: The core index in commitments doesn't + match the one in descriptor. + ``` + + This issue affects only legacy and test collators that still use the collation function. + It is not a problem for lookahead or slot-based collators. + + This fix ensures the descriptor core index contains the value determined by the core + selector UMP signal when the parachain is using RFC103. + +crates: + - name: polkadot-node-collation-generation + bump: patch