collation-generation: resolve mismatch between descriptor and commitments core index #7104

Merged · 14 commits · Jan 22, 2025
Changes from 2 commits
166 changes: 147 additions & 19 deletions polkadot/node/collation-generation/src/lib.rs
@@ -53,7 +53,7 @@ use polkadot_primitives::{
node_features::FeatureIndex,
vstaging::{
transpose_claim_queue, CandidateDescriptorV2, CandidateReceiptV2 as CandidateReceipt,
- CommittedCandidateReceiptV2, TransposedClaimQueue,
+ ClaimQueueOffset, CommittedCandidateReceiptV2, TransposedClaimQueue,
},
CandidateCommitments, CandidateDescriptor, CollatorPair, CoreIndex, Hash, Id as ParaId,
NodeFeatures, OccupiedCoreAssumption, PersistedValidationData, SessionIndex,
@@ -276,13 +276,15 @@ impl CollationGenerationSubsystem {
let claim_queue =
ClaimQueueSnapshot::from(request_claim_queue(relay_parent, ctx.sender()).await.await??);

- let cores_to_build_on = claim_queue
- .iter_claims_at_depth(0)
- .filter_map(|(core_idx, para_id)| (para_id == config.para_id).then_some(core_idx))
+ let assigned_cores = claim_queue
+ .iter_all_claims()
+ .filter_map(|(core_idx, para_ids)| {
+ para_ids.iter().any(|&para_id| para_id == config.para_id).then_some(*core_idx)
+ })
.collect::<Vec<_>>();

- // Nothing to do if no core assigned to us.
- if cores_to_build_on.is_empty() {
+ // Nothing to do if no core is assigned to us at any depth.
+ if assigned_cores.is_empty() {
return Ok(())
}

@@ -342,26 +344,81 @@ impl CollationGenerationSubsystem {
ctx.spawn(
"chained-collation-builder",
Box::pin(async move {
// Build the first collation in advance to find the claim queue offset and retrieve
// the list of cores to submit collations on.
let collator_fn = match task_config.collator.as_ref() {
Some(x) => x,
None => return,
};

let (mut collation, mut result_sender) =
match collator_fn(relay_parent, &validation_data).await {
Some(collation) => collation.into_inner(),
None => {
gum::debug!(
target: LOG_TARGET,
?para_id,
"collator returned no collation on collate",
);
return
},
};

// Use the core_selector method from CandidateCommitments to extract CoreSelector
// and ClaimQueueOffset.
let mut commitments = CandidateCommitments::default();
commitments.upward_messages = collation.upward_messages.clone();

let (mut core_selector, init_cq_offset) = match commitments.core_selector() {
Ok(Some((sel, off))) => (Some(sel), off),
Ok(None) => (None, ClaimQueueOffset(0)),
Err(err) => {
gum::debug!(
target: LOG_TARGET,
?para_id,
"error processing UMP signals: {}",
err
);
return
},
};

// Identify the cores to build collations on using the given claim queue offset.
let cores_to_build_on = claim_queue
.iter_claims_at_depth(init_cq_offset.0 as usize)
.filter_map(|(core_idx, para_id)| {
(para_id == task_config.para_id).then_some(core_idx)
})
.collect::<Vec<_>>();

// Track used core selector indexes not to submit collations on the same core.
let mut used_core_selector_indexes = vec![false; cores_to_build_on.len()];

let transposed_claim_queue = transpose_claim_queue(claim_queue.0);

- for core_index in cores_to_build_on {
- let collator_fn = match task_config.collator.as_ref() {
- Some(x) => x,
- None => return,
- };
+ for core_index in &cores_to_build_on {
+ let descriptor_core_index = match core_selector {
+ Some(ref core_selector) => {
+ // Calculate the core selector index and ensure it hasn't been used.
+ let core_selector_index =
+ core_selector.0 as usize % cores_to_build_on.len();

- let (collation, result_sender) =
- match collator_fn(relay_parent, &validation_data).await {
- Some(collation) => collation.into_inner(),
- None => {
+ if used_core_selector_indexes[core_selector_index] {
gum::debug!(
target: LOG_TARGET,
?para_id,
"collator returned no collation on collate",
"parachain repeatedly selected the same core index",
);
return
- },
- };
+ }
+ used_core_selector_indexes[core_selector_index] = true;
+
+ let commitments_core_index = cores_to_build_on[core_selector_index];
+ commitments_core_index
+ },
+ // Fallback to the sequential core_index if no valid CoreSelector is found.
+ None => *core_index,
+ };

let parent_head = collation.head_data.clone();
if let Err(err) = construct_and_distribute_receipt(
@@ -372,7 +429,7 @@
validation_data: validation_data.clone(),
validation_code_hash,
n_validators,
- core_index,
+ core_index: descriptor_core_index,
session_index,
},
task_config.key.clone(),
@@ -392,9 +449,80 @@
return
}

// No need to build more collations if we have already built and submitted
// cores_to_build_on.len() collations.
if core_index == cores_to_build_on.last().unwrap() {
return
}

// Chain the collations. All else stays the same as we build the chained
// collation on same relay parent.
validation_data.parent_head = parent_head;

// Prepare the next collation.
(collation, result_sender) =
match collator_fn(relay_parent, &validation_data).await {
Some(collation) => collation.into_inner(),
None => {
gum::debug!(
target: LOG_TARGET,
?para_id,
"collator returned no collation on collate",
);
return
},
};

// Determine the new core selector and claim queue offset values.
commitments.upward_messages = collation.upward_messages.clone();
let (new_core_selector, new_cq_offset) = match commitments.core_selector() {
Ok(Some((sel, off))) => (Some(sel), off),
Ok(None) => (None, ClaimQueueOffset(0)),
Err(err) => {
gum::debug!(
target: LOG_TARGET,
?para_id,
"error processing UMP signals: {}",
err
);
return
},
};

// We generally assume that the claim queue offset will not change often,
// but if it does at any point, it is fine to skip that relay chain block
// and readjust the cores later.
if new_cq_offset.0 != init_cq_offset.0 {
gum::debug!(
target: LOG_TARGET,
?para_id,
"claim queue offset changed between blocks (initial: {}, current: {})",
init_cq_offset.0,
new_cq_offset.0
);
return
}

// We either use core selectors to choose cores for building blocks or iterate
// over all cores sequentially if no core selector is provided. A mixed approach
// is not acceptable.
if core_selector.is_none() && new_core_selector.is_some() {
gum::debug!(
target: LOG_TARGET,
?para_id,
"previous block(s) had no core selector, but the current block has one",
);
return
} else if core_selector.is_some() && new_core_selector.is_none() {
gum::debug!(
target: LOG_TARGET,
?para_id,
"previous block(s) had a core selector, but the current block does not",
);
return
} else {
core_selector = new_core_selector;
}
}
}),
)?;
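
As a companion to the diff above (not part of the PR), here is a minimal Rust sketch of the core-selection rule the new loop applies: the CoreSelector signalled by the parachain via UMP is reduced modulo the number of cores assigned at the chosen claim queue offset, a selector index that was already used aborts the chain, and when no selector is signalled the loop falls back to the sequential core index. The CoreIndex and CoreSelector structs and the select_core helper are simplified stand-ins for illustration only, not the actual polkadot-primitives types or any function introduced by this PR.

// Simplified stand-ins modelling only the fields the selection rule needs.
#[derive(Clone, Copy, Debug, PartialEq)]
struct CoreIndex(u32);
#[derive(Clone, Copy, Debug)]
struct CoreSelector(u8);

/// Pick the descriptor core index for one collation in the chain.
/// `cores_to_build_on` are the cores assigned to the para at the claim queue
/// offset chosen by the first collation; `used` tracks selector indexes that
/// were already consumed so the same core is not targeted twice.
fn select_core(
    core_selector: Option<CoreSelector>,
    sequential_core: CoreIndex,
    cores_to_build_on: &[CoreIndex],
    used: &mut [bool],
) -> Option<CoreIndex> {
    // Mirrors the earlier early-return: nothing to do without assigned cores.
    if cores_to_build_on.is_empty() {
        return None;
    }
    match core_selector {
        Some(sel) => {
            // Same reduction as the PR: selector modulo the number of assigned cores.
            let idx = sel.0 as usize % cores_to_build_on.len();
            if used[idx] {
                // The parachain repeatedly selected the same core index; stop building.
                return None;
            }
            used[idx] = true;
            Some(cores_to_build_on[idx])
        },
        // Fallback to the sequential core index if no selector was signalled.
        None => Some(sequential_core),
    }
}

fn main() {
    let cores = [CoreIndex(0), CoreIndex(2), CoreIndex(5)];
    let mut used = vec![false; cores.len()];

    // Selector 7 maps to index 7 % 3 == 1, i.e. CoreIndex(2).
    assert_eq!(select_core(Some(CoreSelector(7)), cores[0], &cores, &mut used), Some(CoreIndex(2)));
    // Re-using the same selector index is rejected.
    assert_eq!(select_core(Some(CoreSelector(4)), cores[1], &cores, &mut used), None);
    // Without a selector the sequential core is used unchanged.
    assert_eq!(select_core(None, cores[2], &cores, &mut used), Some(CoreIndex(5)));
}

The assertions mirror the three paths exercised by the new loop: a fresh core selector, a repeated selector index, and the sequential fallback when no selector is present.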