From 95fd8fc8cb18845a7de91052dedd1393e091c211 Mon Sep 17 00:00:00 2001 From: lorban Date: Sun, 17 Mar 2024 22:24:07 +0100 Subject: [PATCH] Move task on main A task that before was blocking now is moved as a separate task on main --- .../src/lib/job_declarator/message_handler.rs | 37 ++-- roles/jd-server/src/lib/job_declarator/mod.rs | 190 +++++++++--------- roles/jd-server/src/lib/mempool/mod.rs | 34 +++- roles/jd-server/src/main.rs | 21 +- 4 files changed, 165 insertions(+), 117 deletions(-) diff --git a/roles/jd-server/src/lib/job_declarator/message_handler.rs b/roles/jd-server/src/lib/job_declarator/message_handler.rs index 955aeb28d5..8e038f1818 100644 --- a/roles/jd-server/src/lib/job_declarator/message_handler.rs +++ b/roles/jd-server/src/lib/job_declarator/message_handler.rs @@ -9,10 +9,9 @@ use roles_logic_sv2::{ parsers::JobDeclaration, }; use std::{convert::TryInto, io::Cursor}; -use stratum_common::bitcoin::Transaction; +use stratum_common::bitcoin::{Transaction, Txid}; pub type SendTo = SendTo_, ()>; use super::{signed_token, TransactionState}; -use crate::mempool; use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages}; use stratum_common::bitcoin::consensus::Decodable; use tracing::info; @@ -62,6 +61,12 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { } fn handle_declare_mining_job(&mut self, message: DeclareMiningJob) -> Result { + // the transactions that are present in the mempool are stored here, that is sent to the + // mempool which use the rpc client to retrieve the whole data for each transactions. + // The unknown transactions is a vector that contains the transactions that are not in the + // jds mempool, and will be non-empty in the ProvideMissingTransactionsSuccess message + let mut known_transactions: Vec = Vec::new(); + let unknown_transactions: Vec = Vec::new(); self.tx_hash_list_hash = Some(message.tx_hash_list_hash.clone().into_static()); if self.verify_job(&message) { let short_hash_list: Vec = message @@ -84,14 +89,9 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { for (i, sid) in short_hash_list.iter().enumerate() { let sid_: [u8; 6] = sid.to_vec().try_into().unwrap(); match short_id_mempool.get(&sid_) { - Some(tx_data) => match &tx_data.tx { - Some(tx) => { - transactions_with_state[i] = TransactionState::Present(tx.txid()); - } - None => { - transactions_with_state[i] = - TransactionState::ToBeRetrievedFromNodeMempool(tx_data.id); - } + Some(tx_data) => { + transactions_with_state[i] = TransactionState::PresentInMempool(tx_data.id); + known_transactions.push(tx_data.id); }, None => { transactions_with_state[i] = TransactionState::Missing; @@ -104,6 +104,8 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { transactions_with_state, missing_txs.clone(), ); + // here we send the transactions that we want to be stored in jds mempool with full data + self.sender_add_txs_to_mempool.send(super::AddTrasactionsToMempool { known_transactions, unknown_transactions}); if missing_txs.is_empty() { let message_success = DeclareMiningJobSuccess { @@ -150,10 +152,13 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { message: ProvideMissingTransactionsSuccess, ) -> Result { let (_, ref mut transactions_with_state, missing_indexes) = &mut self.declared_mining_job; + let mut unknown_transactions: Vec = Vec::new(); + let known_transactions: Vec = Vec::new(); for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() { let mut cursor = Cursor::new(tx); let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor) .map_err(|e| Error::TxDecodingError(e.to_string()))?; + &unknown_transactions.push(transaction.clone()); let index = *missing_indexes .get(i) .ok_or(Error::LogicErrorMessage(Box::new( @@ -161,18 +166,14 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { message.clone().into_static(), )), )))? as usize; - transactions_with_state[index] = TransactionState::Present(transaction.txid()); - mempool::JDsMempool::add_tx_data_to_mempool( - self.mempool.clone(), - transaction.txid(), - Some(transaction), - ); + // insert the missing transactions in the mempool + transactions_with_state[index] = TransactionState::PresentInMempool(transaction.txid()); } + self.sender_add_txs_to_mempool.send(super::AddTrasactionsToMempool { known_transactions, unknown_transactions: unknown_transactions.clone()}); // if there still a missing transaction return an error for tx_with_state in transactions_with_state { match tx_with_state { - TransactionState::Present(_) => continue, - TransactionState::ToBeRetrievedFromNodeMempool(_) => continue, + TransactionState::PresentInMempool(_) => continue, TransactionState::Missing => return Err(Error::JDSMissingTransactions), } } diff --git a/roles/jd-server/src/lib/job_declarator/mod.rs b/roles/jd-server/src/lib/job_declarator/mod.rs index a2cb2ca3d2..bf0976575c 100644 --- a/roles/jd-server/src/lib/job_declarator/mod.rs +++ b/roles/jd-server/src/lib/job_declarator/mod.rs @@ -1,6 +1,4 @@ pub mod message_handler; -use crate::mempool; - use super::{error::JdsError, mempool::JDsMempool, status, Configuration, EitherFrame, StdFrame}; use async_channel::{Receiver, Sender}; use binary_sv2::{B0255, U256}; @@ -26,22 +24,29 @@ use stratum_common::bitcoin::{ Block, Transaction, Txid, }; -// this structure wraps each transaction with the index of the position in the job declared by the -// JD-Client. the tx field can be None if the transaction is missing. In this case, the index is -// used to retrieve the transaction from the message ProvideMissingTransactionsSuccess -//#[derive(Debug)] -//struct TransactionWithIndex { -// tx: Option, -// index: u16, -//} - #[derive(Clone, Debug)] enum TransactionState { - Present(Txid), - ToBeRetrievedFromNodeMempool(Txid), + PresentInMempool(Txid), Missing, } +pub fn get_ids_of_known_transactions(transactions: &Vec) -> Vec { + let mut known_transactions: Vec = Vec::new(); + for transaction in transactions { + match *transaction { + TransactionState::PresentInMempool(txid) => known_transactions.push(txid.clone()), + TransactionState::Missing => continue, + } + }; + known_transactions +} + +#[derive(Clone, Debug)] +pub struct AddTrasactionsToMempool { + pub known_transactions: Vec, + pub unknown_transactions: Vec, +} + #[derive(Debug)] pub struct JobDeclaratorDownstream { sender: Sender, @@ -62,6 +67,7 @@ pub struct JobDeclaratorDownstream { Vec, ), tx_hash_list_hash: Option>, + sender_add_txs_to_mempool: Sender, } impl JobDeclaratorDownstream { @@ -70,6 +76,7 @@ impl JobDeclaratorDownstream { sender: Sender, config: &Configuration, mempool: Arc>, + sender_add_txs_to_mempool: Sender ) -> Self { let mut coinbase_output = vec![]; // TODO: use next variables @@ -90,74 +97,73 @@ impl JobDeclaratorDownstream { mempool, declared_mining_job: (None, Vec::new(), Vec::new()), tx_hash_list_hash: None, + sender_add_txs_to_mempool, } } - // This only errors that are returned are PoisonLock, Custom, MempoolError - // this function is called in JobDeclaratorDowenstream::start(), if different errors are - // returned, change also the error management there - async fn retrieve_transactions_via_rpc( - self_mutex: Arc>, - ) -> Result<(), JdsError> { - let mut transactions_to_be_retrieved: Vec = Vec::new(); - let mut new_transactions: Vec = Vec::new(); - let transactions_with_state = self_mutex - .clone() - .safe_lock(|a| a.declared_mining_job.clone()) - .map_err(|e| JdsError::PoisonLock(e.to_string()))? - .1; - for tx_with_state in transactions_with_state { - match tx_with_state { - TransactionState::Present(_) => continue, - TransactionState::ToBeRetrievedFromNodeMempool(tx) => { - transactions_to_be_retrieved.push(tx) - } - TransactionState::Missing => continue, - } - } - let mempool_ = self_mutex - .safe_lock(|a| a.mempool.clone()) - .map_err(|e| JdsError::PoisonLock(e.to_string()))?; - let client = mempool_ - .clone() - .safe_lock(|a| a.get_client()) - .map_err(|e| JdsError::PoisonLock(e.to_string()))? - .ok_or(JdsError::MempoolError( - mempool::error::JdsMempoolError::NoClient, - ))?; - for txid in transactions_to_be_retrieved { - let transaction = client - .get_raw_transaction(&txid.to_string(), None) - .await - .map_err(|e| JdsError::MempoolError(mempool::error::JdsMempoolError::Rpc(e)))?; - let txid = transaction.txid(); - mempool::JDsMempool::add_tx_data_to_mempool( - mempool_.clone(), - txid, - Some(transaction.clone()), - ); - new_transactions.push(transaction); - } - for transaction in new_transactions { - self_mutex - .clone() - .safe_lock(|a| { - for transaction_with_state in &mut a.declared_mining_job.1 { - match transaction_with_state { - TransactionState::Present(_) => continue, - TransactionState::ToBeRetrievedFromNodeMempool(_) => { - *transaction_with_state = - TransactionState::Present(transaction.txid()); - break; - } - TransactionState::Missing => continue, - } - } - }) - .map_err(|e| JdsError::PoisonLock(e.to_string()))? - } - Ok(()) - } + //// This only errors that are returned are PoisonLock, Custom, MempoolError + //// this function is called in JobDeclaratorDowenstream::start(), if different errors are + //// returned, change also the error management there + //async fn send_transactions_needed_in_mempool( + // self_mutex: Arc>, + //) -> Result<(), JdsError> { + // let sender_add_txs_to_mempool = self_mutex.safe_lock(|a| a.sender_add_txs_to_mempool.clone()).unwrap(); + // let mut transactions_to_be_retrieved: Vec = Vec::new(); + // let transactions_with_state = self_mutex + // .clone() + // .safe_lock(|a| a.declared_mining_job.clone()) + // .map_err(|e| JdsError::PoisonLock(e.to_string()))? + // .1; + // for tx_with_state in transactions_with_state { + // match tx_with_state { + // TransactionState::PresentInMempool(txid) => transactions_to_be_retrieved.push(txid), + // TransactionState::Missing => continue, + // } + // }; + // let _ = sender_add_txs_to_mempool.send(transactions_to_be_retrieved); + // //let mempool_ = self_mutex + // // .safe_lock(|a| a.mempool.clone()) + // // .map_err(|e| JdsError::PoisonLock(e.to_string()))?; + // //let client = mempool_ + // // .clone() + // // .safe_lock(|a| a.get_client()) + // // .map_err(|e| JdsError::PoisonLock(e.to_string()))? + // // .ok_or(JdsError::MempoolError( + // // mempool::error::JdsMempoolError::NoClient, + // // ))?; + // //for txid in transactions_to_be_retrieved { + // // let transaction = client + // // .get_raw_transaction(&txid.to_string(), None) + // // .await + // // .map_err(|e| JdsError::MempoolError(mempool::error::JdsMempoolError::Rpc(e)))?; + // // let txid = transaction.txid(); + // // mempool::JDsMempool::add_tx_data_to_mempool( + // // mempool_.clone(), + // // txid, + // // Some(transaction.clone()), + // // ); + // // new_transactions.push(transaction); + // //} + // //for transaction in new_transactions { + // // self_mutex + // // .clone() + // // .safe_lock(|a| { + // // for transaction_with_state in &mut a.declared_mining_job.1 { + // // match transaction_with_state { + // // TransactionState::Present(_) => continue, + // // TransactionState::ToBeRetrievedFromNodeMempool(_) => { + // // *transaction_with_state = + // // TransactionState::Present(transaction.txid()); + // // break; + // // } + // // TransactionState::Missing => continue, + // // } + // // } + // // }) + // // .map_err(|e| JdsError::PoisonLock(e.to_string()))? + // //} + // Ok(()) + //} fn get_block_hex( self_mutex: Arc>, @@ -172,7 +178,7 @@ impl JobDeclaratorDownstream { let last_declare = last_declare_.ok_or(JdsError::NoLastDeclaredJob)?; let mut transactions_list: Vec = Vec::new(); for tx_with_state in transactions_with_state.iter().enumerate() { - if let TransactionState::Present(txid) = tx_with_state.1 { + if let TransactionState::PresentInMempool(txid) = tx_with_state.1 { let tx_ = match mempool_.safe_lock(|x| x.mempool.get(txid).cloned()) { Ok(tx) => tx, Err(e) => return Err(Box::new(JdsError::PoisonLock(e.to_string()))), @@ -312,16 +318,16 @@ impl JobDeclaratorDownstream { break; } } - let retrieve_transactions = - JobDeclaratorDownstream::retrieve_transactions_via_rpc(self_mutex.clone()) - .await; - match retrieve_transactions { - Ok(_) => (), - Err(error) => { - handle_result!(tx_status, Err(error)); - break; - } - } + //let retrieve_transactions = + // JobDeclaratorDownstream::retrieve_transactions_via_rpc(self_mutex.clone()) + // .await; + //match retrieve_transactions { + // Ok(_) => (), + // Err(error) => { + // handle_result!(tx_status, Err(error)); + // break; + // } + //} } }); } @@ -359,10 +365,11 @@ impl JobDeclarator { status_tx: crate::status::Sender, mempool: Arc>, new_block_sender: Sender, + sender_add_txs_to_mempool: Sender, ) { let self_ = Arc::new(Mutex::new(Self {})); info!("JD INITIALIZED"); - Self::accept_incoming_connection(self_, config, status_tx, mempool, new_block_sender).await; + Self::accept_incoming_connection(self_, config, status_tx, mempool, new_block_sender, sender_add_txs_to_mempool).await; } async fn accept_incoming_connection( _self_: Arc>, @@ -370,6 +377,7 @@ impl JobDeclarator { status_tx: crate::status::Sender, mempool: Arc>, new_block_sender: Sender, + sender_add_txs_to_mempool: Sender, ) { let listner = TcpListener::bind(&config.listen_jd_address).await.unwrap(); while let Ok((stream, _)) = listner.accept().await { @@ -408,6 +416,8 @@ impl JobDeclarator { sender.clone(), &config, mempool.clone(), + // each downstream has its own sender (multi producer single consumer) + sender_add_txs_to_mempool.clone(), ))); JobDeclaratorDownstream::start( diff --git a/roles/jd-server/src/lib/mempool/mod.rs b/roles/jd-server/src/lib/mempool/mod.rs index 69fca884fe..a7db47d651 100644 --- a/roles/jd-server/src/lib/mempool/mod.rs +++ b/roles/jd-server/src/lib/mempool/mod.rs @@ -1,5 +1,6 @@ pub mod error; use crate::mempool::error::JdsMempoolError; +use super::job_declarator::AddTrasactionsToMempool; use async_channel::Receiver; use bitcoin::blockdata::transaction::Transaction; use hashbrown::HashMap; @@ -58,14 +59,31 @@ impl JDsMempool { } } - pub fn add_tx_data_to_mempool( - self_: Arc>, - txid: Txid, - transaction: Option, - ) { - let _ = self_.safe_lock(|x| { - x.mempool.insert(txid, transaction); - }); + // this functions fill in the mempool the transactions with the given txid and insert the given + // transactions. The ids are for the transactions that are already known to the node, the + // unknown transactions are provided directly as a vector + pub async fn add_tx_data_to_mempool(self_: Arc>, add_transactions_to_mempool: AddTrasactionsToMempool) -> Result<(), JdsMempoolError>{ + let txids = add_transactions_to_mempool.known_transactions; + let transactions = add_transactions_to_mempool.unknown_transactions; + let client = self_ + .safe_lock(|a| a.get_client()) + .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))? + .ok_or(JdsMempoolError::NoClient)?; + // fill in the mempool the transactions id in the mempool with the full transactions + // retrieved from the jd client + for txid in txids { + let transaction = client + .get_raw_transaction(&txid.to_string(), None) + .await + .map_err(|e| JdsMempoolError::Rpc(e))?; + let _ = self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction))); + } + + // fill in the mempool the transactions given in input + for transaction in transactions { + let _ = self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction))); + } + Ok(()) } pub async fn update_mempool(self_: Arc>) -> Result<(), JdsMempoolError> { diff --git a/roles/jd-server/src/main.rs b/roles/jd-server/src/main.rs index abf8cc556c..6d0530c35a 100644 --- a/roles/jd-server/src/main.rs +++ b/roles/jd-server/src/main.rs @@ -177,6 +177,7 @@ async fn main() { } } _ => { + // TODO here there should be a better error managmenet mempool::error::handle_error(&err); handle_result!(sender_submit_solution, Err(err)); } @@ -190,8 +191,26 @@ async fn main() { let cloned = config.clone(); let mempool_cloned = mempool.clone(); + let (sender_jdd_for_mempool_update, receiver_jdd_for_mempool_update) = unbounded(); task::spawn(async move { - JobDeclarator::start(cloned, sender, mempool_cloned, new_block_sender).await + JobDeclarator::start(cloned, sender, mempool_cloned, new_block_sender, sender_jdd_for_mempool_update).await + }); + task::spawn(async move { + loop { + if let Ok(add_transactions_to_mempool) = receiver_jdd_for_mempool_update.recv().await { + let mempool_cloned = mempool.clone(); + task::spawn(async move { + match lib::mempool::JDsMempool::add_tx_data_to_mempool(mempool_cloned, add_transactions_to_mempool).await { + Ok(_) => (), + Err(err) => { + // TODO + // here there should be a better error management + mempool::error::handle_error(&err); + }, + } + }); + } + } }); // Start the error handling loop