Skip to content

Commit

Permalink
error management
Browse files Browse the repository at this point in the history
  • Loading branch information
lorbax committed Mar 12, 2024
1 parent 7014e90 commit 7ee76f0
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 179 deletions.
6 changes: 5 additions & 1 deletion roles/jd-server/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub enum JdsError {
Sv2ProtocolError((u32, Mining<'static>)),
MempoolError(JdsMempoolError),
ImpossibleToReconstructBlock(String),
NoLastDeclaredJob,
}

impl std::fmt::Display for JdsError {
Expand All @@ -43,7 +44,10 @@ impl std::fmt::Display for JdsError {
write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e)
}
MempoolError(ref e) => write!(f, "Mempool error: `{:?}`", e),
ImpossibleToReconstructBlock(e) => write!(f, "Error in reconstructing the block: {:?}", e),
ImpossibleToReconstructBlock(e) => {
write!(f, "Error in reconstructing the block: {:?}", e)
}
NoLastDeclaredJob => write!(f, "Last declared job not found"),
}
}
}
Expand Down
170 changes: 52 additions & 118 deletions roles/jd-server/src/lib/job_declarator/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use roles_logic_sv2::{
DeclareMiningJobError, DeclareMiningJobSuccess, IdentifyTransactionsSuccess,
ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd,
},
parsers::JobDeclaration, utils::Mutex,
parsers::JobDeclaration,
};
use std::{convert::TryInto, io::Cursor, sync::Arc};
use stratum_common::bitcoin::{Transaction, Txid};
use std::{convert::TryInto, io::Cursor};
use stratum_common::bitcoin::Transaction;
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
use super::{signed_token, TransactionState};
use crate::mempool::{self, error::JdsMempoolError, JDsMempool};
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
use crate::mempool;
use roles_logic_sv2::errors::Error;
use stratum_common::bitcoin::consensus::Decodable;
use tracing::info;

Expand Down Expand Up @@ -77,9 +77,9 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
.safe_lock(|x| x.to_short_ids(nonce))
.unwrap()
.unwrap();
let mut transactions_with_state = vec![TransactionState::Missing; short_hash_list.len()];
let mut txs_to_retrieve: Vec<(String, usize)> = vec![];
let mut missing_txs: Vec<u16> = Vec::new();
let mut transactions_with_state =
vec![TransactionState::Missing; short_hash_list.len()];
let mut missing_txs: Vec<u16> = Vec::new();

for (i, sid) in short_hash_list.iter().enumerate() {
let sid_: [u8; 6] = sid.to_vec().try_into().unwrap();
Expand All @@ -89,20 +89,19 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
transactions_with_state[i] = TransactionState::Present(tx.clone());
}
None => {
transactions_with_state[i] = TransactionState::ToBeRetrievedFromMempool(tx_data.id);
transactions_with_state[i] =
TransactionState::ToBeRetrievedFromMempool(tx_data.id);
}
},
None => {
transactions_with_state[i] = TransactionState::Missing;
// TODO remove this, the the ids of missing transactions from the vector
missing_txs.push(i as u16);
},
}
}
}
self.declared_mining_job = Some((
message.clone().into_static(),
transactions_with_state
));
self.declared_mining_job =
(Some(message.clone().into_static()), transactions_with_state);

