From 75ef57221d3656c7ef096d9b9638cfabd416b40b Mon Sep 17 00:00:00 2001 From: timorleph <145755355+timorleph@users.noreply.github.com> Date: Wed, 15 Jan 2025 11:23:59 +0100 Subject: [PATCH] A0-4346: Gather signatures and actually submit scores to chain (#1907) # Description Send performance scores to the aggregator for gathering multisignatures under them and then submits them on chain. This should finish the ABFT performance project on the node side, modulo testing. ## Type of change - New feature (non-breaking change which adds functionality) # Checklist: - I have created new documentation --- finality-aleph/src/abft/current/mod.rs | 2 +- .../src/abft/current/performance/mod.rs | 2 +- .../src/abft/current/performance/service.rs | 94 +++++++++++++++++-- finality-aleph/src/abft/mod.rs | 2 +- finality-aleph/src/nodes.rs | 15 ++- .../src/party/manager/aggregator.rs | 74 ++++++++++----- finality-aleph/src/party/manager/mod.rs | 39 ++++++-- finality-aleph/src/runtime_api.rs | 77 ++++++++++++--- primitives/src/lib.rs | 5 +- scripts/run_nodes.sh | 3 + 10 files changed, 255 insertions(+), 58 deletions(-) diff --git a/finality-aleph/src/abft/current/mod.rs b/finality-aleph/src/abft/current/mod.rs index 3544bbd450..d7db69ecf7 100644 --- a/finality-aleph/src/abft/current/mod.rs +++ b/finality-aleph/src/abft/current/mod.rs @@ -9,7 +9,7 @@ mod performance; mod traits; pub use network::NetworkData; -pub use performance::Service as PerformanceService; +pub use performance::{Service as PerformanceService, ServiceIO as PerformanceServiceIO}; pub use crate::aleph_primitives::CURRENT_FINALITY_VERSION as VERSION; use crate::{ diff --git a/finality-aleph/src/abft/current/performance/mod.rs b/finality-aleph/src/abft/current/performance/mod.rs index 5cc4e4175e..23cde654cb 100644 --- a/finality-aleph/src/abft/current/performance/mod.rs +++ b/finality-aleph/src/abft/current/performance/mod.rs @@ -3,6 +3,6 @@ use crate::{data_io::AlephData, Hasher}; mod scorer; mod service; -pub use service::Service; +pub use service::{Service, ServiceIO}; type Batch = Vec, Hasher>>; diff --git a/finality-aleph/src/abft/current/performance/service.rs b/finality-aleph/src/abft/current/performance/service.rs index 06b2db2abb..1dedcb4713 100644 --- a/finality-aleph/src/abft/current/performance/service.rs +++ b/finality-aleph/src/abft/current/performance/service.rs @@ -1,21 +1,31 @@ +use std::collections::HashMap; + use current_aleph_bft::NodeCount; use futures::{ channel::{mpsc, oneshot}, StreamExt, }; use log::{debug, error, warn}; +use parity_scale_codec::Encode; +use sp_runtime::traits::Hash as _; use crate::{ abft::{ current::performance::{scorer::Scorer, Batch}, LOG_TARGET, }, + aleph_primitives::{ + crypto::SignatureSet, AuthoritySignature, Hash, Hashing, RawScore, Score, ScoreNonce, + }, data_io::AlephData, metrics::ScoreMetrics, party::manager::Runnable, - Hasher, UnverifiedHeader, + runtime_api::RuntimeApi, + Hasher, SessionId, UnverifiedHeader, }; +const SCORE_SUBMISSION_PERIOD: usize = 300; + struct FinalizationWrapper where UH: UnverifiedHeader, @@ -59,26 +69,43 @@ where } /// A service computing the performance score of ABFT nodes based on batches of ordered units. -pub struct Service +pub struct Service where UH: UnverifiedHeader, + RA: RuntimeApi, { my_index: usize, + session_id: SessionId, batches_from_abft: mpsc::UnboundedReceiver>, + hashes_for_aggregator: mpsc::UnboundedSender, + signatures_from_aggregator: mpsc::UnboundedReceiver<(Hash, SignatureSet)>, + runtime_api: RA, + pending_scores: HashMap, + nonce: ScoreNonce, scorer: Scorer, metrics: ScoreMetrics, } -impl Service +pub struct ServiceIO { + pub hashes_for_aggregator: mpsc::UnboundedSender, + pub signatures_from_aggregator: + mpsc::UnboundedReceiver<(Hash, SignatureSet)>, +} + +impl Service where UH: UnverifiedHeader, + RA: RuntimeApi, { /// Create a new service, together with a unit finalization handler that should be passed to /// ABFT. It will wrap the provided finalization handler and call it in the background. pub fn new( my_index: usize, n_members: usize, + session_id: SessionId, finalization_handler: FH, + io: ServiceIO, + runtime_api: RA, metrics: ScoreMetrics, ) -> ( Self, @@ -87,38 +114,89 @@ where where FH: current_aleph_bft::FinalizationHandler>, { + let ServiceIO { + hashes_for_aggregator, + signatures_from_aggregator, + } = io; let (batches_for_us, batches_from_abft) = mpsc::unbounded(); ( Service { my_index, + session_id, batches_from_abft, + hashes_for_aggregator, + signatures_from_aggregator, + runtime_api, + pending_scores: HashMap::new(), + nonce: 1, scorer: Scorer::new(NodeCount(n_members)), metrics, }, FinalizationWrapper::new(finalization_handler, batches_for_us), ) } + + fn make_score(&mut self, points: RawScore) -> Score { + let result = Score { + session_id: self.session_id.0, + nonce: self.nonce, + points, + }; + self.nonce += 1; + result + } } #[async_trait::async_trait] -impl Runnable for Service +impl Runnable for Service where UH: UnverifiedHeader, + RA: RuntimeApi, { async fn run(mut self, mut exit: oneshot::Receiver<()>) { + let mut batch_counter = 1; loop { tokio::select! { maybe_batch = self.batches_from_abft.next() => { - let score = match maybe_batch { + let points = match maybe_batch { Some(batch) => self.scorer.process_batch(batch), None => { error!(target: LOG_TARGET, "Batches' channel closed, ABFT performance scoring terminating."); break; }, }; - debug!(target: LOG_TARGET, "Received ABFT score: {:?}.", score); - self.metrics.report_score(score[self.my_index]); - // TODO(A0-4339): sometimes submit these scores to the chain. + self.metrics.report_score(points[self.my_index]); + if batch_counter % SCORE_SUBMISSION_PERIOD == 0 { + let score = self.make_score(points); + let score_hash = Hashing::hash_of(&score.encode()); + debug!(target: LOG_TARGET, "Gathering signature under ABFT score: {:?}.", score); + self.pending_scores.insert(score_hash, score); + if let Err(e) = self.hashes_for_aggregator.unbounded_send(score_hash) { + error!(target: LOG_TARGET, "Failed to send score hash to signature aggregation: {}.", e); + break; + } + } + batch_counter += 1; + } + maybe_signed = self.signatures_from_aggregator.next() => { + match maybe_signed { + Some((hash, signature)) => { + match self.pending_scores.remove(&hash) { + Some(score) => { + if let Err(e) = self.runtime_api.submit_abft_score(score, signature) { + warn!(target: LOG_TARGET, "Failed to submit performance score to chain: {}.", e); + } + }, + None => { + warn!(target: LOG_TARGET, "Received multisigned hash for unknown performance score, this shouldn't ever happen."); + }, + } + }, + None => { + error!(target: LOG_TARGET, "Signatures' channel closed, ABFT performance scoring terminating."); + break; + }, + } } _ = &mut exit => { debug!(target: LOG_TARGET, "ABFT performance scoring task received exit signal. Terminating."); diff --git a/finality-aleph/src/abft/mod.rs b/finality-aleph/src/abft/mod.rs index 581a96a901..8786ea1f51 100644 --- a/finality-aleph/src/abft/mod.rs +++ b/finality-aleph/src/abft/mod.rs @@ -21,7 +21,7 @@ pub use crypto::Keychain; pub use current::{ create_aleph_config as current_create_aleph_config, run_member as run_current_member, NetworkData as CurrentNetworkData, PerformanceService as CurrentPerformanceService, - VERSION as CURRENT_VERSION, + PerformanceServiceIO as CurrentPerformanceServiceIO, VERSION as CURRENT_VERSION, }; pub use legacy::{ create_aleph_config as legacy_create_aleph_config, run_member as run_legacy_member, diff --git a/finality-aleph/src/nodes.rs b/finality-aleph/src/nodes.rs index 92b06e70b1..eef8361be2 100644 --- a/finality-aleph/src/nodes.rs +++ b/finality-aleph/src/nodes.rs @@ -9,7 +9,7 @@ use primitives::TransactionHash; use rate_limiter::SharedRateLimiter; use sc_client_api::Backend; use sc_keystore::{Keystore, LocalKeystore}; -use sc_transaction_pool_api::TransactionPool; +use sc_transaction_pool_api::{LocalTransactionPool, TransactionPool}; use sp_consensus_aura::AuraApi; use crate::{ @@ -60,7 +60,9 @@ where C: crate::ClientForAleph + Send + Sync + 'static, C::Api: AlephSessionApi + AuraApi, BE: Backend + 'static, - TP: TransactionPool + 'static, + TP: LocalTransactionPool + + TransactionPool + + 'static, { let AlephConfig { authentication_network, @@ -132,8 +134,10 @@ where } }); + let runtime_api = RuntimeApiImpl::new(client.clone(), transaction_pool.clone()); + let map_updater = SessionMapUpdater::new( - AuthorityProviderImpl::new(client.clone(), RuntimeApiImpl::new(client.clone())), + AuthorityProviderImpl::new(client.clone(), runtime_api.clone()), FinalityNotifierImpl::new(client.clone()), session_period, ); @@ -178,7 +182,7 @@ where ); let session_authority_provider = - AuthorityProviderImpl::new(client.clone(), RuntimeApiImpl::new(client.clone())); + AuthorityProviderImpl::new(client.clone(), runtime_api.clone()); let verifier = VerifierCache::new( session_info.clone(), SubstrateFinalizationInfo::new(client.clone()), @@ -225,7 +229,7 @@ where ValidatorIndexToAccountIdConverterImpl::new( client.clone(), session_info.clone(), - RuntimeApiImpl::new(client.clone()), + runtime_api.clone(), ), ); @@ -271,6 +275,7 @@ where spawn_handle, connection_manager, keystore, + runtime_api, score_metrics, ), session_info, diff --git a/finality-aleph/src/party/manager/aggregator.rs b/finality-aleph/src/party/manager/aggregator.rs index c277191b7f..f1bacd066f 100644 --- a/finality-aleph/src/party/manager/aggregator.rs +++ b/finality-aleph/src/party/manager/aggregator.rs @@ -12,7 +12,9 @@ use tokio::time; use crate::{ abft::SignatureSet, aggregation::{Aggregator, SignableTypedHash}, - aleph_primitives::BlockHash, + aleph_primitives::{ + crypto::SignatureSet as PrimitivesSignatureSet, AuthoritySignature, BlockHash, Hash, + }, block::{ substrate::{Justification, JustificationTranslator}, Header, HeaderBackend, @@ -23,7 +25,7 @@ use crate::{ network::data::Network, party::{ manager::aggregator::AggregatorVersion::{Current, Legacy}, - AuthoritySubtaskCommon, Task, + AuthoritySubtaskCommon, Task, LOG_TARGET, }, sync::JustificationSubmissions, BlockId, CurrentRmcNetworkData, Keychain, LegacyRmcNetworkData, SessionBoundaries, @@ -34,15 +36,20 @@ use crate::{ pub enum Error { MultisignaturesStreamTerminated, UnableToProcessHash, + UnableToSendSignedPerformance, } impl Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use Error::*; match self { - Error::MultisignaturesStreamTerminated => { - write!(f, "The stream of multisigned hashes has ended.") + MultisignaturesStreamTerminated => { + write!(f, "the stream of multisigned hashes has ended") + } + UnableToProcessHash => write!(f, "error while processing a block hash"), + UnableToSendSignedPerformance => { + write!(f, "failed to send a signed performance hash to the scorer") } - Error::UnableToProcessHash => write!(f, "Error while processing a hash."), } } } @@ -55,6 +62,9 @@ where pub blocks_from_interpreter: mpsc::UnboundedReceiver, pub justifications_for_chain: JS, pub justification_translator: JustificationTranslator, + pub performance_from_scorer: mpsc::UnboundedReceiver, + pub signed_performance_for_scorer: + mpsc::UnboundedSender<(Hash, PrimitivesSignatureSet)>, } async fn process_new_block_data( @@ -65,7 +75,7 @@ async fn process_new_block_data( CN: Network, LN: Network, { - trace!(target: "aleph-party", "Received unit {:?} in aggregator.", block); + trace!(target: LOG_TARGET, "Received unit {:?} in aggregator.", block); let hash = block.hash(); metrics.report_block(hash, Checkpoint::Ordered); aggregator @@ -93,12 +103,12 @@ where ) { Ok(justification) => justification, Err(e) => { - error!(target: "aleph-party", "Issue with translating justification from Aggregator to Sync Justification: {}.", e); + error!(target: LOG_TARGET, "Issue with translating justification from Aggregator to Sync Justification: {}.", e); return Err(()); } }; if let Err(e) = justifications_for_chain.submit(justification) { - error!(target: "aleph-party", "Issue with sending justification from Aggregator to JustificationHandler {}.", e); + error!(target: LOG_TARGET, "Issue with sending justification from Aggregator to JustificationHandler {}.", e); return Err(()); } Ok(()) @@ -124,27 +134,31 @@ where blocks_from_interpreter, mut justifications_for_chain, justification_translator, + performance_from_scorer, + signed_performance_for_scorer, } = io; let blocks_from_interpreter = blocks_from_interpreter.take_while(|block| { let block_num = block.number(); async move { if block_num == session_boundaries.last_block() { - debug!(target: "aleph-party", "Aggregator is processing last block in session."); + debug!(target: LOG_TARGET, "Aggregator is processing last block in session."); } block_num <= session_boundaries.last_block() } }); pin_mut!(blocks_from_interpreter); + pin_mut!(performance_from_scorer); let mut hash_of_last_block = None; - let mut no_more_blocks = blocks_from_interpreter.is_terminated(); + let mut session_over = blocks_from_interpreter.is_terminated(); + let mut no_more_performance = performance_from_scorer.is_terminated(); let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); loop { - trace!(target: "aleph-party", "Aggregator Loop started a next iteration"); + trace!(target: LOG_TARGET, "Aggregator Loop started a next iteration"); tokio::select! { - maybe_block = blocks_from_interpreter.next(), if !no_more_blocks => match maybe_block { + maybe_block = blocks_from_interpreter.next(), if !session_over => match maybe_block { Some(block) => { hash_of_last_block = Some(block.hash()); process_new_block_data::( @@ -154,8 +168,19 @@ where ).await; }, None => { - debug!(target: "aleph-party", "Blocks ended in aggregator."); - no_more_blocks = true; + debug!(target: LOG_TARGET, "Blocks ended in aggregator."); + session_over = true; + }, + }, + maybe_performance_hash = performance_from_scorer.next(), if !no_more_performance && !session_over => match maybe_performance_hash { + Some(hash) => { + aggregator + .start_aggregation(SignableTypedHash::Performance(hash)) + .await; + }, + None => { + debug!(target: LOG_TARGET, "Performance hashes ended in aggregator."); + no_more_performance = true; }, }, multisigned_hash = aggregator.next_multisigned_hash() => { @@ -167,23 +192,28 @@ where hash_of_last_block = None; } }, - Performance(_) => unimplemented!("we don't gather multisignatures under performance reports yet"), + Performance(hash) => { + if let Err(e) = signed_performance_for_scorer.unbounded_send((hash, multisignature.into())) { + error!(target: LOG_TARGET, "Issue with sending signed performance hash from Aggregator to Scorer {}.", e); + return Err(Error::UnableToSendSignedPerformance); + } + } } }, _ = status_ticker.tick() => { aggregator.status_report(); }, _ = &mut exit_rx => { - debug!(target: "aleph-party", "Aggregator received exit signal. Terminating."); + debug!(target: LOG_TARGET, "Aggregator received exit signal. Terminating."); break; } } - if hash_of_last_block.is_none() && no_more_blocks { - debug!(target: "aleph-party", "Aggregator processed all provided blocks. Terminating."); + if hash_of_last_block.is_none() && session_over { + debug!(target: LOG_TARGET, "Aggregator processed all provided blocks. Terminating."); break; } } - debug!(target: "aleph-party", "Aggregator finished its work."); + debug!(target: LOG_TARGET, "Aggregator finished its work."); Ok(()) } @@ -220,7 +250,7 @@ where Current(rmc_network) => Aggregator::new_current(&multikeychain, rmc_network), Legacy(rmc_network) => Aggregator::new_legacy(&multikeychain, rmc_network), }; - debug!(target: "aleph-party", "Running the aggregator task for {:?}", session_id); + debug!(target: LOG_TARGET, "Running the aggregator task for {:?}", session_id); let result = run_aggregator( aggregator_io, io, @@ -233,11 +263,11 @@ where let result = match result { Ok(_) => Ok(()), Err(err) => { - error!(target: "aleph-party", "Aggregator exited with error: {err}"); + error!(target: LOG_TARGET, "Aggregator exited with error: {err}"); Err(()) } }; - debug!(target: "aleph-party", "Aggregator task stopped for {:?}", session_id); + debug!(target: LOG_TARGET, "Aggregator task stopped for {:?}", session_id); result } }; diff --git a/finality-aleph/src/party/manager/mod.rs b/finality-aleph/src/party/manager/mod.rs index 2cc552348f..7f48b10501 100644 --- a/finality-aleph/src/party/manager/mod.rs +++ b/finality-aleph/src/party/manager/mod.rs @@ -12,9 +12,11 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use crate::{ abft::{ current_create_aleph_config, legacy_create_aleph_config, run_current_member, - run_legacy_member, CurrentPerformanceService, SpawnHandle, + run_legacy_member, CurrentPerformanceService, CurrentPerformanceServiceIO, SpawnHandle, + }, + aleph_primitives::{ + crypto::SignatureSet, AuthoritySignature, BlockHash, BlockNumber, Hash, KEY_TYPE, }, - aleph_primitives::{BlockHash, BlockNumber, KEY_TYPE}, block::{ substrate::{Justification, JustificationTranslator}, BestBlockSelector, Block, Header, HeaderVerifier, UnverifiedHeader, @@ -34,6 +36,7 @@ use crate::{ backup::ABFTBackup, manager::aggregator::AggregatorVersion, traits::NodeSessionManager, LOG_TARGET, }, + runtime_api::RuntimeApi, sync::JustificationSubmissions, AuthorityId, BlockId, CurrentRmcNetworkData, Keychain, LegacyRmcNetworkData, NodeIndex, ProvideRuntimeApi, SessionBoundaries, SessionBoundaryInfo, SessionId, SessionPeriod, @@ -81,6 +84,9 @@ where session_boundaries: SessionBoundaries, subtask_common: TaskCommon, blocks_for_aggregator: mpsc::UnboundedSender, + performance_for_aggregator: mpsc::UnboundedSender, + signed_performance_from_aggregator: + mpsc::UnboundedReceiver<(Hash, SignatureSet)>, chain_info: SubstrateChainInfoProvider, aggregator_io: aggregator::IO, multikeychain: Keychain, @@ -88,7 +94,7 @@ where backup: ABFTBackup, } -pub struct NodeSessionManagerImpl +pub struct NodeSessionManagerImpl where H: Header, B: Block + BlockT, @@ -101,6 +107,7 @@ where SM: SessionManager> + 'static, JS: JustificationSubmissions + Send + Sync + Clone, V: HeaderVerifier, + RA: RuntimeApi, { client: Arc, header_backend: HB, @@ -115,11 +122,13 @@ where spawn_handle: SpawnHandle, session_manager: SM, keystore: Arc, + runtime_api: RA, score_metrics: ScoreMetrics, _phantom: PhantomData<(B, H)>, } -impl NodeSessionManagerImpl +impl + NodeSessionManagerImpl where H: Header, B: Block + BlockT, @@ -132,6 +141,7 @@ where SM: SessionManager> + 'static, JS: JustificationSubmissions + Send + Sync + Clone, V: HeaderVerifier, + RA: RuntimeApi, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -148,6 +158,7 @@ where spawn_handle: SpawnHandle, session_manager: SM, keystore: Arc, + runtime_api: RA, score_metrics: ScoreMetrics, ) -> Self { Self { @@ -164,6 +175,7 @@ where spawn_handle, session_manager, keystore, + runtime_api, score_metrics, _phantom: PhantomData, } @@ -258,6 +270,8 @@ where session_boundaries, subtask_common, blocks_for_aggregator, + performance_for_aggregator, + signed_performance_from_aggregator, chain_info, aggregator_io, multikeychain, @@ -281,7 +295,13 @@ where let (abft_performance, abft_batch_handler) = CurrentPerformanceService::new( node_id.into(), n_members, + session_id, ordered_data_interpreter, + CurrentPerformanceServiceIO { + hashes_for_aggregator: performance_for_aggregator, + signatures_from_aggregator: signed_performance_from_aggregator, + }, + self.runtime_api.clone(), self.score_metrics.clone(), ); let consensus_config = @@ -351,10 +371,14 @@ where spawn_handle: self.spawn_handle.clone(), session_id: session_id.0, }; + let (performance_for_aggregator, performance_from_scorer) = mpsc::unbounded(); + let (signed_performance_for_scorer, signed_performance_from_aggregator) = mpsc::unbounded(); let aggregator_io = aggregator::IO { blocks_from_interpreter, justifications_for_chain: self.justifications_for_sync.clone(), justification_translator: self.justification_translator.clone(), + performance_from_scorer, + signed_performance_for_scorer, }; let data_network = match self @@ -383,6 +407,8 @@ where session_boundaries, subtask_common, blocks_for_aggregator, + performance_for_aggregator, + signed_performance_from_aggregator, chain_info, aggregator_io, multikeychain, @@ -436,8 +462,8 @@ where } #[async_trait] -impl NodeSessionManager - for NodeSessionManagerImpl +impl NodeSessionManager + for NodeSessionManagerImpl where H: Header, B: Block + BlockT, @@ -450,6 +476,7 @@ where SM: SessionManager> + 'static, JS: JustificationSubmissions + Send + Sync + Clone, V: HeaderVerifier, + RA: RuntimeApi, { type Error = SM::Error; diff --git a/finality-aleph/src/runtime_api.rs b/finality-aleph/src/runtime_api.rs index b9b0c23639..594088453e 100644 --- a/finality-aleph/src/runtime_api.rs +++ b/finality-aleph/src/runtime_api.rs @@ -8,21 +8,31 @@ use frame_support::StorageHasher; use pallet_aleph_runtime_api::AlephSessionApi; use parity_scale_codec::{Decode, DecodeAll, Encode, Error as DecodeError}; use sc_client_api::Backend; +use sc_transaction_pool_api::{LocalTransactionPool, OffchainTransactionPoolFactory}; +use sp_api::ApiExt; use sp_application_crypto::key_types::AURA; use sp_core::twox_128; use sp_runtime::traits::{Block, OpaqueKeys}; use crate::{ - aleph_primitives::{AccountId, AuraId}, + aleph_primitives::{crypto::SignatureSet, AccountId, AuraId, AuthoritySignature, Score}, BlockHash, ClientForAleph, }; /// Trait handling connection between host code and runtime storage pub trait RuntimeApi: Clone + Send + Sync + 'static { type Error: Display; + /// Returns aura authorities for the next session using state from block `at` fn next_aura_authorities(&self, at: BlockHash) -> Result, Self::Error>; + + /// Submits a signed ABFT performance score. + fn submit_abft_score( + &self, + score: Score, + signature: SignatureSet, + ) -> Result<(), Self::Error>; } pub struct RuntimeApiImpl @@ -33,7 +43,8 @@ where BE: Backend + 'static, { client: Arc, - _phantom: PhantomData<(B, BE)>, + transaction_pool_factory: OffchainTransactionPoolFactory, + _phantom: PhantomData, } impl Clone for RuntimeApiImpl @@ -44,7 +55,16 @@ where BE: Backend + 'static, { fn clone(&self) -> Self { - RuntimeApiImpl::new(self.client.clone()) + let RuntimeApiImpl { + client, + transaction_pool_factory, + _phantom, + } = self; + RuntimeApiImpl { + client: client.clone(), + transaction_pool_factory: transaction_pool_factory.clone(), + _phantom: *_phantom, + } } } @@ -55,9 +75,14 @@ where B: Block, BE: Backend + 'static, { - pub fn new(client: Arc) -> Self { + pub fn new + 'static>( + client: Arc, + transaction_pool: TP, + ) -> Self { + let transaction_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool); Self { client, + transaction_pool_factory, _phantom: PhantomData, } } @@ -115,22 +140,27 @@ pub enum ApiError { NoStorageMapEntry(String, String), NoStorageValue(String, String), DecodeError(DecodeError), + ScoreSubmissionFailure, + CallFailed, } impl Display for ApiError { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + use ApiError::*; match self { - ApiError::StorageAccessFailure => { + StorageAccessFailure => { write!(f, "blockchain error during a storage read attempt") } - ApiError::NoStorage => write!(f, "no storage found"), - ApiError::NoStorageMapEntry(pallet, item) => { + NoStorage => write!(f, "no storage found"), + NoStorageMapEntry(pallet, item) => { write!(f, "storage map element not found under {}{}", pallet, item) } - ApiError::NoStorageValue(pallet, item) => { + NoStorageValue(pallet, item) => { write!(f, "storage value not found under {}{}", pallet, item) } - ApiError::DecodeError(error) => write!(f, "decode error: {:?}", error), + DecodeError(error) => write!(f, "decode error: {:?}", error), + ScoreSubmissionFailure => write!(f, "failed to submit ABFT score"), + CallFailed => write!(f, "a call to the runtime failed"), } } } @@ -160,6 +190,26 @@ where .filter_map(|(account_id, keys)| keys.get(AURA).map(|key| (account_id, key))) .collect()) } + + fn submit_abft_score( + &self, + score: Score, + signature: SignatureSet, + ) -> Result<(), Self::Error> { + // Use top finalized as base for this submission. + let block_hash = self.client.info().finalized_hash; + let mut runtime_api = self.client.runtime_api(); + runtime_api.register_extension( + self.transaction_pool_factory + .offchain_transaction_pool(block_hash), + ); + + match runtime_api.submit_abft_score(block_hash, score, signature) { + Ok(Some(())) => Ok(()), + Ok(None) => Err(ApiError::ScoreSubmissionFailure), + Err(_) => Err(ApiError::CallFailed), + } + } } #[cfg(test)] @@ -172,6 +222,7 @@ mod test { use frame_support::Twox64Concat; use parity_scale_codec::Encode; use primitives::Hash; + use sc_transaction_pool_api::RejectAllTxPool; use sp_runtime::Storage; use substrate_test_client::ClientExt; @@ -206,7 +257,7 @@ mod test { *client_builder.genesis_init_mut().extra_storage() = storage; let client = Arc::new(client_builder.build()); let genesis_hash = client.genesis_hash(); - let runtime_api = RuntimeApiImpl::new(client); + let runtime_api = RuntimeApiImpl::new(client, RejectAllTxPool::default()); let map_value1 = runtime_api.read_storage_map::( "Pallet", @@ -245,7 +296,7 @@ mod test { *client_builder.genesis_init_mut().extra_storage() = storage; let client = Arc::new(client_builder.build()); let genesis_hash = client.genesis_hash(); - let runtime_api = RuntimeApiImpl::new(client); + let runtime_api = RuntimeApiImpl::new(client, RejectAllTxPool::default()); let result1 = runtime_api.read_storage_map::( "Pallet", @@ -290,7 +341,7 @@ mod test { *client_builder.genesis_init_mut().extra_storage() = storage; let client = Arc::new(client_builder.build()); let genesis_hash = client.genesis_hash(); - let runtime_api = RuntimeApiImpl::new(client); + let runtime_api = RuntimeApiImpl::new(client, RejectAllTxPool::default()); // parameterize function with String instead of u32 let result1 = runtime_api.read_storage_map::( @@ -327,7 +378,7 @@ mod test { let mut client_builder = TestClientBuilder::new(); *client_builder.genesis_init_mut().extra_storage() = storage; let client = Arc::new(client_builder.build()); - let runtime_api = RuntimeApiImpl::new(client); + let runtime_api = RuntimeApiImpl::new(client, RejectAllTxPool::default()); let result1 = runtime_api.read_storage_map::( "Pallet", diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 59a0627d75..eb42aedcd9 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -65,6 +65,9 @@ pub type AccountId = ::AccountId; /// never know... pub type AccountIndex = u32; +/// The hashing algorithm we use for everything. +pub type Hashing = BlakeTwo256; + /// A hash of some data used by the chain. pub type Hash = sp_core::H256; @@ -75,7 +78,7 @@ pub type Nonce = u32; pub type Balance = u128; /// Header type. -pub type Header = generic::Header; +pub type Header = generic::Header; /// Block type. pub type Block = generic::Block; diff --git a/scripts/run_nodes.sh b/scripts/run_nodes.sh index 30cd64ec1e..2d5972440a 100755 --- a/scripts/run_nodes.sh +++ b/scripts/run_nodes.sh @@ -44,6 +44,7 @@ CHAINSPEC_GENERATOR="target/release/chain-bootstrapper" NODE_P2P_PORT_RANGE_START=30333 NODE_VALIDATOR_PORT_RANGE_START=30343 NODE_RPC_PORT_RANGE_START=9944 +PROMETHEUS_PORT_RANGE_START=9615 # ------------------------ argument parsing and usage ----------------------- @@ -174,6 +175,7 @@ function run_node() { --name "${node_name}" --rpc-port $((NODE_RPC_PORT_RANGE_START + index)) --port $((NODE_P2P_PORT_RANGE_START + index)) + --prometheus-port $((PROMETHEUS_PORT_RANGE_START + index)) --validator-port "${validator_port}" --node-key-file "${BASE_PATH}/${account_id}/p2p_secret" --backup-path "${BASE_PATH}/${account_id}/backup-stash" @@ -193,6 +195,7 @@ function run_node() { -laleph-data-store=debug -laleph-updater=debug -laleph-metrics=debug + -laleph-abft=debug ) info "Running node ${index}..."