A0-4575: Add ABFT performance scorer #1884

Merged: 6 commits, Dec 10, 2024

Changes from 3 commits
1 change: 1 addition & 0 deletions finality-aleph/src/abft/current/performance/mod.rs
@@ -1,5 +1,6 @@
use crate::{data_io::AlephData, Hasher};

mod scorer;
mod service;

pub use service::Service;
153 changes: 153 additions & 0 deletions finality-aleph/src/abft/current/performance/scorer.rs
@@ -0,0 +1,153 @@
use current_aleph_bft::{NodeCount, NodeMap, Round};

use crate::{abft::current::performance::Batch, aleph_primitives::RawScore, UnverifiedHeader};

/// Scoring ABFT performance based on returned ordered unit batches.
pub struct Scorer {
newest_unit_by: NodeMap<Round>,
}

impl Scorer {
/// Create a new scorer for the provided node count.
pub fn new(node_count: NodeCount) -> Self {
Scorer {
newest_unit_by: NodeMap::with_size(node_count),
}
}

/// Add a batch of ordered units and return a score consisting of the number of rounds each
/// node is behind the head of the batch.
pub fn process_batch<UH: UnverifiedHeader>(&mut self, batch: Batch<UH>) -> RawScore {
let max_round = batch.last().expect("batches always contain a head").round;
for unit in batch {
// Units are always added in order, so any unit created by an honest node
// here has a round greater than any that was included earlier.
// This is not necessarily true for forkers, but punishing them is fine.
self.newest_unit_by.insert(unit.creator, unit.round)
}
self.newest_unit_by
.size()
.into_iterator()
.map(|node_id| {
self.newest_unit_by
.get(node_id)
// All other units have a lower round than the head, so the saturating_sub is just
// subtraction.
.map(|unit_round| max_round.saturating_sub(*unit_round))
// If we don't have a unit it's the same as having a unit of round equal to -1.
.unwrap_or(max_round + 1)
})
.collect()
}
}

#[cfg(test)]
mod tests {
use std::iter;

use current_aleph_bft::{NodeCount, OrderedUnit, Round};

use super::Scorer;
use crate::{block::mock::MockHeader, data_io::AlephData, Hasher};

const NODE_COUNT: NodeCount = NodeCount(7);

fn units_up_to(max_round: Round) -> Vec<Vec<OrderedUnit<AlephData<MockHeader>, Hasher>>> {
let mut result = Vec::new();
for round in 0..=max_round {
let mut round_units = Vec::new();
for creator in NODE_COUNT.into_iterator() {
round_units.push(OrderedUnit {
data: None,
// We ignore the parents, so just putting nothing here.
parents: Vec::new(),
hash: Hasher::random_hash(),
creator,
round,
});
}
result.push(round_units);
}
result
}

#[test]
fn processes_initial_batch() {
let mut scorer = Scorer::new(NODE_COUNT);
let unit = units_up_to(0)
.pop()
.expect("there is a round")
.pop()
.expect("there is a unit");
assert_eq!(scorer.process_batch(vec![unit]), vec![1, 1, 1, 1, 1, 1, 0]);
}

#[test]
fn processes_perfect_performance_batch() {
let mut scorer = Scorer::new(NODE_COUNT);
let mut all_units = units_up_to(1);
let mut round_one_units = all_units.pop().expect("just created");
let mut round_zero_units = all_units.pop().expect("just created");
let first_head = round_zero_units.pop().expect("there is a unit");
assert_eq!(
scorer.process_batch(vec![first_head]),
vec![1, 1, 1, 1, 1, 1, 0]
);
let second_head = round_one_units.pop().expect("there is a unit");
round_zero_units.push(second_head);
assert_eq!(
scorer.process_batch(round_zero_units),
vec![1, 1, 1, 1, 1, 1, 0]
);
}

#[test]
fn processes_lacking_creator_batch() {
let mut scorer = Scorer::new(NODE_COUNT);
let mut all_units = units_up_to(1);
let mut round_one_units = all_units.pop().expect("just created");
round_one_units.pop();
let mut round_zero_units = all_units.pop().expect("just created");
round_zero_units.pop();
let first_head = round_zero_units.pop().expect("there is a unit");
assert_eq!(
scorer.process_batch(vec![first_head]),
vec![1, 1, 1, 1, 1, 0, 1]
);
let second_head = round_one_units.pop().expect("there is a unit");
round_zero_units.push(second_head);
assert_eq!(
scorer.process_batch(round_zero_units),
vec![1, 1, 1, 1, 1, 0, 2]
);
}

#[test]
fn processes_lagging_creator_batch() {
let mut scorer = Scorer::new(NODE_COUNT);
let mut all_units = units_up_to(2);
let mut round_two_units = all_units.pop().expect("just created");
round_two_units.pop();
let mut round_one_units = all_units.pop().expect("just created");
round_one_units.pop();
let mut round_zero_units = all_units.pop().expect("just created");
let lagged_unit = round_zero_units.pop().expect("just created");
let first_head = round_zero_units.pop().expect("there is a unit");
assert_eq!(
scorer.process_batch(vec![first_head]),
vec![1, 1, 1, 1, 1, 0, 1]
);
let second_head = round_one_units.pop().expect("there is a unit");
round_zero_units.push(second_head);
assert_eq!(
scorer.process_batch(round_zero_units),
vec![1, 1, 1, 1, 1, 0, 2]
);
let third_head = round_two_units.pop().expect("there is a unit");
let third_batch = iter::once(lagged_unit)
.chain(round_one_units)
.chain(iter::once(third_head))
.collect();
assert_eq!(scorer.process_batch(third_batch), vec![1, 1, 1, 1, 1, 0, 2]);
}
}
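
For readers skimming the diff, here is a minimal standalone sketch of the scoring rule the `Scorer` implements: each node's score is the number of rounds its newest ordered unit lags behind the head of the batch, and a node with no unit at all is treated as if it were at round -1. The sketch uses plain std types (a `Vec<Option<u16>>` in place of the aleph-bft `NodeMap`) and invented names, so it illustrates the idea rather than the crate's API.

```rust
/// Standalone illustration of the scoring rule; `node_count` plays the role
/// of NodeCount and the inner vector the role of NodeMap<Round>.
struct ToyScorer {
    newest_unit_by: Vec<Option<u16>>, // newest round seen per node, if any
}

impl ToyScorer {
    fn new(node_count: usize) -> Self {
        ToyScorer {
            newest_unit_by: vec![None; node_count],
        }
    }

    /// `batch` is a list of (creator_index, round) pairs, ordered so the last
    /// entry is the head, i.e. the unit with the highest round.
    fn process_batch(&mut self, batch: &[(usize, u16)]) -> Vec<u16> {
        let max_round = batch.last().expect("batches are non-empty").1;
        for &(creator, round) in batch {
            self.newest_unit_by[creator] = Some(round);
        }
        self.newest_unit_by
            .iter()
            .map(|newest| match newest {
                // How many rounds this node is behind the head.
                Some(round) => max_round.saturating_sub(*round),
                // Never seen a unit: counted as if it were at round -1.
                None => max_round + 1,
            })
            .collect()
    }
}

fn main() {
    let mut scorer = ToyScorer::new(4);
    // The head is the round-1 unit from node 0; node 3 has produced nothing yet.
    let score = scorer.process_batch(&[(1, 0), (2, 0), (0, 1)]);
    assert_eq!(score, vec![0, 1, 1, 2]);
}
```

Repeated calls accumulate state, so a node that catches up in a later batch sees its score drop back toward zero, which matches the behaviour exercised in the tests above.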
27 changes: 22 additions & 5 deletions finality-aleph/src/abft/current/performance/service.rs
@@ -1,11 +1,15 @@
use current_aleph_bft::NodeCount;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use log::{debug, warn};
use log::{debug, error, warn};

use crate::{
abft::{current::performance::Batch, LOG_TARGET},
abft::{
current::performance::{scorer::Scorer, Batch},
LOG_TARGET,
},
data_io::AlephData,
party::manager::Runnable,
Hasher, UnverifiedHeader,
@@ -59,6 +63,7 @@ where
UH: UnverifiedHeader,
{
batches_from_abft: mpsc::UnboundedReceiver<Batch<UH>>,
scorer: Scorer,
}

impl<UH> Service<UH>
@@ -68,6 +73,7 @@ where
/// Create a new service, together with a unit finalization handler that should be passed to
/// ABFT. It will wrap the provided finalization handler and call it in the background.
pub fn new<FH>(
n_members: usize,
finalization_handler: FH,
) -> (
Self,
@@ -78,7 +84,10 @@
{
let (batches_for_us, batches_from_abft) = mpsc::unbounded();
(
Service { batches_from_abft },
Service {
batches_from_abft,
scorer: Scorer::new(NodeCount(n_members)),
},
FinalizationWrapper::new(finalization_handler, batches_for_us),
)
}
@@ -92,8 +101,16 @@ where
async fn run(mut self, mut exit: oneshot::Receiver<()>) {
loop {
tokio::select! {
_maybe_batch = self.batches_from_abft.next() => {
// TODO(A0-4575): actually compute the score form batches etc
maybe_batch = self.batches_from_abft.next() => {
let score = match maybe_batch {
Some(batch) => self.scorer.process_batch(batch),
None => {
error!(target: LOG_TARGET, "Batches' channel closed, ABFT performance scoring terminating.");
break;
},
};
debug!(target: LOG_TARGET, "Received ABFT score: {:?}.", score);
// TODO(A0-4339): sometimes submit these scores to the chain.
}
_ = &mut exit => {
debug!(target: LOG_TARGET, "ABFT performance scoring task received exit signal. Terminating.");
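
The run loop above follows a common select-until-exit shape: keep scoring batches from an unbounded channel until either the channel closes or an exit signal arrives. Below is a compact, self-contained sketch of that shutdown pattern, assuming the `tokio` (with macros) and `futures` crates; all names apart from the channel types are illustrative and not part of this crate. Note that `select!` polls ready branches in a non-deterministic order, so the demo may or may not print the batch before exiting.

```rust
use futures::{
    channel::{mpsc, oneshot},
    StreamExt,
};

async fn score_loop(
    mut batches: mpsc::UnboundedReceiver<Vec<u16>>,
    mut exit: oneshot::Receiver<()>,
) {
    loop {
        tokio::select! {
            maybe_batch = batches.next() => match maybe_batch {
                Some(batch) => println!("scoring a batch of {} units", batch.len()),
                None => break, // all senders dropped: nothing more to score
            },
            _ = &mut exit => break, // exit signal from the session manager
        }
    }
}

#[tokio::main]
async fn main() {
    let (batch_tx, batch_rx) = mpsc::unbounded();
    let (exit_tx, exit_rx) = oneshot::channel();
    batch_tx.unbounded_send(vec![0, 1, 2]).expect("receiver is alive");
    let handle = tokio::spawn(score_loop(batch_rx, exit_rx));
    let _ = exit_tx.send(());
    handle.await.expect("task finishes cleanly");
}
```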
7 changes: 7 additions & 0 deletions finality-aleph/src/abft/traits.rs
@@ -50,6 +50,13 @@ impl<H: SpHash> Wrapper<H> {
inner: <H as SpHash>::hash(s),
}
}

#[cfg(test)]
pub fn random_hash() -> OrdForHash<H::Output> {
use rand::distributions::{Alphanumeric, DistString};
let string = Alphanumeric.sample_string(&mut rand::thread_rng(), 137);
Self::hash(string.as_ref())
}
}

impl<H: SpHash> current_aleph_bft::Hasher for Wrapper<H> {
2 changes: 1 addition & 1 deletion finality-aleph/src/party/manager/mod.rs
@@ -275,7 +275,7 @@ where
session_boundaries.clone(),
);
let (abft_performance, abft_batch_handler) =
CurrentPerformanceService::new(ordered_data_interpreter);
CurrentPerformanceService::new(n_members, ordered_data_interpreter);
let consensus_config =
current_create_aleph_config(n_members, node_id, session_id, self.unit_creation_delay);
let data_network = data_network.map();
4 changes: 3 additions & 1 deletion primitives/src/lib.rs
@@ -416,11 +416,13 @@ pub mod staking {

pub type ScoreNonce = u32;

pub type RawScore = Vec<u16>;

#[derive(PartialEq, Decode, Encode, TypeInfo, Debug, Clone)]
pub struct Score {
pub session_id: SessionIndex,
pub nonce: ScoreNonce,
pub points: Vec<u32>,
pub points: RawScore,
}

pub mod crypto {
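
The primitives change introduces `RawScore` as an alias for `Vec<u16>` and narrows `Score::points` from `Vec<u32>` to it. A small self-contained sketch of constructing such a score follows; `SessionIndex` is assumed to be `u32` only to keep the example standalone, the real struct also derives `Encode`/`Decode`/`TypeInfo`, and all field values are invented for illustration.

```rust
// Local stand-ins for the primitives crate's types; only RawScore and the
// Score field layout come from this PR.
pub type SessionIndex = u32;
pub type ScoreNonce = u32;
pub type RawScore = Vec<u16>;

pub struct Score {
    pub session_id: SessionIndex,
    pub nonce: ScoreNonce,
    pub points: RawScore,
}

fn main() {
    // One entry per node: how many rounds it lags behind the head of the
    // latest ordered batch (values invented for illustration).
    let score = Score {
        session_id: 7,
        nonce: 1,
        points: vec![1, 1, 1, 1, 1, 0, 2],
    };
    assert_eq!(score.points.len(), 7);
}
```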