Only aggregate heaviest fork at the coordinator. (#3115)
* Only aggregate the heaviest fork at the coordinator; do not exit until asked.

* Fix a bad merge.
wen-coding authored Oct 17, 2024
1 parent 114ab4d commit 2bde0fc
Showing 3 changed files with 113 additions and 455 deletions.
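In broad strokes: before this commit, every validator aggregated its peers' heaviest-fork messages and tracked which of them had seen a supermajority so it could exit on its own once the threshold was crossed; afterward, only the coordinator aggregates, and each gossiped record carries its sender in a new `from` field. Below is a minimal, self-contained sketch of the coordinator-side shape of the new flow — all names here (`Coordinator`, its `aggregate`) are illustrative stand-ins, not the actual wen_restart API:

use std::collections::HashMap;

// Illustrative subset of the record; `from` now identifies the sender.
#[derive(Clone, Debug)]
struct HeaviestForkRecord {
    from: String,
    slot: u64,
    bankhash: String,
    total_active_stake: u64,
    wallclock: u64,
}

// Hypothetical coordinator-side state: only this node keeps an aggregate.
struct Coordinator {
    received: HashMap<String, HeaviestForkRecord>,
}

impl Coordinator {
    // Keep the newest record per sender (simplified: the real `aggregate`
    // in the diff below also compares contents, shred_version, and stake).
    fn aggregate(&mut self, record: HeaviestForkRecord) {
        let is_newer = self
            .received
            .get(&record.from)
            .map_or(true, |stored| record.wallclock > stored.wallclock);
        if is_newer {
            self.received.insert(record.from.clone(), record);
        }
    }
}

fn main() {
    let mut coordinator = Coordinator { received: HashMap::new() };
    coordinator.aggregate(HeaviestForkRecord {
        from: "validator-1".to_string(),
        slot: 42,
        bankhash: "bankhash-1".to_string(),
        total_active_stake: 100,
        wallclock: 1,
    });
    assert_eq!(coordinator.received.len(), 1);
}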
11 changes: 3 additions & 8 deletions wen-restart/proto/wen_restart.proto
@@ -39,17 +39,12 @@ message HeaviestForkRecord {
uint64 total_active_stake = 3;
uint32 shred_version = 4;
uint64 wallclock = 5;
}

message HeaviestForkAggregateFinal {
uint64 total_active_stake = 1;
uint64 total_active_stake_seen_supermajority = 2;
uint64 total_active_stake_agreed_with_me = 3;
string from = 6;
}

message HeaviestForkAggregateRecord {
map<string, HeaviestForkRecord> received = 1;
optional HeaviestForkAggregateFinal final_result = 2;
repeated HeaviestForkRecord received = 1;
uint64 total_active_stake = 2;
}

message GenerateSnapshotRecord {
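The schema change above drops the pubkey-keyed map in favor of a repeated field, so each record must now name its own sender. Nothing is lost: the old keyed view can be rebuilt from the new shape whenever it is convenient. A small sketch under that assumption (the field subset mirrors the proto; `index_by_sender` is a hypothetical helper, not part of the codebase):

use std::collections::HashMap;

// Field subset mirroring the new HeaviestForkRecord proto message.
#[derive(Clone, Debug)]
struct HeaviestForkRecord {
    from: String,
    slot: u64,
    total_active_stake: u64,
}

// A `repeated HeaviestForkRecord` carries the same information as the old
// `map<string, HeaviestForkRecord>` because every record names its sender.
fn index_by_sender(received: Vec<HeaviestForkRecord>) -> HashMap<String, HeaviestForkRecord> {
    received.into_iter().map(|r| (r.from.clone(), r)).collect()
}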
129 changes: 39 additions & 90 deletions wen-restart/src/heaviest_fork_aggregate.rs
@@ -12,7 +12,6 @@ use {
};

pub(crate) struct HeaviestForkAggregate {
supermajority_threshold: f64,
my_shred_version: u16,
my_pubkey: Pubkey,
// We use the epoch_stakes of the Epoch our heaviest bank is in. Proceed and exit only if
@@ -21,14 +20,6 @@ pub(crate) struct HeaviestForkAggregate {
heaviest_forks: HashMap<Pubkey, RestartHeaviestFork>,
block_stake_map: HashMap<(Slot, Hash), u64>,
active_peers: HashSet<Pubkey>,
active_peers_seen_supermajority: HashSet<Pubkey>,
}

#[derive(Clone, Debug, PartialEq)]
pub struct HeaviestForkFinalResult {
pub block_stake_map: HashMap<(Slot, Hash), u64>,
pub total_active_stake: u64,
pub total_active_stake_seen_supermajority: u64,
}

#[derive(Debug, PartialEq)]
@@ -42,7 +33,6 @@ pub enum HeaviestForkAggregateResult {

impl HeaviestForkAggregate {
pub(crate) fn new(
wait_for_supermajority_threshold_percent: u64,
my_shred_version: u16,
epoch_stakes: &EpochStakes,
my_heaviest_fork_slot: Slot,
@@ -57,23 +47,20 @@ impl HeaviestForkAggregate {
epoch_stakes.node_id_to_stake(my_pubkey).unwrap_or(0),
);
Self {
supermajority_threshold: wait_for_supermajority_threshold_percent as f64 / 100.0,
my_shred_version,
my_pubkey: *my_pubkey,
epoch_stakes: epoch_stakes.clone(),
heaviest_forks: HashMap::new(),
block_stake_map,
active_peers,
active_peers_seen_supermajority: HashSet::new(),
}
}

pub(crate) fn aggregate_from_record(
&mut self,
key_string: &str,
record: &HeaviestForkRecord,
) -> Result<HeaviestForkAggregateResult> {
let from = Pubkey::from_str(key_string)?;
let from = Pubkey::from_str(&record.from)?;
let bankhash = Hash::from_str(&record.bankhash)?;
let restart_heaviest_fork = RestartHeaviestFork {
from,
@@ -100,7 +87,6 @@ impl HeaviestForkAggregate {
}
if current_heaviest_fork == new_heaviest_fork
|| current_heaviest_fork.wallclock > new_heaviest_fork.wallclock
|| current_heaviest_fork.observed_stake == new_heaviest_fork.observed_stake
{
return HeaviestForkAggregateResult::AlreadyExists;
}
@@ -110,14 +96,14 @@ impl HeaviestForkAggregate {
total_active_stake: new_heaviest_fork.observed_stake,
shred_version: new_heaviest_fork.shred_version as u32,
wallclock: new_heaviest_fork.wallclock,
from: new_heaviest_fork.from.to_string(),
})
}

pub(crate) fn aggregate(
&mut self,
received_heaviest_fork: RestartHeaviestFork,
) -> HeaviestForkAggregateResult {
let total_stake = self.epoch_stakes.total_stake();
let from = &received_heaviest_fork.from;
let sender_stake = self.epoch_stakes.node_id_to_stake(from).unwrap_or(0);
if sender_stake == 0 {
@@ -161,22 +147,11 @@ impl HeaviestForkAggregate {
total_active_stake: received_heaviest_fork.observed_stake,
shred_version: received_heaviest_fork.shred_version as u32,
wallclock: received_heaviest_fork.wallclock,
from: from.to_string(),
})
};
self.heaviest_forks
.insert(*from, received_heaviest_fork.clone());
if received_heaviest_fork.observed_stake as f64 / total_stake as f64
>= self.supermajority_threshold
{
self.active_peers_seen_supermajority.insert(*from);
}
if !self
.active_peers_seen_supermajority
.contains(&self.my_pubkey)
&& self.total_active_stake() as f64 / total_stake as f64 >= self.supermajority_threshold
{
self.active_peers_seen_supermajority.insert(self.my_pubkey);
}
result
}

@@ -186,14 +161,6 @@ impl HeaviestForkAggregate {
})
}

pub(crate) fn total_active_stake_seen_supermajority(&self) -> u64 {
self.active_peers_seen_supermajority
.iter()
.fold(0, |sum: u64, pubkey| {
sum.saturating_add(self.epoch_stakes.node_id_to_stake(pubkey).unwrap_or(0))
})
}

pub(crate) fn block_stake_map(self) -> HashMap<(Slot, Hash), u64> {
self.block_stake_map
}
@@ -244,7 +211,6 @@ mod tests {
let heaviest_hash = Hash::new_unique();
TestAggregateInitResult {
heaviest_fork_aggregate: HeaviestForkAggregate::new(
75,
SHRED_VERSION,
root_bank.epoch_stakes(root_bank.epoch()).unwrap(),
heaviest_slot,
@@ -285,6 +251,7 @@ mod tests {
total_active_stake: 100,
shred_version: SHRED_VERSION as u32,
wallclock: timestamp1,
from: pubkey.to_string(),
}),
);
}
@@ -316,6 +283,7 @@ mod tests {
total_active_stake: 100,
shred_version: SHRED_VERSION as u32,
wallclock: now,
from: new_active_validator.to_string(),
}),
);
let expected_total_active_stake = (initial_num_active_validators + 2) as u64 * 100;
@@ -399,19 +367,14 @@ mod tests {
total_active_stake: 1400,
shred_version: SHRED_VERSION as u32,
wallclock: now,
from: pubkey.to_string(),
}),
);
}
assert_eq!(
test_state.heaviest_fork_aggregate.total_active_stake(),
1400
);
assert_eq!(
test_state
.heaviest_fork_aggregate
.total_active_stake_seen_supermajority(),
0
);

// test that when 75% of the stake is seeing supermajority,
// the active percent seeing supermajority is 75%.
@@ -435,6 +398,7 @@ mod tests {
total_active_stake: 1500,
shred_version: SHRED_VERSION as u32,
wallclock: now,
from: pubkey.to_string(),
}),
);
}
@@ -443,14 +407,6 @@ mod tests {
test_state.heaviest_fork_aggregate.total_active_stake(),
1500
);
// I myself is seeing supermajority as well, with the 14 validators
// reporting 70%, the total active stake seeing supermajority is 1500 (75%).
assert_eq!(
test_state
.heaviest_fork_aggregate
.total_active_stake_seen_supermajority(),
1500
);

// test that message from my pubkey is ignored.
assert_eq!(
@@ -483,12 +439,13 @@ mod tests {
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
from: from.to_string(),
};
assert_eq!(test_state.heaviest_fork_aggregate.total_active_stake(), 100);
assert_eq!(
test_state
.heaviest_fork_aggregate
.aggregate_from_record(&from.to_string(), &record,)
.aggregate_from_record(&record)
.unwrap(),
HeaviestForkAggregateResult::Inserted(record.clone()),
);
@@ -528,6 +485,7 @@ mod tests {
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 200,
from: from.to_string(),
}),
);

@@ -576,42 +534,39 @@ mod tests {
assert_eq!(test_state.heaviest_fork_aggregate.total_active_stake(), 200);

// Record from validator with zero stake should be ignored.
let zero_stake_validator = Pubkey::new_unique();
assert_eq!(
test_state
.heaviest_fork_aggregate
.aggregate_from_record(
&Pubkey::new_unique().to_string(),
&HeaviestForkRecord {
wallclock: timestamp(),
slot: test_state.heaviest_slot,
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
}
)
.aggregate_from_record(&HeaviestForkRecord {
wallclock: timestamp(),
slot: test_state.heaviest_slot,
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
from: zero_stake_validator.to_string(),
})
.unwrap(),
HeaviestForkAggregateResult::ZeroStakeIgnored,
);
// percentage doesn't change since the previous aggregate is ignored.
assert_eq!(test_state.heaviest_fork_aggregate.total_active_stake(), 200);

// Record from my pubkey should be ignored.
let my_pubkey = test_state.validator_voting_keypairs[MY_INDEX]
.node_keypair
.pubkey();
assert_eq!(
test_state
.heaviest_fork_aggregate
.aggregate_from_record(
&test_state.validator_voting_keypairs[MY_INDEX]
.node_keypair
.pubkey()
.to_string(),
&HeaviestForkRecord {
wallclock: timestamp(),
slot: test_state.heaviest_slot,
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
}
)
.aggregate_from_record(&HeaviestForkRecord {
wallclock: timestamp(),
slot: test_state.heaviest_slot,
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
from: my_pubkey.to_string(),
})
.unwrap(),
HeaviestForkAggregateResult::AlreadyExists,
);
@@ -620,46 +575,40 @@ mod tests {
#[test]
fn test_aggregate_from_record_failures() {
let mut test_state = test_aggregate_init();
let from = test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey();
let mut heaviest_fork_record = HeaviestForkRecord {
wallclock: timestamp(),
slot: test_state.heaviest_slot,
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
from: from.to_string(),
};
// First test that this is a valid record.
assert_eq!(
test_state
.heaviest_fork_aggregate
.aggregate_from_record(
&test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey()
.to_string(),
&heaviest_fork_record,
)
.aggregate_from_record(&heaviest_fork_record,)
.unwrap(),
HeaviestForkAggregateResult::Inserted(heaviest_fork_record.clone()),
);
// Then test that it fails if the record is invalid.

// Invalid pubkey.
heaviest_fork_record.from = "invalid_pubkey".to_string();
assert!(test_state
.heaviest_fork_aggregate
.aggregate_from_record("invalid_pubkey", &heaviest_fork_record,)
.aggregate_from_record(&heaviest_fork_record,)
.is_err());

// Invalid hash.
heaviest_fork_record.from = from.to_string();
heaviest_fork_record.bankhash.clear();
assert!(test_state
.heaviest_fork_aggregate
.aggregate_from_record(
&test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey()
.to_string(),
&heaviest_fork_record,
)
.aggregate_from_record(&heaviest_fork_record,)
.is_err());
}
}
