Skip to content

Commit

Permalink
A0-4346: Gather signatures and actually submit scores to chain (#1907)
Browse files Browse the repository at this point in the history
# Description

Send performance scores to the aggregator for gathering multisignatures
under them and then submits them on chain. This should finish the ABFT
performance project on the node side, modulo testing.

## Type of change

- New feature (non-breaking change which adds functionality)

# Checklist:

- I have created new documentation
  • Loading branch information
timorleph authored Jan 15, 2025
1 parent 61c42fc commit 75ef572
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 58 deletions.
2 changes: 1 addition & 1 deletion finality-aleph/src/abft/current/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod performance;
mod traits;

pub use network::NetworkData;
pub use performance::Service as PerformanceService;
pub use performance::{Service as PerformanceService, ServiceIO as PerformanceServiceIO};

pub use crate::aleph_primitives::CURRENT_FINALITY_VERSION as VERSION;
use crate::{
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/abft/current/performance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ use crate::{data_io::AlephData, Hasher};
mod scorer;
mod service;

pub use service::Service;
pub use service::{Service, ServiceIO};

type Batch<UH> = Vec<current_aleph_bft::OrderedUnit<AlephData<UH>, Hasher>>;
94 changes: 86 additions & 8 deletions finality-aleph/src/abft/current/performance/service.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
use std::collections::HashMap;

use current_aleph_bft::NodeCount;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use log::{debug, error, warn};
use parity_scale_codec::Encode;
use sp_runtime::traits::Hash as _;

use crate::{
abft::{
current::performance::{scorer::Scorer, Batch},
LOG_TARGET,
},
aleph_primitives::{
crypto::SignatureSet, AuthoritySignature, Hash, Hashing, RawScore, Score, ScoreNonce,
},
data_io::AlephData,
metrics::ScoreMetrics,
party::manager::Runnable,
Hasher, UnverifiedHeader,
runtime_api::RuntimeApi,
Hasher, SessionId, UnverifiedHeader,
};

const SCORE_SUBMISSION_PERIOD: usize = 300;

struct FinalizationWrapper<UH, FH>
where
UH: UnverifiedHeader,
Expand Down Expand Up @@ -59,26 +69,43 @@ where
}

/// A service computing the performance score of ABFT nodes based on batches of ordered units.
pub struct Service<UH>
pub struct Service<UH, RA>
where
UH: UnverifiedHeader,
RA: RuntimeApi,
{
my_index: usize,
session_id: SessionId,
batches_from_abft: mpsc::UnboundedReceiver<Batch<UH>>,
hashes_for_aggregator: mpsc::UnboundedSender<Hash>,
signatures_from_aggregator: mpsc::UnboundedReceiver<(Hash, SignatureSet<AuthoritySignature>)>,
runtime_api: RA,
pending_scores: HashMap<Hash, Score>,
nonce: ScoreNonce,
scorer: Scorer,
metrics: ScoreMetrics,
}

impl<UH> Service<UH>
pub struct ServiceIO {
pub hashes_for_aggregator: mpsc::UnboundedSender<Hash>,
pub signatures_from_aggregator:
mpsc::UnboundedReceiver<(Hash, SignatureSet<AuthoritySignature>)>,
}

impl<UH, RA> Service<UH, RA>
where
UH: UnverifiedHeader,
RA: RuntimeApi,
{
/// Create a new service, together with a unit finalization handler that should be passed to
/// ABFT. It will wrap the provided finalization handler and call it in the background.
pub fn new<FH>(
my_index: usize,
n_members: usize,
session_id: SessionId,
finalization_handler: FH,
io: ServiceIO,
runtime_api: RA,
metrics: ScoreMetrics,
) -> (
Self,
Expand All @@ -87,38 +114,89 @@ where
where
FH: current_aleph_bft::FinalizationHandler<AlephData<UH>>,
{
let ServiceIO {
hashes_for_aggregator,
signatures_from_aggregator,
} = io;
let (batches_for_us, batches_from_abft) = mpsc::unbounded();
(
Service {
my_index,
session_id,
batches_from_abft,
hashes_for_aggregator,
signatures_from_aggregator,
runtime_api,
pending_scores: HashMap::new(),
nonce: 1,
scorer: Scorer::new(NodeCount(n_members)),
metrics,
},
FinalizationWrapper::new(finalization_handler, batches_for_us),
)
}

fn make_score(&mut self, points: RawScore) -> Score {
let result = Score {
session_id: self.session_id.0,
nonce: self.nonce,
points,
};
self.nonce += 1;
result
}
}

#[async_trait::async_trait]
impl<UH> Runnable for Service<UH>
impl<UH, RA> Runnable for Service<UH, RA>
where
UH: UnverifiedHeader,
RA: RuntimeApi,
{
async fn run(mut self, mut exit: oneshot::Receiver<()>) {
let mut batch_counter = 1;
loop {
tokio::select! {
maybe_batch = self.batches_from_abft.next() => {
let score = match maybe_batch {
let points = match maybe_batch {
Some(batch) => self.scorer.process_batch(batch),
None => {
error!(target: LOG_TARGET, "Batches' channel closed, ABFT performance scoring terminating.");
break;
},
};
debug!(target: LOG_TARGET, "Received ABFT score: {:?}.", score);
self.metrics.report_score(score[self.my_index]);
// TODO(A0-4339): sometimes submit these scores to the chain.
self.metrics.report_score(points[self.my_index]);
if batch_counter % SCORE_SUBMISSION_PERIOD == 0 {
let score = self.make_score(points);
let score_hash = Hashing::hash_of(&score.encode());
debug!(target: LOG_TARGET, "Gathering signature under ABFT score: {:?}.", score);
self.pending_scores.insert(score_hash, score);
if let Err(e) = self.hashes_for_aggregator.unbounded_send(score_hash) {
error!(target: LOG_TARGET, "Failed to send score hash to signature aggregation: {}.", e);
break;
}
}
batch_counter += 1;
}
maybe_signed = self.signatures_from_aggregator.next() => {
match maybe_signed {
Some((hash, signature)) => {
match self.pending_scores.remove(&hash) {
Some(score) => {
if let Err(e) = self.runtime_api.submit_abft_score(score, signature) {
warn!(target: LOG_TARGET, "Failed to submit performance score to chain: {}.", e);
}
},
None => {
warn!(target: LOG_TARGET, "Received multisigned hash for unknown performance score, this shouldn't ever happen.");
},
}
},
None => {
error!(target: LOG_TARGET, "Signatures' channel closed, ABFT performance scoring terminating.");
break;
},
}
}
_ = &mut exit => {
debug!(target: LOG_TARGET, "ABFT performance scoring task received exit signal. Terminating.");
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/abft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use crypto::Keychain;
pub use current::{
create_aleph_config as current_create_aleph_config, run_member as run_current_member,
NetworkData as CurrentNetworkData, PerformanceService as CurrentPerformanceService,
VERSION as CURRENT_VERSION,
PerformanceServiceIO as CurrentPerformanceServiceIO, VERSION as CURRENT_VERSION,
};
pub use legacy::{
create_aleph_config as legacy_create_aleph_config, run_member as run_legacy_member,
Expand Down
15 changes: 10 additions & 5 deletions finality-aleph/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use primitives::TransactionHash;
use rate_limiter::SharedRateLimiter;
use sc_client_api::Backend;
use sc_keystore::{Keystore, LocalKeystore};
use sc_transaction_pool_api::TransactionPool;
use sc_transaction_pool_api::{LocalTransactionPool, TransactionPool};
use sp_consensus_aura::AuraApi;

use crate::{
Expand Down Expand Up @@ -60,7 +60,9 @@ where
C: crate::ClientForAleph<Block, BE> + Send + Sync + 'static,
C::Api: AlephSessionApi<Block> + AuraApi<Block, AuraId>,
BE: Backend<Block> + 'static,
TP: TransactionPool<Block = Block, Hash = TransactionHash> + 'static,
TP: LocalTransactionPool<Block = Block>
+ TransactionPool<Block = Block, Hash = TransactionHash>
+ 'static,
{
let AlephConfig {
authentication_network,
Expand Down Expand Up @@ -132,8 +134,10 @@ where
}
});

let runtime_api = RuntimeApiImpl::new(client.clone(), transaction_pool.clone());

let map_updater = SessionMapUpdater::new(
AuthorityProviderImpl::new(client.clone(), RuntimeApiImpl::new(client.clone())),
AuthorityProviderImpl::new(client.clone(), runtime_api.clone()),
FinalityNotifierImpl::new(client.clone()),
session_period,
);
Expand Down Expand Up @@ -178,7 +182,7 @@ where
);

let session_authority_provider =
AuthorityProviderImpl::new(client.clone(), RuntimeApiImpl::new(client.clone()));
AuthorityProviderImpl::new(client.clone(), runtime_api.clone());
let verifier = VerifierCache::new(
session_info.clone(),
SubstrateFinalizationInfo::new(client.clone()),
Expand Down Expand Up @@ -225,7 +229,7 @@ where
ValidatorIndexToAccountIdConverterImpl::new(
client.clone(),
session_info.clone(),
RuntimeApiImpl::new(client.clone()),
runtime_api.clone(),
),
);

Expand Down Expand Up @@ -271,6 +275,7 @@ where
spawn_handle,
connection_manager,
keystore,
runtime_api,
score_metrics,
),
session_info,
Expand Down
Loading

0 comments on commit 75ef572

Please sign in to comment.