From 387c61c9c0a9523b966a58d7cb96266115ed8a04 Mon Sep 17 00:00:00 2001 From: lorban Date: Sat, 9 Mar 2024 12:41:41 +0100 Subject: [PATCH] tmp --- protocols/v2/roles-logic-sv2/src/errors.rs | 2 + roles/jd-server/src/lib/error.rs | 2 + .../src/lib/job_declarator/message_handler.rs | 137 ++++++++++-------- roles/jd-server/src/lib/job_declarator/mod.rs | 56 +++++-- roles/jd-server/src/lib/status.rs | 4 + 5 files changed, 125 insertions(+), 76 deletions(-) diff --git a/protocols/v2/roles-logic-sv2/src/errors.rs b/protocols/v2/roles-logic-sv2/src/errors.rs index 1d8625124b..198b94e011 100644 --- a/protocols/v2/roles-logic-sv2/src/errors.rs +++ b/protocols/v2/roles-logic-sv2/src/errors.rs @@ -59,6 +59,7 @@ pub enum Error { TargetError(InputError), HashrateError(InputError), LogicErrorMessage(std::boxed::Box>), + JDSMissingTransactions, } impl From for Error { @@ -149,6 +150,7 @@ impl Display for Error { TargetError(e) => write!(f, "Impossible to get Target: {:?}", e), HashrateError(e) => write!(f, "Impossible to get Hashrate: {:?}", e), LogicErrorMessage(e) => write!(f, "Message is well formatted but can not be handled: {:?}", e), + JDSMissingTransactions => write!(f, "JD server cannot propagate the block: missing transactions"), } } } diff --git a/roles/jd-server/src/lib/error.rs b/roles/jd-server/src/lib/error.rs index e94a2395cb..235c8f19d7 100644 --- a/roles/jd-server/src/lib/error.rs +++ b/roles/jd-server/src/lib/error.rs @@ -22,6 +22,7 @@ pub enum JdsError { Custom(String), Sv2ProtocolError((u32, Mining<'static>)), MempoolError(JdsMempoolError), + ImpossibleToReconstructBlock(String), } impl std::fmt::Display for JdsError { @@ -42,6 +43,7 @@ impl std::fmt::Display for JdsError { write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e) } MempoolError(ref e) => write!(f, "Mempool error: `{:?}`", e), + ImpossibleToReconstructBlock(e) => write!(f, "Error in reconstructing the block: {:?}", e), } } } diff --git a/roles/jd-server/src/lib/job_declarator/message_handler.rs b/roles/jd-server/src/lib/job_declarator/message_handler.rs index a23a933df3..19c5affab7 100644 --- a/roles/jd-server/src/lib/job_declarator/message_handler.rs +++ b/roles/jd-server/src/lib/job_declarator/message_handler.rs @@ -6,13 +6,13 @@ use roles_logic_sv2::{ DeclareMiningJobError, DeclareMiningJobSuccess, IdentifyTransactionsSuccess, ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd, }, - parsers::JobDeclaration, + parsers::JobDeclaration, utils::Mutex, }; -use std::{convert::TryInto, io::Cursor}; +use std::{convert::TryInto, io::Cursor, sync::Arc}; use stratum_common::bitcoin::Transaction; pub type SendTo = SendTo_, ()>; -use super::signed_token; -use crate::mempool::{self, error::JdsMempoolError}; +use super::{signed_token, TransactionState}; +use crate::mempool::{self, error::JdsMempoolError, JDsMempool}; use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages}; use stratum_common::bitcoin::consensus::Decodable; use tracing::info; @@ -77,7 +77,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { .safe_lock(|x| x.to_short_ids(nonce)) .unwrap() .unwrap(); - let mut txs_in_job = vec![]; + let mut transactions_with_state = vec![TransactionState::Missing; short_hash_list.len()]; let mut txs_to_retrieve: Vec<(String, usize)> = vec![]; let mut missing_txs = vec![]; @@ -86,27 +86,31 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { match short_id_mempool.get(&sid_) { Some(tx_data) => match &tx_data.tx { Some(tx) => { - if i >= txs_in_job.len() { - txs_in_job.resize(i + 1, tx.clone()); - } - txs_in_job.insert(i, tx.clone()) + transactions_with_state[i] = TransactionState::Present(tx.clone()); } None => { - txs_to_retrieve.push(((tx_data.id.to_string()), i)); + transactions_with_state[i] = TransactionState::ToBeRetrievedFromMempool(tx_data.id); } }, - None => missing_txs.push(i as u16), + None => { + transactions_with_state[i] = TransactionState::Missing; + // TODO remove this, the the ids of missing transactions from the vector + missing_txs.push(i as u16); + }, } } self.declared_mining_job = Some(( message.clone().into_static(), - txs_in_job, - missing_txs.clone(), + transactions_with_state )); - if !txs_to_retrieve.is_empty() { - add_tx_data_to_job(txs_to_retrieve, self); - } + // TODO handle as a task on main + let transaction_list = self.declared_mining_job.as_ref().unwrap().1.clone(); + // TODO call add_tx_data_to_mempool + let new_transactions = add_tx_data_to_job(transaction_list, self.mempool.clone()); + //if !txs_to_retrieve.is_empty() { + // add_tx_data_to_job(txs_to_retrieve, self); + //} if missing_txs.is_empty() { let message_success = DeclareMiningJobSuccess { @@ -122,6 +126,8 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { } else { let message_provide_missing_transactions = ProvideMissingTransactions { request_id: message.request_id, + // TODO here get the missing IDS from the entries of txs_in_job which are + // TransactionState::Missing unknown_tx_position_list: missing_txs.into(), }; let message_enum_provide_missing_transactions = @@ -153,30 +159,38 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { message: ProvideMissingTransactionsSuccess, ) -> Result { match &mut self.declared_mining_job { - Some((_, ref mut transactions, missing_indexes)) => { + Some((_, ref mut transactions_with_state)) => { for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() { - let mut cursor = Cursor::new(tx); - let tx = Transaction::consensus_decode_from_finite_reader(&mut cursor) - .expect("Invalid tx data from downstream"); - let index = - *missing_indexes - .get(i) - .ok_or(Error::LogicErrorMessage(Box::new( - AllMessages::JobDeclaration( - JobDeclaration::ProvideMissingTransactionsSuccess( - message.clone().into_static(), - ), - ), - )))? as usize; - if index >= transactions.len() { - transactions.resize(index + 1, tx.clone()); + for tx_with_state in transactions_with_state.clone() { + match tx_with_state { + TransactionState::Present(_) => continue, + TransactionState::ToBeRetrievedFromMempool(_) => continue, + TransactionState::Missing => { + let mut cursor = Cursor::new(tx); + // TODO remove this unwrap + let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor).unwrap(); + transactions_with_state[i] = TransactionState::Present(transaction.clone()); + mempool::JDsMempool::add_tx_data_to_mempool( + self.mempool.clone(), + transaction.txid(), + Some(transaction), + ); + break; + } + + } + } + } + // if there still a missing transaction return an error + for tx_with_state in transactions_with_state { + match tx_with_state { + TransactionState::Present(_) => continue, + TransactionState::ToBeRetrievedFromMempool(_) => continue, + TransactionState::Missing => { + return Err(Error::JDSMissingTransactions); + + } } - transactions.insert(index, tx.clone()); - mempool::JDsMempool::add_tx_data_to_mempool( - self.mempool.clone(), - tx.txid(), - Some(tx), - ); } // TODO check it let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static(); @@ -206,33 +220,32 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { } } -fn add_tx_data_to_job(tx_id_list: Vec<(String, usize)>, jdd: &mut JobDeclaratorDownstream) { - let mempool = jdd.mempool.clone(); - let mut declared_mining_job = jdd.declared_mining_job.clone(); +fn add_tx_data_to_job(tx_list: Vec, mempool: Arc>) -> Vec { + let new_transactions_mutex = Arc::new(Mutex::new(Vec::new())); tokio::task::spawn(async move { - for tx in tx_id_list.iter().enumerate() { - let index = tx.1 .1; - let new_tx_data: Result = mempool - .safe_lock(|x| x.get_client()) - .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))? - .ok_or(JdsMempoolError::NoClient)? - .get_raw_transaction(&tx.1 .0, None) - .await - .map_err(JdsMempoolError::Rpc); - if let Ok(tx) = new_tx_data { - if let Some((_, transactions, _)) = &mut declared_mining_job { - if index >= transactions.len() { - transactions.resize(index + 1, tx.clone()); - } - transactions.insert(index, tx.clone()); + for tx in tx_list.iter().enumerate() { + match tx.1 { + &TransactionState::Missing | &TransactionState::Present(_) => continue, + &TransactionState::ToBeRetrievedFromMempool(txid) => { + let new_tx_data: Result = mempool + .safe_lock(|x| x.get_client()) + .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))? + .ok_or(JdsMempoolError::NoClient)? + .get_raw_transaction(&txid.to_string(), None) + .await + .map_err(JdsMempoolError::Rpc); + if let Ok(transaction) = new_tx_data { + new_transactions_mutex.safe_lock(|a| a.push(transaction)); + //this unwrap is safe + } else { + // TODO propagate error + todo!() + }; } - mempool::JDsMempool::add_tx_data_to_mempool( - mempool.clone(), - tx.clone().txid(), - Some(tx.clone()), - ); - } + }; } Ok::<(), JdsMempoolError>(()) }); + let new_transactions: Vec = new_transactions_mutex.clone().safe_lock(|a| a.clone()).unwrap().clone(); + new_transactions } diff --git a/roles/jd-server/src/lib/job_declarator/mod.rs b/roles/jd-server/src/lib/job_declarator/mod.rs index 08a9f62d93..45798f2d0d 100644 --- a/roles/jd-server/src/lib/job_declarator/mod.rs +++ b/roles/jd-server/src/lib/job_declarator/mod.rs @@ -21,9 +21,27 @@ use tracing::{error, info}; use stratum_common::bitcoin::{ consensus::{encode::serialize, Encodable}, - Block, Transaction, + Block, Transaction, Txid, }; + +// this structure wraps each transaction with the index of the position in the job declared by the +// JD-Client. the tx field can be None if the transaction is missing. In this case, the index is +// used to retrieve the transaction from the message ProvideMissingTransactionsSuccess +//#[derive(Debug)] +//struct TransactionWithIndex { +// tx: Option, +// index: u16, +//} + +#[derive(Clone, Debug)] +enum TransactionState { + Present(Transaction), + ToBeRetrievedFromMempool(Txid), + Missing, +} + + #[derive(Debug)] pub struct JobDeclaratorDownstream { sender: Sender, @@ -37,7 +55,11 @@ pub struct JobDeclaratorDownstream { public_key: Secp256k1PublicKey, private_key: Secp256k1SecretKey, mempool: Arc>, - declared_mining_job: Option<(DeclareMiningJob<'static>, Vec, Vec)>, + // Vec is the vector of missing transactions + // this should be (Option>, Vec) + // TODO call the vector with TransactionState in with the same name everywhere, also in the + // block creator in utils + declared_mining_job: Option<(DeclareMiningJob<'static>, Vec)>, tx_hash_list_hash: Option>, } @@ -70,18 +92,23 @@ impl JobDeclaratorDownstream { } } - fn get_block_hex(self_mutex: Arc>, message: SubmitSolutionJd) -> Option { + fn get_block_hex(self_mutex: Arc>, message: SubmitSolutionJd) -> Result { //TODO: implement logic for success or error - let (last_declare, tx_list, _) = match self_mutex + let (last_declare, transactions_with_state) = self_mutex .safe_lock(|x| x.declared_mining_job.take()) - .unwrap() - { - Some((last_declare, tx_list, _x)) => (last_declare, tx_list, _x), - None => return None, - }; + // TODO manage these errors + .unwrap().unwrap(); + let mut transactions_list: Vec = Vec::new(); + for tx_with_state in transactions_with_state.iter().enumerate() { + if let TransactionState::Present(tx) = tx_with_state.1 { + transactions_list.push(tx.clone()); + } else { + return Err(JdsError::ImpossibleToReconstructBlock("Missing transactions".to_string())); + }; + } let block: Block = - roles_logic_sv2::utils::BlockCreator::new(last_declare, tx_list, message).into(); - Some(hex::encode(serialize(&block))) + roles_logic_sv2::utils::BlockCreator::new(last_declare, transactions_list, message).into(); + Ok(hex::encode(serialize(&block))) } pub async fn send( @@ -138,10 +165,11 @@ impl JobDeclaratorDownstream { self_mutex.clone(), message, ) { - Some(inner) => inner, - None => { - error!("Received solution but no job available"); + Ok(inner) => inner, + Err(e) => { + error!("Received solution but encountered error: {:?}", e); recv.close(); + //TODO should we brake it? break; } }; diff --git a/roles/jd-server/src/lib/status.rs b/roles/jd-server/src/lib/status.rs index b05294e47b..010cceef6f 100644 --- a/roles/jd-server/src/lib/status.rs +++ b/roles/jd-server/src/lib/status.rs @@ -94,6 +94,7 @@ async fn send_status( outcome } +// TODO do we really want to brake at MempoolError and ImpossibleToReconstructBlock? // this is called by `error_handling::handle_result!` pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::ErrorBranch { tracing::debug!("Error: {:?}", &e); @@ -121,5 +122,8 @@ pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::Error JdsError::MempoolError(_) => { send_status(sender, e, error_handling::ErrorBranch::Break).await } + JdsError::ImpossibleToReconstructBlock(_) => { + send_status(sender, e, error_handling::ErrorBranch::Break).await + } } }