diff --git a/roles/jd-server/src/lib/job_declarator/message_handler.rs b/roles/jd-server/src/lib/job_declarator/message_handler.rs index 8e038f1818..e883692b85 100644 --- a/roles/jd-server/src/lib/job_declarator/message_handler.rs +++ b/roles/jd-server/src/lib/job_declarator/message_handler.rs @@ -65,8 +65,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { // mempool which use the rpc client to retrieve the whole data for each transactions. // The unknown transactions is a vector that contains the transactions that are not in the // jds mempool, and will be non-empty in the ProvideMissingTransactionsSuccess message - let mut known_transactions: Vec = Vec::new(); - let unknown_transactions: Vec = Vec::new(); + let mut known_transactions: Vec = vec![]; self.tx_hash_list_hash = Some(message.tx_hash_list_hash.clone().into_static()); if self.verify_job(&message) { let short_hash_list: Vec = message @@ -92,7 +91,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { Some(tx_data) => { transactions_with_state[i] = TransactionState::PresentInMempool(tx_data.id); known_transactions.push(tx_data.id); - }, + } None => { transactions_with_state[i] = TransactionState::Missing; missing_txs.push(i as u16); @@ -105,7 +104,11 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { missing_txs.clone(), ); // here we send the transactions that we want to be stored in jds mempool with full data - self.sender_add_txs_to_mempool.send(super::AddTrasactionsToMempool { known_transactions, unknown_transactions}); + + self.add_txs_to_mempool + .add_txs_to_mempool_inner + .known_transactions + .append(&mut known_transactions); if missing_txs.is_empty() { let message_success = DeclareMiningJobSuccess { @@ -152,13 +155,12 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { message: ProvideMissingTransactionsSuccess, ) -> Result { let (_, ref mut transactions_with_state, missing_indexes) = &mut self.declared_mining_job; - let mut unknown_transactions: Vec = Vec::new(); - let known_transactions: Vec = Vec::new(); + let mut unknown_transactions: Vec = vec![]; for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() { let mut cursor = Cursor::new(tx); let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor) .map_err(|e| Error::TxDecodingError(e.to_string()))?; - &unknown_transactions.push(transaction.clone()); + Vec::push(&mut unknown_transactions, transaction.clone()); let index = *missing_indexes .get(i) .ok_or(Error::LogicErrorMessage(Box::new( @@ -169,7 +171,10 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { // insert the missing transactions in the mempool transactions_with_state[index] = TransactionState::PresentInMempool(transaction.txid()); } - self.sender_add_txs_to_mempool.send(super::AddTrasactionsToMempool { known_transactions, unknown_transactions: unknown_transactions.clone()}); + self.add_txs_to_mempool + .add_txs_to_mempool_inner + .unknown_transactions + .append(&mut unknown_transactions); // if there still a missing transaction return an error for tx_with_state in transactions_with_state { match tx_with_state { diff --git a/roles/jd-server/src/lib/job_declarator/mod.rs b/roles/jd-server/src/lib/job_declarator/mod.rs index c7ed4387ee..4437df16c7 100644 --- a/roles/jd-server/src/lib/job_declarator/mod.rs +++ b/roles/jd-server/src/lib/job_declarator/mod.rs @@ -16,7 +16,10 @@ use roles_logic_sv2::{ }; use secp256k1::{Keypair, Message as SecpMessage, Secp256k1}; use std::{collections::HashMap, convert::TryInto, sync::Arc}; -use tokio::net::TcpListener; +use tokio::{ + net::TcpListener, + time::Duration, +}; use tracing::{error, info}; use stratum_common::bitcoin::{ @@ -31,11 +34,18 @@ pub enum TransactionState { } #[derive(Clone, Debug)] -pub struct AddTrasactionsToMempool { +pub struct AddTrasactionsToMempoolInner { pub known_transactions: Vec, pub unknown_transactions: Vec, } +// TODO implement send method that sends the inner via the sender +#[derive(Clone, Debug)] +pub struct AddTrasactionsToMempool { + pub add_txs_to_mempool_inner: AddTrasactionsToMempoolInner, + pub sender_add_txs_to_mempool: Sender, +} + #[derive(Debug)] pub struct JobDeclaratorDownstream { sender: Sender, @@ -56,7 +66,7 @@ pub struct JobDeclaratorDownstream { Vec, ), tx_hash_list_hash: Option>, - sender_add_txs_to_mempool: Sender, + add_txs_to_mempool: AddTrasactionsToMempool, } impl JobDeclaratorDownstream { @@ -65,12 +75,16 @@ impl JobDeclaratorDownstream { sender: Sender, config: &Configuration, mempool: Arc>, - sender_add_txs_to_mempool: Sender + sender_add_txs_to_mempool: Sender, ) -> Self { let mut coinbase_output = vec![]; // TODO: use next variables let token_to_job_map = HashMap::with_hasher(BuildNoHashHasher::default()); let tokens = Id::new(); + let add_txs_to_mempool_inner = AddTrasactionsToMempoolInner { + known_transactions: vec![], + unknown_transactions: vec![], + }; super::get_coinbase_output(config).expect("Invalid coinbase output in config")[0] .consensus_encode(&mut coinbase_output) .expect("Invalid coinbase output in config"); @@ -86,7 +100,10 @@ impl JobDeclaratorDownstream { mempool, declared_mining_job: (None, Vec::new(), Vec::new()), tx_hash_list_hash: None, - sender_add_txs_to_mempool, + add_txs_to_mempool: AddTrasactionsToMempool { + add_txs_to_mempool_inner, + sender_add_txs_to_mempool, + }, } } @@ -94,54 +111,83 @@ impl JobDeclaratorDownstream { self_mutex: Arc>, message: SubmitSolutionJd, ) -> Result> { - let (last_declare_, transactions_with_state, _) = self_mutex + let (last_declare_, _, _) = self_mutex + .clone() + .safe_lock(|x| x.declared_mining_job.clone()) + .map_err(|e| Box::new(JdsError::PoisonLock(e.to_string())))?; + let last_declare = last_declare_.ok_or(Box::new(JdsError::NoLastDeclaredJob))?; + let transactions_list = Self::are_all_job_transactions_present(self_mutex)?; + let block: Block = + roles_logic_sv2::utils::BlockCreator::new(last_declare, transactions_list, message) + .into(); + Ok(hex::encode(serialize(&block))) + } + + fn are_all_job_transactions_present( + self_mutex: Arc>, + ) -> Result, Box> { + let (_, transactions_with_state, _) = self_mutex + .clone() .safe_lock(|x| x.declared_mining_job.clone()) - .map_err(|e| JdsError::PoisonLock(e.to_string()))?; - let mempool_ = self_mutex + .map_err(|e| Box::new(JdsError::PoisonLock(e.to_string())))?; + let mempool = self_mutex .safe_lock(|x| x.mempool.clone()) - .map_err(|e| JdsError::PoisonLock(e.to_string()))?; - let last_declare = last_declare_.ok_or(JdsError::NoLastDeclaredJob)?; + .map_err(|e| Box::new(JdsError::PoisonLock(e.to_string())))?; let mut transactions_list: Vec = Vec::new(); for tx_with_state in transactions_with_state.iter().enumerate() { if let TransactionState::PresentInMempool(txid) = tx_with_state.1 { - let tx_ = match mempool_.safe_lock(|x| x.mempool.get(txid).cloned()) { - Ok(tx) => tx, - Err(e) => return Err(Box::new(JdsError::PoisonLock(e.to_string()))), - }; - let tx = tx_.ok_or(JdsError::ImpossibleToReconstructBlock( - "Missing transactions".to_string(), - )); + let tx = mempool + .safe_lock(|x| x.mempool.get(txid).cloned()) + .map_err(|e| JdsError::PoisonLock(e.to_string()))? + .ok_or(JdsError::ImpossibleToReconstructBlock( + "Txid not found in jds mempool".to_string(), + )); if let Ok(Some(tx)) = tx { transactions_list.push(tx); } else { return Err(Box::new(JdsError::ImpossibleToReconstructBlock( - "Missing transactions".to_string(), + "Txid found in jds mempool but transactions not present".to_string(), ))); } } else { return Err(Box::new(JdsError::ImpossibleToReconstructBlock( - "Missing transactions".to_string(), + "Unknown transaction".to_string(), ))); }; } - let block: Block = - roles_logic_sv2::utils::BlockCreator::new(last_declare, transactions_list, message) - .into(); - Ok(hex::encode(serialize(&block))) + Ok(transactions_list) } - fn are_all_job_transactions_present(self_mutex:Arc>) -> Result { - let transactions_ = self_mutex.safe_lock(|a| a.declared_mining_job.1.clone()).map_err(|e| JdsError::PoisonLock(e.to_string())); - let transactions = match transactions_ { - Ok(transactions_inner) => transactions_inner, - Err(error) => return Err(error), - }; - for transaction in transactions { - if let TransactionState::Missing = transaction { - return Ok(false); - } - }; - return Ok(true); + async fn send_txs_to_mempool(self_mutex: Arc>) { + let add_txs_to_mempool = self_mutex + .safe_lock(|a| a.add_txs_to_mempool.clone()) + .unwrap(); + let sender_add_txs_to_mempool = add_txs_to_mempool.sender_add_txs_to_mempool; + let add_txs_to_mempool_inner = add_txs_to_mempool.add_txs_to_mempool_inner; + let _ = sender_add_txs_to_mempool + .send(add_txs_to_mempool_inner) + .await; + // the trasnactions sent to the mempool can be freed + let _ = self_mutex.safe_lock(|a| { + a.add_txs_to_mempool.add_txs_to_mempool_inner = AddTrasactionsToMempoolInner { + known_transactions: vec![], + unknown_transactions: vec![], + }; + }); + } + + fn get_transactions_in_job(self_mutex: Arc>) -> Vec { + let mut known_transactions: Vec = Vec::new(); + let job_transactions = self_mutex + .safe_lock(|a| a.declared_mining_job.1.clone()) + .unwrap(); + for transaction in job_transactions { + match transaction { + TransactionState::PresentInMempool(txid) => known_transactions.push(txid), + TransactionState::Missing => continue, + }; + } + known_transactions } pub async fn send( @@ -177,8 +223,40 @@ impl JobDeclaratorDownstream { payload, ); match next_message_to_send { - Ok(SendTo::Respond(message)) => { - Self::send(self_mutex.clone(), message).await.unwrap(); + Ok(SendTo::Respond(m)) => { + match m { + JobDeclaration::AllocateMiningJobToken(_) => { + info!("Received AMJT") + } + JobDeclaration::AllocateMiningJobTokenSuccess(_) => { + error!("Unexpected message: AMJTS") + } + JobDeclaration::DeclareMiningJob(_) => { + info!("Received DMJ"); + Self::send_txs_to_mempool(self_mutex.clone()).await; + } + JobDeclaration::DeclareMiningJobError(_) => { + error!("Unexpected message: DMJE") + } + JobDeclaration::DeclareMiningJobSuccess(_) => { + error!("Unexpected message: DMJS") + } + JobDeclaration::IdentifyTransactions(_) => { + error!("Unexpected message: IT") + } + JobDeclaration::IdentifyTransactionsSuccess(_) => { + error!("Unexpected message: ITS") + } + JobDeclaration::ProvideMissingTransactions(_) => { + error!("Unexpected message: PMT") + } + JobDeclaration::ProvideMissingTransactionsSuccess(_) => { + info!("Received PMTS"); + Self::send_txs_to_mempool(self_mutex.clone()).await; + } + JobDeclaration::SubmitSolution(_) => todo!(), + } + Self::send(self_mutex.clone(), m).await.unwrap(); } Ok(SendTo::RelayNewMessage(message)) => { error!("JD Server: unexpected relay new message {:?}", message); @@ -192,64 +270,102 @@ impl JobDeclaratorDownstream { Ok(SendTo::Multiple(multiple)) => { error!("JD Server: unexpected multiple messages: {:?}", multiple); } - Ok(SendTo::None(m)) => match m { - Some(JobDeclaration::SubmitSolution(message)) => { - match JobDeclaratorDownstream::are_all_job_transactions_present(self_mutex.clone()) { - Ok(true_or_false) => if true_or_false { - info!("All transactions in downstream job are recognized correctly by the JD Server"); - } else { - // TODO print here the ip of the downstream - error!("Missing transactions at submit solution!"); - }, - Err(error) => handle_result!(tx_status, Err(error)), + Ok(SendTo::None(m)) => { + match m { + Some(JobDeclaration::SubmitSolution(message)) => { + match Self::are_all_job_transactions_present( + self_mutex.clone(), + ) { + Ok(_) => { + info!("All transactions in downstream job are recognized correctly by the JD Server"); + let hexdata = + match JobDeclaratorDownstream::get_block_hex( + self_mutex.clone(), + message, + ) { + Ok(inner) => inner, + Err(e) => { + error!( + "Received solution but encountered error: {:?}", + e + ); + recv.close(); + //TODO should we brake it? + break; + } + }; + let _ = new_block_sender.send(hexdata).await; + } + Err(error) => { + error!("Missing transactions: {:?}", error); + // TODO print here the ip of the downstream + let known_transactions = + JobDeclaratorDownstream::get_transactions_in_job( + self_mutex.clone(), + ); + let retrieve_transactions = + AddTrasactionsToMempoolInner { + known_transactions, + unknown_transactions: Vec::new(), + }; + let mempool = self_mutex + .clone() + .safe_lock(|a| a.mempool.clone()) + .unwrap(); + tokio::select! { + _ = JDsMempool::add_tx_data_to_mempool(mempool, retrieve_transactions) => { + let hexdata = match JobDeclaratorDownstream::get_block_hex( + self_mutex.clone(), + message.clone(), + ) { + Ok(inner) => inner, + Err(e) => { + error!( + "Error retrieving transactions: {:?}", + e + ); + recv.close(); + //TODO should we brake it? + break; + } + }; + let _ = new_block_sender.send(hexdata).await; + } + _ = tokio::time::sleep(Duration::from_secs(60)) => {} + }; + } + }; } - let hexdata = match JobDeclaratorDownstream::get_block_hex( - self_mutex.clone(), - message, - ) { - Ok(inner) => inner, - Err(e) => { - error!( - "Received solution but encountered error: {:?}", - e - ); - recv.close(); - //TODO should we brake it? - break; - } - }; - - let _ = new_block_sender.send(hexdata).await; - } - Some(JobDeclaration::DeclareMiningJob(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::DeclareMiningJobSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::DeclareMiningJobError(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::IdentifyTransactions(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::IdentifyTransactionsSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::AllocateMiningJobToken(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::AllocateMiningJobTokenSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::ProvideMissingTransactions(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::ProvideMissingTransactionsSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); + Some(JobDeclaration::DeclareMiningJob(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::DeclareMiningJobSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::DeclareMiningJobError(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::IdentifyTransactions(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::IdentifyTransactionsSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::AllocateMiningJobToken(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::AllocateMiningJobTokenSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::ProvideMissingTransactions(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::ProvideMissingTransactionsSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + None => (), } - None => (), - }, + } Err(e) => { error!("{:?}", e); handle_result!( @@ -303,11 +419,19 @@ impl JobDeclarator { status_tx: crate::status::Sender, mempool: Arc>, new_block_sender: Sender, - sender_add_txs_to_mempool: Sender, + sender_add_txs_to_mempool: Sender, ) { let self_ = Arc::new(Mutex::new(Self {})); info!("JD INITIALIZED"); - Self::accept_incoming_connection(self_, config, status_tx, mempool, new_block_sender, sender_add_txs_to_mempool).await; + Self::accept_incoming_connection( + self_, + config, + status_tx, + mempool, + new_block_sender, + sender_add_txs_to_mempool, + ) + .await; } async fn accept_incoming_connection( _self_: Arc>, @@ -315,7 +439,7 @@ impl JobDeclarator { status_tx: crate::status::Sender, mempool: Arc>, new_block_sender: Sender, - sender_add_txs_to_mempool: Sender, + sender_add_txs_to_mempool: Sender, ) { let listner = TcpListener::bind(&config.listen_jd_address).await.unwrap(); while let Ok((stream, _)) = listner.accept().await { diff --git a/roles/jd-server/src/lib/mempool/mod.rs b/roles/jd-server/src/lib/mempool/mod.rs index a7db47d651..f176fb3e1f 100644 --- a/roles/jd-server/src/lib/mempool/mod.rs +++ b/roles/jd-server/src/lib/mempool/mod.rs @@ -1,6 +1,6 @@ pub mod error; +use super::job_declarator::AddTrasactionsToMempoolInner; use crate::mempool::error::JdsMempoolError; -use super::job_declarator::AddTrasactionsToMempool; use async_channel::Receiver; use bitcoin::blockdata::transaction::Transaction; use hashbrown::HashMap; @@ -62,20 +62,23 @@ impl JDsMempool { // this functions fill in the mempool the transactions with the given txid and insert the given // transactions. The ids are for the transactions that are already known to the node, the // unknown transactions are provided directly as a vector - pub async fn add_tx_data_to_mempool(self_: Arc>, add_transactions_to_mempool: AddTrasactionsToMempool) -> Result<(), JdsMempoolError>{ - let txids = add_transactions_to_mempool.known_transactions; - let transactions = add_transactions_to_mempool.unknown_transactions; + pub async fn add_tx_data_to_mempool( + self_: Arc>, + add_txs_to_mempool_inner: AddTrasactionsToMempoolInner, + ) -> Result<(), JdsMempoolError> { + let txids = add_txs_to_mempool_inner.known_transactions; + let transactions = add_txs_to_mempool_inner.unknown_transactions; let client = self_ .safe_lock(|a| a.get_client()) .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))? .ok_or(JdsMempoolError::NoClient)?; - // fill in the mempool the transactions id in the mempool with the full transactions + // fill in the mempool the transactions id in the mempool with the full transactions // retrieved from the jd client for txid in txids { let transaction = client .get_raw_transaction(&txid.to_string(), None) .await - .map_err(|e| JdsMempoolError::Rpc(e))?; + .map_err(JdsMempoolError::Rpc)?; let _ = self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction))); } diff --git a/roles/jd-server/src/main.rs b/roles/jd-server/src/main.rs index 6d0530c35a..3391a93be8 100644 --- a/roles/jd-server/src/main.rs +++ b/roles/jd-server/src/main.rs @@ -191,22 +191,34 @@ async fn main() { let cloned = config.clone(); let mempool_cloned = mempool.clone(); - let (sender_jdd_for_mempool_update, receiver_jdd_for_mempool_update) = unbounded(); + let (sender_add_txs_to_mempool, receiver_add_txs_to_mempool) = unbounded(); task::spawn(async move { - JobDeclarator::start(cloned, sender, mempool_cloned, new_block_sender, sender_jdd_for_mempool_update).await + JobDeclarator::start( + cloned, + sender, + mempool_cloned, + new_block_sender, + sender_add_txs_to_mempool, + ) + .await }); task::spawn(async move { loop { - if let Ok(add_transactions_to_mempool) = receiver_jdd_for_mempool_update.recv().await { + if let Ok(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await { let mempool_cloned = mempool.clone(); task::spawn(async move { - match lib::mempool::JDsMempool::add_tx_data_to_mempool(mempool_cloned, add_transactions_to_mempool).await { + match lib::mempool::JDsMempool::add_tx_data_to_mempool( + mempool_cloned, + add_transactions_to_mempool, + ) + .await + { Ok(_) => (), Err(err) => { // TODO // here there should be a better error management mempool::error::handle_error(&err); - }, + } } }); }