Skip to content

Commit

Permalink
A0-4575: Add ABFT performance scorer (#1884)
Browse files Browse the repository at this point in the history
# Description

Adds a scorer that computes how many rounds behind each ABFT member
was.

## Type of change

- New feature (non-breaking change which adds functionality)

# Checklist:

- I have added tests
- I have created new documentation
  • Loading branch information
timorleph authored Dec 10, 2024
1 parent b0d696f commit f113c1a
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 110 deletions.
195 changes: 95 additions & 100 deletions aleph-client/src/aleph_zero.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions finality-aleph/src/abft/current/performance/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{data_io::AlephData, Hasher};

mod scorer;
mod service;

pub use service::Service;
Expand Down
152 changes: 152 additions & 0 deletions finality-aleph/src/abft/current/performance/scorer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use current_aleph_bft::{NodeCount, NodeMap, Round};

use crate::{abft::current::performance::Batch, aleph_primitives::RawScore, UnverifiedHeader};

/// Scores ABFT performance based on the batches of ordered units returned by consensus.
pub struct Scorer {
    // For every node, the round of the newest unit created by it that we have seen so far.
    newest_unit_by: NodeMap<Round>,
}

impl Scorer {
    /// Create a new scorer tracking the provided number of nodes.
    pub fn new(node_count: NodeCount) -> Self {
        Scorer {
            newest_unit_by: NodeMap::with_size(node_count),
        }
    }

    /// Consume a batch of ordered units and return, for every node, the number
    /// of rounds its newest unit lags behind the head of the batch.
    pub fn process_batch<UH: UnverifiedHeader>(&mut self, batch: Batch<UH>) -> RawScore {
        let head_round = batch.last().expect("batches always contain a head").round;
        // Record the freshest unit per creator. Units arrive in order, so for an
        // honest node each new unit has a round above anything recorded earlier.
        // Forkers may violate this, but punishing them is fine.
        for unit in batch {
            self.newest_unit_by.insert(unit.creator, unit.round);
        }
        self.newest_unit_by
            .size()
            .into_iterator()
            .map(|node_id| match self.newest_unit_by.get(node_id) {
                // No other unit has a round above the head's, so the
                // saturating_sub is plain subtraction.
                Some(unit_round) => head_round.saturating_sub(*unit_round),
                // A node with no unit is treated as if its newest unit had round -1.
                None => head_round.saturating_add(1),
            })
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use std::iter;

    use current_aleph_bft::{NodeCount, OrderedUnit, Round};

    use super::Scorer;
    use crate::{block::mock::MockHeader, data_io::AlephData, Hasher};

    const NODE_COUNT: NodeCount = NodeCount(7);

    /// Build complete rounds 0..=max_round: for every round, one unit per
    /// creator in creator order, so the result is indexed as [round][creator].
    fn units_up_to(max_round: Round) -> Vec<Vec<OrderedUnit<AlephData<MockHeader>, Hasher>>> {
        let mut result = Vec::new();
        for round in 0..=max_round {
            let mut round_units = Vec::new();
            for creator in NODE_COUNT.into_iterator() {
                round_units.push(OrderedUnit {
                    data: None,
                    // We ignore the parents, so just putting nothing here.
                    parents: Vec::new(),
                    hash: Hasher::random_hash(),
                    creator,
                    round,
                });
            }
            result.push(round_units);
        }
        result
    }

    #[test]
    fn processes_initial_batch() {
        let mut scorer = Scorer::new(NODE_COUNT);
        // A single round-0 unit created by the last node (node 6).
        let unit = units_up_to(0)
            .pop()
            .expect("there is a round")
            .pop()
            .expect("there is a unit");
        // Head round is 0: node 6 is at the head (score 0); every other node
        // has no unit, which counts as round -1, i.e. score 0 + 1 = 1.
        assert_eq!(scorer.process_batch(vec![unit]), vec![1, 1, 1, 1, 1, 1, 0]);
    }

    #[test]
    fn processes_perfect_performance_batch() {
        let mut scorer = Scorer::new(NODE_COUNT);
        let mut all_units = units_up_to(1);
        let mut round_one_units = all_units.pop().expect("just created");
        let mut round_zero_units = all_units.pop().expect("just created");
        // First batch: node 6's round-0 unit alone.
        let first_head = round_zero_units.pop().expect("there is a unit");
        assert_eq!(
            scorer.process_batch(vec![first_head]),
            vec![1, 1, 1, 1, 1, 1, 0]
        );
        // Second batch: round-0 units of nodes 0-5 plus node 6's round-1 head.
        // Head round 1: nodes 0-5 lag by one round, node 6 is at the head.
        let second_head = round_one_units.pop().expect("there is a unit");
        round_zero_units.push(second_head);
        assert_eq!(
            scorer.process_batch(round_zero_units),
            vec![1, 1, 1, 1, 1, 1, 0]
        );
    }

    #[test]
    fn processes_lacking_creator_batch() {
        let mut scorer = Scorer::new(NODE_COUNT);
        let mut all_units = units_up_to(1);
        let mut round_one_units = all_units.pop().expect("just created");
        // Drop node 6's units entirely — it never produces anything.
        round_one_units.pop();
        let mut round_zero_units = all_units.pop().expect("just created");
        round_zero_units.pop();
        // First batch: node 5's round-0 unit alone; everyone else scores 1.
        let first_head = round_zero_units.pop().expect("there is a unit");
        assert_eq!(
            scorer.process_batch(vec![first_head]),
            vec![1, 1, 1, 1, 1, 0, 1]
        );
        // Second batch: round-0 units of nodes 0-4 plus node 5's round-1 head.
        // Head round 1: node 6 still has no unit, so it scores 1 + 1 = 2.
        let second_head = round_one_units.pop().expect("there is a unit");
        round_zero_units.push(second_head);
        assert_eq!(
            scorer.process_batch(round_zero_units),
            vec![1, 1, 1, 1, 1, 0, 2]
        );
    }

    #[test]
    fn processes_lagging_creator_batch() {
        let mut scorer = Scorer::new(NODE_COUNT);
        let mut all_units = units_up_to(2);
        let mut round_two_units = all_units.pop().expect("just created");
        // Node 6 only ever produces a round-0 unit, delivered late.
        round_two_units.pop();
        let mut round_one_units = all_units.pop().expect("just created");
        round_one_units.pop();
        let mut round_zero_units = all_units.pop().expect("just created");
        let lagged_unit = round_zero_units.pop().expect("just created");
        // First batch: node 5's round-0 unit alone.
        let first_head = round_zero_units.pop().expect("there is a unit");
        assert_eq!(
            scorer.process_batch(vec![first_head]),
            vec![1, 1, 1, 1, 1, 0, 1]
        );
        // Second batch: round-0 units of nodes 0-4 plus node 5's round-1 head;
        // node 6 has produced nothing yet, so it scores 1 + 1 = 2.
        let second_head = round_one_units.pop().expect("there is a unit");
        round_zero_units.push(second_head);
        assert_eq!(
            scorer.process_batch(round_zero_units),
            vec![1, 1, 1, 1, 1, 0, 2]
        );
        // Third batch: node 6's late round-0 unit, nodes 0-4 at round 1, and
        // node 5's round-2 head. Head round 2: node 6 lags by 2 rounds.
        let third_head = round_two_units.pop().expect("there is a unit");
        let third_batch = iter::once(lagged_unit)
            .chain(round_one_units)
            .chain(iter::once(third_head))
            .collect();
        assert_eq!(scorer.process_batch(third_batch), vec![1, 1, 1, 1, 1, 0, 2]);
    }
}
27 changes: 22 additions & 5 deletions finality-aleph/src/abft/current/performance/service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use current_aleph_bft::NodeCount;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use log::{debug, warn};
use log::{debug, error, warn};

use crate::{
abft::{current::performance::Batch, LOG_TARGET},
abft::{
current::performance::{scorer::Scorer, Batch},
LOG_TARGET,
},
data_io::AlephData,
party::manager::Runnable,
Hasher, UnverifiedHeader,
Expand Down Expand Up @@ -59,6 +63,7 @@ where
UH: UnverifiedHeader,
{
batches_from_abft: mpsc::UnboundedReceiver<Batch<UH>>,
scorer: Scorer,
}

impl<UH> Service<UH>
Expand All @@ -68,6 +73,7 @@ where
/// Create a new service, together with a unit finalization handler that should be passed to
/// ABFT. It will wrap the provided finalization handler and call it in the background.
pub fn new<FH>(
n_members: usize,
finalization_handler: FH,
) -> (
Self,
Expand All @@ -78,7 +84,10 @@ where
{
let (batches_for_us, batches_from_abft) = mpsc::unbounded();
(
Service { batches_from_abft },
Service {
batches_from_abft,
scorer: Scorer::new(NodeCount(n_members)),
},
FinalizationWrapper::new(finalization_handler, batches_for_us),
)
}
Expand All @@ -92,8 +101,16 @@ where
async fn run(mut self, mut exit: oneshot::Receiver<()>) {
loop {
tokio::select! {
_maybe_batch = self.batches_from_abft.next() => {
// TODO(A0-4575): actually compute the score form batches etc
maybe_batch = self.batches_from_abft.next() => {
let score = match maybe_batch {
Some(batch) => self.scorer.process_batch(batch),
None => {
error!(target: LOG_TARGET, "Batches' channel closed, ABFT performance scoring terminating.");
break;
},
};
debug!(target: LOG_TARGET, "Received ABFT score: {:?}.", score);
// TODO(A0-4339): sometimes submit these scores to the chain.
}
_ = &mut exit => {
debug!(target: LOG_TARGET, "ABFT performance scoring task received exit signal. Terminating.");
Expand Down
7 changes: 7 additions & 0 deletions finality-aleph/src/abft/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ impl<H: SpHash> Wrapper<H> {
inner: <H as SpHash>::hash(s),
}
}

#[cfg(test)]
/// Test helper: hashes a freshly generated random 137-character alphanumeric
/// string, yielding an effectively random hash value for mock units.
pub fn random_hash() -> OrdForHash<H::Output> {
    use rand::distributions::{Alphanumeric, DistString};
    let string = Alphanumeric.sample_string(&mut rand::thread_rng(), 137);
    Self::hash(string.as_ref())
}
}

impl<H: SpHash> current_aleph_bft::Hasher for Wrapper<H> {
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/party/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ where
session_boundaries.clone(),
);
let (abft_performance, abft_batch_handler) =
CurrentPerformanceService::new(ordered_data_interpreter);
CurrentPerformanceService::new(n_members, ordered_data_interpreter);
let consensus_config =
current_create_aleph_config(n_members, node_id, session_id, self.unit_creation_delay);
let data_network = data_network.map();
Expand Down
2 changes: 1 addition & 1 deletion pallets/committee-management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ pub mod pallet {
#[pallet::weight((T::BlockWeights::get().max_block, DispatchClass::Operational))]
pub fn set_finality_ban_config(
origin: OriginFor<T>,
minimal_expected_performance: Option<u32>,
minimal_expected_performance: Option<u16>,
underperformed_session_count_threshold: Option<u32>,
ban_period: Option<EraIndex>,
) -> DispatchResult {
Expand Down
8 changes: 5 additions & 3 deletions primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,14 @@ pub trait AbftScoresProvider {
#[derive(Decode, Encode, TypeInfo, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FinalityBanConfig {
/// Number representing how many rounds a parent of a head of an abft round is allowed to be behind the head.
pub minimal_expected_performance: u32,
pub minimal_expected_performance: u16,
/// How many bad sessions force validator to be removed from the committee
pub underperformed_session_count_threshold: SessionCount,
/// how many eras a validator is banned for
pub ban_period: EraIndex,
}

pub const DEFAULT_FINALITY_BAN_MINIMAL_EXPECTED_PERFORMANCE: u32 = 11;
pub const DEFAULT_FINALITY_BAN_MINIMAL_EXPECTED_PERFORMANCE: u16 = 11;
pub const DEFAULT_FINALITY_BAN_SESSION_COUNT_THRESHOLD: SessionCount = 2;

impl Default for FinalityBanConfig {
Expand Down Expand Up @@ -448,11 +448,13 @@ pub mod staking {

pub type ScoreNonce = u32;

pub type RawScore = Vec<u16>;

#[derive(PartialEq, Decode, Encode, TypeInfo, Debug, Clone)]
pub struct Score {
pub session_id: SessionIndex,
pub nonce: ScoreNonce,
pub points: Vec<u32>,
pub points: RawScore,
}

pub mod crypto {
Expand Down

0 comments on commit f113c1a

Please sign in to comment.