//let self_mutex = Arc::new(Mutex::new(self));
//add_tx_data_to_job(self_mutex);
Expand All @@ -122,7 +121,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
let message_provide_missing_transactions = ProvideMissingTransactions {
request_id: message.request_id,
// TODO here get the missing IDS from the entries of txs_in_job which are
// TransactionState::Missing
// TransactionState::Missing
unknown_tx_position_list: missing_txs.into(),
};
let message_enum_provide_missing_transactions =
Expand Down Expand Up @@ -153,59 +152,48 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
&mut self,
message: ProvideMissingTransactionsSuccess,
) -> Result<SendTo, Error> {
match &mut self.declared_mining_job {
Some((_, ref mut transactions_with_state)) => {
for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() {
for tx_with_state in transactions_with_state.clone() {
match tx_with_state {
TransactionState::Present(_) => continue,
TransactionState::ToBeRetrievedFromMempool(_) => continue,
TransactionState::Missing => {
let mut cursor = Cursor::new(tx);
// TODO remove this unwrap
let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor).unwrap();
transactions_with_state[i] = TransactionState::Present(transaction.clone());
mempool::JDsMempool::add_tx_data_to_mempool(
self.mempool.clone(),
transaction.txid(),
Some(transaction),
);
break;
}

}
let (_, ref mut transactions_with_state) = &mut self.declared_mining_job;
for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() {
for tx_with_state in transactions_with_state.clone() {
match tx_with_state {
TransactionState::Present(_) => continue,
TransactionState::ToBeRetrievedFromMempool(_) => continue,
TransactionState::Missing => {
let mut cursor = Cursor::new(tx);
// TODO remove this unwrap
let transaction =
Transaction::consensus_decode_from_finite_reader(&mut cursor).unwrap();
transactions_with_state[i] = TransactionState::Present(transaction.clone());
mempool::JDsMempool::add_tx_data_to_mempool(
self.mempool.clone(),
transaction.txid(),
Some(transaction),
);
break;
}
}
// if there still a missing transaction return an error
for tx_with_state in transactions_with_state {
match tx_with_state {
TransactionState::Present(_) => continue,
TransactionState::ToBeRetrievedFromMempool(_) => continue,
TransactionState::Missing => {
return Err(Error::JDSMissingTransactions);

}
}
}
// TODO check it
let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static();
let message_success = DeclareMiningJobSuccess {
request_id: message.request_id,
new_mining_job_token: signed_token(
tx_hash_list_hash,
&self.public_key.clone(),
&self.private_key.clone(),
),
};
let message_enum_success = JobDeclaration::DeclareMiningJobSuccess(message_success);
Ok(SendTo::Respond(message_enum_success))
}
None => Err(Error::LogicErrorMessage(Box::new(
AllMessages::JobDeclaration(JobDeclaration::ProvideMissingTransactionsSuccess(
message.clone().into_static(),
)),
))),
}
// if there still a missing transaction return an error
for tx_with_state in transactions_with_state {
match tx_with_state {
TransactionState::Present(_) => continue,
TransactionState::ToBeRetrievedFromMempool(_) => continue,
TransactionState::Missing => return Err(Error::JDSMissingTransactions),
}
}
// TODO check it
let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static();
let message_success = DeclareMiningJobSuccess {
request_id: message.request_id,
new_mining_job_token: signed_token(
tx_hash_list_hash,
&self.public_key.clone(),
&self.private_key.clone(),
),
};
let message_enum_success = JobDeclaration::DeclareMiningJobSuccess(message_success);
Ok(SendTo::Respond(message_enum_success))
}

fn handle_submit_solution(&mut self, message: SubmitSolutionJd<'_>) -> Result<SendTo, Error> {
Expand All @@ -214,57 +202,3 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
Ok(SendTo::None(Some(m)))
}
}

//fn add_tx_data_to_job(jdd: Arc<Mutex<&mut JobDeclaratorDownstream>>) {
// tokio::task::spawn(async move {
// let mut tx_list: Vec<Txid> = Vec::new();
// let mut new_transactions: Vec<Transaction> = Vec::new();
// let mempool = jdd.safe_lock(|a| a.mempool.clone()).unwrap();
// jdd.safe_lock(|a| for tx in a.declared_mining_job.clone().unwrap().1 {
// match tx {
// TransactionState::Present(_) => continue,
// TransactionState::Missing => continue,
// TransactionState::ToBeRetrievedFromMempool(m) => tx_list.push(m),
// }
// });
// for txid in tx_list {
// let new_tx_data: Result<Transaction, JdsMempoolError> = mempool
// .safe_lock(|x| x.get_client())
// .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))?
// .ok_or(JdsMempoolError::NoClient)?
// .get_raw_transaction(&txid.to_string(), None)
// .await
// .map_err(JdsMempoolError::Rpc);
// if let Ok(transaction) = new_tx_data {
// new_transactions.push(transaction);
// //this unwrap is safe
// } else {
// // TODO propagate error
// todo!()
// };
// };
//
// //for tx in tx_list.iter().enumerate() {
// // match tx.1 {
// // &TransactionState::Missing | &TransactionState::Present(_) => continue,
// // &TransactionState::ToBeRetrievedFromMempool(txid) => {
// // let new_tx_data: Result<Transaction, JdsMempoolError> = mempool
// // .safe_lock(|x| x.get_client())
// // .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))?
// // .ok_or(JdsMempoolError::NoClient)?
// // .get_raw_transaction(&txid.to_string(), None)
// // .await
// // .map_err(JdsMempoolError::Rpc);
// // if let Ok(transaction) = new_tx_data {
// // new_transactions_mutex.safe_lock(|a| a.push(transaction));
// // //this unwrap is safe
// // } else {
// // // TODO propagate error
// // todo!()
// // };
// // }
// // };
// //}
// Ok::<(), JdsMempoolError>(())
// });
//}
Loading

0 comments on commit 7ee76f0

Please sign in to comment.