From 2d564a1bfef05c12624033a15d5edf473a6c5ed8 Mon Sep 17 00:00:00 2001 From: lorban Date: Tue, 19 Mar 2024 22:28:03 +0100 Subject: [PATCH] Error management Error management when some transactions are missing. The jds should not break. Instead, tries to recover it by triggering the jds mempool to retrieve the transactions from its bitcoin node --- .../src/lib/job_declarator/message_handler.rs | 21 +- roles/jd-server/src/lib/job_declarator/mod.rs | 316 ++++++++++++------ roles/jd-server/src/lib/mempool/mod.rs | 15 +- roles/jd-server/src/main.rs | 22 +- 4 files changed, 259 insertions(+), 115 deletions(-) diff --git a/roles/jd-server/src/lib/job_declarator/message_handler.rs b/roles/jd-server/src/lib/job_declarator/message_handler.rs index 8e038f1818..e883692b85 100644 --- a/roles/jd-server/src/lib/job_declarator/message_handler.rs +++ b/roles/jd-server/src/lib/job_declarator/message_handler.rs @@ -65,8 +65,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { // mempool which use the rpc client to retrieve the whole data for each transactions. // The unknown transactions is a vector that contains the transactions that are not in the // jds mempool, and will be non-empty in the ProvideMissingTransactionsSuccess message - let mut known_transactions: Vec = Vec::new(); - let unknown_transactions: Vec = Vec::new(); + let mut known_transactions: Vec = vec![]; self.tx_hash_list_hash = Some(message.tx_hash_list_hash.clone().into_static()); if self.verify_job(&message) { let short_hash_list: Vec = message @@ -92,7 +91,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { Some(tx_data) => { transactions_with_state[i] = TransactionState::PresentInMempool(tx_data.id); known_transactions.push(tx_data.id); - }, + } None => { transactions_with_state[i] = TransactionState::Missing; missing_txs.push(i as u16); @@ -105,7 +104,11 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { missing_txs.clone(), ); // here we send the transactions that we want to be stored in jds mempool with full data - self.sender_add_txs_to_mempool.send(super::AddTrasactionsToMempool { known_transactions, unknown_transactions}); + + self.add_txs_to_mempool + .add_txs_to_mempool_inner + .known_transactions + .append(&mut known_transactions); if missing_txs.is_empty() { let message_success = DeclareMiningJobSuccess { @@ -152,13 +155,12 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { message: ProvideMissingTransactionsSuccess, ) -> Result { let (_, ref mut transactions_with_state, missing_indexes) = &mut self.declared_mining_job; - let mut unknown_transactions: Vec = Vec::new(); - let known_transactions: Vec = Vec::new(); + let mut unknown_transactions: Vec = vec![]; for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() { let mut cursor = Cursor::new(tx); let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor) .map_err(|e| Error::TxDecodingError(e.to_string()))?; - &unknown_transactions.push(transaction.clone()); + Vec::push(&mut unknown_transactions, transaction.clone()); let index = *missing_indexes .get(i) .ok_or(Error::LogicErrorMessage(Box::new( @@ -169,7 +171,10 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { // insert the missing transactions in the mempool transactions_with_state[index] = TransactionState::PresentInMempool(transaction.txid()); } - self.sender_add_txs_to_mempool.send(super::AddTrasactionsToMempool { known_transactions, unknown_transactions: unknown_transactions.clone()}); + self.add_txs_to_mempool + .add_txs_to_mempool_inner + .unknown_transactions + .append(&mut unknown_transactions); // if there still a missing transaction return an error for tx_with_state in transactions_with_state { match tx_with_state { diff --git a/roles/jd-server/src/lib/job_declarator/mod.rs b/roles/jd-server/src/lib/job_declarator/mod.rs index c7ed4387ee..4437df16c7 100644 --- a/roles/jd-server/src/lib/job_declarator/mod.rs +++ b/roles/jd-server/src/lib/job_declarator/mod.rs @@ -16,7 +16,10 @@ use roles_logic_sv2::{ }; use secp256k1::{Keypair, Message as SecpMessage, Secp256k1}; use std::{collections::HashMap, convert::TryInto, sync::Arc}; -use tokio::net::TcpListener; +use tokio::{ + net::TcpListener, + time::Duration, +}; use tracing::{error, info}; use stratum_common::bitcoin::{ @@ -31,11 +34,18 @@ pub enum TransactionState { } #[derive(Clone, Debug)] -pub struct AddTrasactionsToMempool { +pub struct AddTrasactionsToMempoolInner { pub known_transactions: Vec, pub unknown_transactions: Vec, } +// TODO implement send method that sends the inner via the sender +#[derive(Clone, Debug)] +pub struct AddTrasactionsToMempool { + pub add_txs_to_mempool_inner: AddTrasactionsToMempoolInner, + pub sender_add_txs_to_mempool: Sender, +} + #[derive(Debug)] pub struct JobDeclaratorDownstream { sender: Sender, @@ -56,7 +66,7 @@ pub struct JobDeclaratorDownstream { Vec, ), tx_hash_list_hash: Option>, - sender_add_txs_to_mempool: Sender, + add_txs_to_mempool: AddTrasactionsToMempool, } impl JobDeclaratorDownstream { @@ -65,12 +75,16 @@ impl JobDeclaratorDownstream { sender: Sender, config: &Configuration, mempool: Arc>, - sender_add_txs_to_mempool: Sender + sender_add_txs_to_mempool: Sender, ) -> Self { let mut coinbase_output = vec![]; // TODO: use next variables let token_to_job_map = HashMap::with_hasher(BuildNoHashHasher::default()); let tokens = Id::new(); + let add_txs_to_mempool_inner = AddTrasactionsToMempoolInner { + known_transactions: vec![], + unknown_transactions: vec![], + }; super::get_coinbase_output(config).expect("Invalid coinbase output in config")[0] .consensus_encode(&mut coinbase_output) .expect("Invalid coinbase output in config"); @@ -86,7 +100,10 @@ impl JobDeclaratorDownstream { mempool, declared_mining_job: (None, Vec::new(), Vec::new()), tx_hash_list_hash: None, - sender_add_txs_to_mempool, + add_txs_to_mempool: AddTrasactionsToMempool { + add_txs_to_mempool_inner, + sender_add_txs_to_mempool, + }, } } @@ -94,54 +111,83 @@ impl JobDeclaratorDownstream { self_mutex: Arc>, message: SubmitSolutionJd, ) -> Result> { - let (last_declare_, transactions_with_state, _) = self_mutex + let (last_declare_, _, _) = self_mutex + .clone() + .safe_lock(|x| x.declared_mining_job.clone()) + .map_err(|e| Box::new(JdsError::PoisonLock(e.to_string())))?; + let last_declare = last_declare_.ok_or(Box::new(JdsError::NoLastDeclaredJob))?; + let transactions_list = Self::are_all_job_transactions_present(self_mutex)?; + let block: Block = + roles_logic_sv2::utils::BlockCreator::new(last_declare, transactions_list, message) + .into(); + Ok(hex::encode(serialize(&block))) + } + + fn are_all_job_transactions_present( + self_mutex: Arc>, + ) -> Result, Box> { + let (_, transactions_with_state, _) = self_mutex + .clone() .safe_lock(|x| x.declared_mining_job.clone()) - .map_err(|e| JdsError::PoisonLock(e.to_string()))?; - let mempool_ = self_mutex + .map_err(|e| Box::new(JdsError::PoisonLock(e.to_string())))?; + let mempool = self_mutex .safe_lock(|x| x.mempool.clone()) - .map_err(|e| JdsError::PoisonLock(e.to_string()))?; - let last_declare = last_declare_.ok_or(JdsError::NoLastDeclaredJob)?; + .map_err(|e| Box::new(JdsError::PoisonLock(e.to_string())))?; let mut transactions_list: Vec = Vec::new(); for tx_with_state in transactions_with_state.iter().enumerate() { if let TransactionState::PresentInMempool(txid) = tx_with_state.1 { - let tx_ = match mempool_.safe_lock(|x| x.mempool.get(txid).cloned()) { - Ok(tx) => tx, - Err(e) => return Err(Box::new(JdsError::PoisonLock(e.to_string()))), - }; - let tx = tx_.ok_or(JdsError::ImpossibleToReconstructBlock( - "Missing transactions".to_string(), - )); + let tx = mempool + .safe_lock(|x| x.mempool.get(txid).cloned()) + .map_err(|e| JdsError::PoisonLock(e.to_string()))? + .ok_or(JdsError::ImpossibleToReconstructBlock( + "Txid not found in jds mempool".to_string(), + )); if let Ok(Some(tx)) = tx { transactions_list.push(tx); } else { return Err(Box::new(JdsError::ImpossibleToReconstructBlock( - "Missing transactions".to_string(), + "Txid found in jds mempool but transactions not present".to_string(), ))); } } else { return Err(Box::new(JdsError::ImpossibleToReconstructBlock( - "Missing transactions".to_string(), + "Unknown transaction".to_string(), ))); }; } - let block: Block = - roles_logic_sv2::utils::BlockCreator::new(last_declare, transactions_list, message) - .into(); - Ok(hex::encode(serialize(&block))) + Ok(transactions_list) } - fn are_all_job_transactions_present(self_mutex:Arc>) -> Result { - let transactions_ = self_mutex.safe_lock(|a| a.declared_mining_job.1.clone()).map_err(|e| JdsError::PoisonLock(e.to_string())); - let transactions = match transactions_ { - Ok(transactions_inner) => transactions_inner, - Err(error) => return Err(error), - }; - for transaction in transactions { - if let TransactionState::Missing = transaction { - return Ok(false); - } - }; - return Ok(true); + async fn send_txs_to_mempool(self_mutex: Arc>) { + let add_txs_to_mempool = self_mutex + .safe_lock(|a| a.add_txs_to_mempool.clone()) + .unwrap(); + let sender_add_txs_to_mempool = add_txs_to_mempool.sender_add_txs_to_mempool; + let add_txs_to_mempool_inner = add_txs_to_mempool.add_txs_to_mempool_inner; + let _ = sender_add_txs_to_mempool + .send(add_txs_to_mempool_inner) + .await; + // the trasnactions sent to the mempool can be freed + let _ = self_mutex.safe_lock(|a| { + a.add_txs_to_mempool.add_txs_to_mempool_inner = AddTrasactionsToMempoolInner { + known_transactions: vec![], + unknown_transactions: vec![], + }; + }); + } + + fn get_transactions_in_job(self_mutex: Arc>) -> Vec { + let mut known_transactions: Vec = Vec::new(); + let job_transactions = self_mutex + .safe_lock(|a| a.declared_mining_job.1.clone()) + .unwrap(); + for transaction in job_transactions { + match transaction { + TransactionState::PresentInMempool(txid) => known_transactions.push(txid), + TransactionState::Missing => continue, + }; + } + known_transactions } pub async fn send( @@ -177,8 +223,40 @@ impl JobDeclaratorDownstream { payload, ); match next_message_to_send { - Ok(SendTo::Respond(message)) => { - Self::send(self_mutex.clone(), message).await.unwrap(); + Ok(SendTo::Respond(m)) => { + match m { + JobDeclaration::AllocateMiningJobToken(_) => { + info!("Received AMJT") + } + JobDeclaration::AllocateMiningJobTokenSuccess(_) => { + error!("Unexpected message: AMJTS") + } + JobDeclaration::DeclareMiningJob(_) => { + info!("Received DMJ"); + Self::send_txs_to_mempool(self_mutex.clone()).await; + } + JobDeclaration::DeclareMiningJobError(_) => { + error!("Unexpected message: DMJE") + } + JobDeclaration::DeclareMiningJobSuccess(_) => { + error!("Unexpected message: DMJS") + } + JobDeclaration::IdentifyTransactions(_) => { + error!("Unexpected message: IT") + } + JobDeclaration::IdentifyTransactionsSuccess(_) => { + error!("Unexpected message: ITS") + } + JobDeclaration::ProvideMissingTransactions(_) => { + error!("Unexpected message: PMT") + } + JobDeclaration::ProvideMissingTransactionsSuccess(_) => { + info!("Received PMTS"); + Self::send_txs_to_mempool(self_mutex.clone()).await; + } + JobDeclaration::SubmitSolution(_) => todo!(), + } + Self::send(self_mutex.clone(), m).await.unwrap(); } Ok(SendTo::RelayNewMessage(message)) => { error!("JD Server: unexpected relay new message {:?}", message); @@ -192,64 +270,102 @@ impl JobDeclaratorDownstream { Ok(SendTo::Multiple(multiple)) => { error!("JD Server: unexpected multiple messages: {:?}", multiple); } - Ok(SendTo::None(m)) => match m { - Some(JobDeclaration::SubmitSolution(message)) => { - match JobDeclaratorDownstream::are_all_job_transactions_present(self_mutex.clone()) { - Ok(true_or_false) => if true_or_false { - info!("All transactions in downstream job are recognized correctly by the JD Server"); - } else { - // TODO print here the ip of the downstream - error!("Missing transactions at submit solution!"); - }, - Err(error) => handle_result!(tx_status, Err(error)), + Ok(SendTo::None(m)) => { + match m { + Some(JobDeclaration::SubmitSolution(message)) => { + match Self::are_all_job_transactions_present( + self_mutex.clone(), + ) { + Ok(_) => { + info!("All transactions in downstream job are recognized correctly by the JD Server"); + let hexdata = + match JobDeclaratorDownstream::get_block_hex( + self_mutex.clone(), + message, + ) { + Ok(inner) => inner, + Err(e) => { + error!( + "Received solution but encountered error: {:?}", + e + ); + recv.close(); + //TODO should we brake it? + break; + } + }; + let _ = new_block_sender.send(hexdata).await; + } + Err(error) => { + error!("Missing transactions: {:?}", error); + // TODO print here the ip of the downstream + let known_transactions = + JobDeclaratorDownstream::get_transactions_in_job( + self_mutex.clone(), + ); + let retrieve_transactions = + AddTrasactionsToMempoolInner { + known_transactions, + unknown_transactions: Vec::new(), + }; + let mempool = self_mutex + .clone() + .safe_lock(|a| a.mempool.clone()) + .unwrap(); + tokio::select! { + _ = JDsMempool::add_tx_data_to_mempool(mempool, retrieve_transactions) => { + let hexdata = match JobDeclaratorDownstream::get_block_hex( + self_mutex.clone(), + message.clone(), + ) { + Ok(inner) => inner, + Err(e) => { + error!( + "Error retrieving transactions: {:?}", + e + ); + recv.close(); + //TODO should we brake it? + break; + } + }; + let _ = new_block_sender.send(hexdata).await; + } + _ = tokio::time::sleep(Duration::from_secs(60)) => {} + }; + } + }; } - let hexdata = match JobDeclaratorDownstream::get_block_hex( - self_mutex.clone(), - message, - ) { - Ok(inner) => inner, - Err(e) => { - error!( - "Received solution but encountered error: {:?}", - e - ); - recv.close(); - //TODO should we brake it? - break; - } - }; - - let _ = new_block_sender.send(hexdata).await; - } - Some(JobDeclaration::DeclareMiningJob(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::DeclareMiningJobSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::DeclareMiningJobError(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::IdentifyTransactions(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::IdentifyTransactionsSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::AllocateMiningJobToken(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::AllocateMiningJobTokenSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::ProvideMissingTransactions(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::ProvideMissingTransactionsSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); + Some(JobDeclaration::DeclareMiningJob(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::DeclareMiningJobSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::DeclareMiningJobError(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::IdentifyTransactions(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::IdentifyTransactionsSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::AllocateMiningJobToken(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::AllocateMiningJobTokenSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::ProvideMissingTransactions(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::ProvideMissingTransactionsSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + None => (), } - None => (), - }, + } Err(e) => { error!("{:?}", e); handle_result!( @@ -303,11 +419,19 @@ impl JobDeclarator { status_tx: crate::status::Sender, mempool: Arc>, new_block_sender: Sender, - sender_add_txs_to_mempool: Sender, + sender_add_txs_to_mempool: Sender, ) { let self_ = Arc::new(Mutex::new(Self {})); info!("JD INITIALIZED"); - Self::accept_incoming_connection(self_, config, status_tx, mempool, new_block_sender, sender_add_txs_to_mempool).await; + Self::accept_incoming_connection( + self_, + config, + status_tx, + mempool, + new_block_sender, + sender_add_txs_to_mempool, + ) + .await; } async fn accept_incoming_connection( _self_: Arc>, @@ -315,7 +439,7 @@ impl JobDeclarator { status_tx: crate::status::Sender, mempool: Arc>, new_block_sender: Sender, - sender_add_txs_to_mempool: Sender, + sender_add_txs_to_mempool: Sender, ) { let listner = TcpListener::bind(&config.listen_jd_address).await.unwrap(); while let Ok((stream, _)) = listner.accept().await { diff --git a/roles/jd-server/src/lib/mempool/mod.rs b/roles/jd-server/src/lib/mempool/mod.rs index a7db47d651..f176fb3e1f 100644 --- a/roles/jd-server/src/lib/mempool/mod.rs +++ b/roles/jd-server/src/lib/mempool/mod.rs @@ -1,6 +1,6 @@ pub mod error; +use super::job_declarator::AddTrasactionsToMempoolInner; use crate::mempool::error::JdsMempoolError; -use super::job_declarator::AddTrasactionsToMempool; use async_channel::Receiver; use bitcoin::blockdata::transaction::Transaction; use hashbrown::HashMap; @@ -62,20 +62,23 @@ impl JDsMempool { // this functions fill in the mempool the transactions with the given txid and insert the given // transactions. The ids are for the transactions that are already known to the node, the // unknown transactions are provided directly as a vector - pub async fn add_tx_data_to_mempool(self_: Arc>, add_transactions_to_mempool: AddTrasactionsToMempool) -> Result<(), JdsMempoolError>{ - let txids = add_transactions_to_mempool.known_transactions; - let transactions = add_transactions_to_mempool.unknown_transactions; + pub async fn add_tx_data_to_mempool( + self_: Arc>, + add_txs_to_mempool_inner: AddTrasactionsToMempoolInner, + ) -> Result<(), JdsMempoolError> { + let txids = add_txs_to_mempool_inner.known_transactions; + let transactions = add_txs_to_mempool_inner.unknown_transactions; let client = self_ .safe_lock(|a| a.get_client()) .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))? .ok_or(JdsMempoolError::NoClient)?; - // fill in the mempool the transactions id in the mempool with the full transactions + // fill in the mempool the transactions id in the mempool with the full transactions // retrieved from the jd client for txid in txids { let transaction = client .get_raw_transaction(&txid.to_string(), None) .await - .map_err(|e| JdsMempoolError::Rpc(e))?; + .map_err(JdsMempoolError::Rpc)?; let _ = self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction))); } diff --git a/roles/jd-server/src/main.rs b/roles/jd-server/src/main.rs index 6d0530c35a..3391a93be8 100644 --- a/roles/jd-server/src/main.rs +++ b/roles/jd-server/src/main.rs @@ -191,22 +191,34 @@ async fn main() { let cloned = config.clone(); let mempool_cloned = mempool.clone(); - let (sender_jdd_for_mempool_update, receiver_jdd_for_mempool_update) = unbounded(); + let (sender_add_txs_to_mempool, receiver_add_txs_to_mempool) = unbounded(); task::spawn(async move { - JobDeclarator::start(cloned, sender, mempool_cloned, new_block_sender, sender_jdd_for_mempool_update).await + JobDeclarator::start( + cloned, + sender, + mempool_cloned, + new_block_sender, + sender_add_txs_to_mempool, + ) + .await }); task::spawn(async move { loop { - if let Ok(add_transactions_to_mempool) = receiver_jdd_for_mempool_update.recv().await { + if let Ok(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await { let mempool_cloned = mempool.clone(); task::spawn(async move { - match lib::mempool::JDsMempool::add_tx_data_to_mempool(mempool_cloned, add_transactions_to_mempool).await { + match lib::mempool::JDsMempool::add_tx_data_to_mempool( + mempool_cloned, + add_transactions_to_mempool, + ) + .await + { Ok(_) => (), Err(err) => { // TODO // here there should be a better error management mempool::error::handle_error(&err); - }, + } } }); }