Skip to content

Commit

Permalink
replay: populate block_id and send as part of TowerSync tx (#2776)
Browse files Browse the repository at this point in the history
  • Loading branch information
AshwinSekar authored Oct 18, 2024
1 parent 5535e79 commit 2d9764a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 6 deletions.
17 changes: 14 additions & 3 deletions core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,11 +590,20 @@ impl Tower {
/// Applies a vote for `bank` to `self.vote_state` and returns the new root
/// if one is made as a result.
///
/// The vote carries the bank's `block_id`. When the bank has none, a default
/// `Hash` is used instead; that is only tolerated for our own leader bank.
///
/// # Panics
///
/// Panics if `bank.block_id()` is `None` and the bank's collector is not
/// `self.node_pubkey`.
pub fn record_bank_vote(&mut self, bank: &Bank) -> Option<Slot> {
    let block_id = match bank.block_id() {
        Some(id) => id,
        None => {
            // A missing block_id can only happen for our leader bank.
            assert!(
                *bank.collector_id() == self.node_pubkey,
                "block_id must not be None for a frozen non-leader bank"
            );
            Hash::default()
        }
    };

    // Bind the remaining arguments in the same order the call consumes them.
    let vote_slot = bank.slot();
    let vote_hash = bank.hash();
    let tower_sync_enabled = bank
        .feature_set
        .is_active(&solana_feature_set::enable_tower_sync_ix::id());

    self.record_bank_vote_and_update_lockouts(vote_slot, vote_hash, tower_sync_enabled, block_id)
}

Expand All @@ -604,6 +613,7 @@ impl Tower {
&mut self,
vote_hash: Hash,
enable_tower_sync_ix: bool,
block_id: Hash,
) {
let mut new_vote = if enable_tower_sync_ix {
VoteTransaction::from(TowerSync::new(
Expand All @@ -614,7 +624,7 @@ impl Tower {
.collect(),
self.vote_state.root_slot,
vote_hash,
Hash::default(), // TODO: block_id will fill in upcoming pr
block_id,
))
} else {
VoteTransaction::from(VoteStateUpdate::new(
Expand All @@ -637,6 +647,7 @@ impl Tower {
vote_slot: Slot,
vote_hash: Hash,
enable_tower_sync_ix: bool,
block_id: Hash,
) -> Option<Slot> {
trace!("{} record_vote for {}", self.node_pubkey, vote_slot);
let old_root = self.root();
Expand All @@ -649,7 +660,7 @@ impl Tower {
vote_slot, vote_hash, result
);
}
self.update_last_vote_from_vote_state(vote_hash, enable_tower_sync_ix);
self.update_last_vote_from_vote_state(vote_hash, enable_tower_sync_ix, block_id);

let new_root = self.root();

Expand All @@ -667,7 +678,7 @@ impl Tower {

/// Records a vote for `slot` with `hash` by delegating to
/// `record_bank_vote_and_update_lockouts`, passing `true` for the
/// tower-sync flag, and returns whatever that call returns.
///
/// Only compiled when the `dev-context-only-utils` feature is enabled.
// NOTE(review): the two calls below look like the removed (3-argument) and
// added (4-argument, with `Hash::default()` as block_id) sides of a diff
// hunk; presumably only the 4-argument form is live code — confirm.
#[cfg(feature = "dev-context-only-utils")]
pub fn record_vote(&mut self, slot: Slot, hash: Hash) -> Option<Slot> {
self.record_bank_vote_and_update_lockouts(slot, hash, true)
self.record_bank_vote_and_update_lockouts(slot, hash, true, Hash::default())
}

/// Used for tests
Expand Down
23 changes: 21 additions & 2 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3107,7 +3107,7 @@ impl ReplayStage {
}
}

let _block_id = if bank.collector_id() != my_pubkey {
let block_id = if bank.collector_id() != my_pubkey {
// If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK correctly retransmitted
// shreds in the last FEC set, mark it dead. No reason to perform this check on our leader block.
match blockstore.check_last_fec_set_and_get_block_id(
Expand Down Expand Up @@ -3139,6 +3139,7 @@ impl ReplayStage {
} else {
None
};
bank.set_block_id(block_id);

let r_replay_stats = replay_stats.read().unwrap();
let replay_progress = bank_progress.replay_progress.clone();
Expand Down Expand Up @@ -3433,6 +3434,7 @@ impl ReplayStage {
tower,
progress,
bank,
bank_forks,
);
let computed_bank_state = Tower::collect_vote_lockouts(
my_vote_pubkey,
Expand Down Expand Up @@ -3505,6 +3507,7 @@ impl ReplayStage {
tower: &mut Tower,
progress: &mut ProgressMap,
bank: &Arc<Bank>,
bank_forks: &RwLock<BankForks>,
) {
let Some(vote_account) = bank.get_vote_account(my_vote_pubkey) else {
return;
Expand Down Expand Up @@ -3585,13 +3588,27 @@ impl ReplayStage {
// Finally if both `bank` and `bank_vote_state.last_voted_slot()` are duplicate,
// we must have the compatible versions of both duplicates in order to replay `bank`
// successfully, so we are once again guaranteed that `bank_vote_state.last_voted_slot()`
// is present in progress map.
// is present in bank forks and progress map.
let block_id = {
// The block_id here will only be relevant if we need to refresh this last vote.
let bank = bank_forks
.read()
.unwrap()
.get(last_voted_slot)
.expect("Last voted slot that we are adopting must exist in bank forks");
// Here we don't have to check if this is our leader bank: since we are adopting this bank,
// it must have been created by a different instance (hot spare setup or a previous restart),
// and thus we must have replayed it and set the block_id from the shreds.
bank.block_id()
.expect("block_id for an adopted bank cannot be None")
};
tower.update_last_vote_from_vote_state(
progress
.get_hash(last_voted_slot)
.expect("Must exist for us to have frozen descendant"),
bank.feature_set
.is_active(&solana_feature_set::enable_tower_sync_ix::id()),
block_id,
);
// Since we are updating our tower we need to update associated caches for previously computed
// slots as well.
Expand Down Expand Up @@ -4234,6 +4251,7 @@ pub(crate) mod tests {
slot: Slot,
) -> Arc<Bank> {
let bank = Bank::new_from_parent(parent, collector_id, slot);
bank.set_block_id(Some(Hash::new_unique()));
bank_forks
.write()
.unwrap()
Expand Down Expand Up @@ -8746,6 +8764,7 @@ pub(crate) mod tests {
&mut tower,
&mut progress,
bank_6,
&bank_forks,
);

// slot 3 should now pass the threshold check but be locked out.
Expand Down
2 changes: 2 additions & 0 deletions core/src/vote_simulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl VoteSimulator {
new_bank.register_unique_tick();
}
if !visit.node().has_no_child() || is_frozen {
new_bank.set_block_id(Some(Hash::new_unique()));
new_bank.freeze();
self.progress
.get_fork_stats_mut(new_bank.slot())
Expand Down Expand Up @@ -396,6 +397,7 @@ pub fn initialize_state(

genesis_config.poh_config.hashes_per_tick = Some(2);
let (bank0, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
bank0.set_block_id(Some(Hash::new_unique()));

for pubkey in validator_keypairs_map.keys() {
bank0.transfer(10_000, &mint_keypair, pubkey).unwrap();
Expand Down
13 changes: 12 additions & 1 deletion ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2138,6 +2138,7 @@ pub fn process_single_slot(
replay_vote_sender: Option<&ReplayVoteSender>,
timing: &mut ExecuteTimings,
) -> result::Result<(), BlockstoreProcessorError> {
let slot = bank.slot();
// Mark corrupt slots as dead so validators don't replay this slot and
// see AlreadyProcessed errors later in ReplayStage
confirm_full_slot(
Expand All @@ -2160,7 +2161,6 @@ pub fn process_single_slot(
Ok(())
})
.map_err(|err| {
let slot = bank.slot();
warn!("slot {} failed to verify: {}", slot, err);
if blockstore.is_primary_access() {
blockstore
Expand All @@ -2178,6 +2178,17 @@ pub fn process_single_slot(
if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
result?
}

let block_id = blockstore.check_last_fec_set_and_get_block_id(slot, bank.hash(), &bank.feature_set)
.inspect_err(|err| {
warn!("slot {} failed last fec set checks: {}", slot, err);
if blockstore.is_primary_access() {
blockstore.set_dead_slot(slot).expect("Failed to mark slot as dead in blockstore");
} else {
info!("Failed last fec set checks slot {slot} won't be marked dead due to being secondary blockstore access");
}
})?;
bank.set_block_id(block_id);
bank.freeze(); // all banks handled by this routine are created from complete slots

if let Some(slot_callback) = &opts.slot_callback {
Expand Down
18 changes: 18 additions & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,7 @@ impl PartialEq for Bank {
transaction_account_lock_limit: _,
fee_structure: _,
cache_for_accounts_lt_hash: _,
block_id,
// Ignore new fields explicitly if they do not impact PartialEq.
// Adding ".." will remove compile-time checks that if a new field
// is added to the struct, this PartialEq is accordingly updated.
Expand Down Expand Up @@ -621,6 +622,7 @@ impl PartialEq for Bank {
*hash_overrides.lock().unwrap() == *other.hash_overrides.lock().unwrap())
&& !(self.is_accounts_lt_hash_enabled() && other.is_accounts_lt_hash_enabled()
&& *accounts_lt_hash.lock().unwrap() != *other.accounts_lt_hash.lock().unwrap())
&& *block_id.read().unwrap() == *other.block_id.read().unwrap()
}
}

Expand Down Expand Up @@ -925,6 +927,11 @@ pub struct Bank {
/// The accounts lt hash needs both the initial and final state of each
/// account that was modified in this slot. Cache the initial state here.
cache_for_accounts_lt_hash: RwLock<AHashMap<Pubkey, InitialStateOfAccount>>,

/// The unique identifier for the corresponding block for this bank.
/// None for banks that have not yet completed replay or for leader banks as we cannot populate block_id
/// until bankless leader. Can be computed directly from shreds without needing to execute transactions.
block_id: RwLock<Option<Hash>>,
}

struct VoteWithStakeDelegations {
Expand Down Expand Up @@ -1047,6 +1054,7 @@ impl Bank {
hash_overrides: Arc::new(Mutex::new(HashOverrides::default())),
accounts_lt_hash: Mutex::new(AccountsLtHash(LtHash([0xBAD1; LtHash::NUM_ELEMENTS]))),
cache_for_accounts_lt_hash: RwLock::new(AHashMap::new()),
block_id: RwLock::new(None),
};

bank.transaction_processor =
Expand Down Expand Up @@ -1321,6 +1329,7 @@ impl Bank {
hash_overrides: parent.hash_overrides.clone(),
accounts_lt_hash: Mutex::new(parent.accounts_lt_hash.lock().unwrap().clone()),
cache_for_accounts_lt_hash: RwLock::new(AHashMap::new()),
block_id: RwLock::new(None),
};

let (_, ancestors_time_us) = measure_us!({
Expand Down Expand Up @@ -1701,6 +1710,7 @@ impl Bank {
hash_overrides: Arc::new(Mutex::new(HashOverrides::default())),
accounts_lt_hash: Mutex::new(AccountsLtHash(LtHash([0xBAD2; LtHash::NUM_ELEMENTS]))),
cache_for_accounts_lt_hash: RwLock::new(AHashMap::new()),
block_id: RwLock::new(None),
};

bank.transaction_processor =
Expand Down Expand Up @@ -6849,6 +6859,14 @@ impl Bank {
&self.fee_structure
}

/// Returns a copy of the block id currently stored for this bank, or `None`
/// if no block id has been set.
///
/// # Panics
///
/// Panics if acquiring the read lock on `block_id` fails.
pub fn block_id(&self) -> Option<Hash> {
    let stored = self.block_id.read().unwrap();
    *stored
}

/// Overwrites the block id stored for this bank with `block_id`.
///
/// Takes `&self`: the value is written through the lock that wraps the field.
///
/// # Panics
///
/// Panics if acquiring the write lock on `block_id` fails.
pub fn set_block_id(&self, block_id: Option<Hash>) {
    let mut stored = self.block_id.write().unwrap();
    *stored = block_id;
}

/// Returns this bank's `compute_budget` field by value.
pub fn compute_budget(&self) -> Option<ComputeBudget> {
self.compute_budget
}
Expand Down

0 comments on commit 2d9764a

Please sign in to comment.