Skip to content

Commit 73de958

Browse files
committed
error management
1 parent 7014e90 commit 73de958

File tree

4 files changed

+100
-148
lines changed

4 files changed

+100
-148
lines changed

roles/jd-server/src/lib/error.rs

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub enum JdsError {
2323
Sv2ProtocolError((u32, Mining<'static>)),
2424
MempoolError(JdsMempoolError),
2525
ImpossibleToReconstructBlock(String),
26+
NoLastDeclaredJob,
2627
}
2728

2829
impl std::fmt::Display for JdsError {
@@ -44,6 +45,7 @@ impl std::fmt::Display for JdsError {
4445
}
4546
MempoolError(ref e) => write!(f, "Mempool error: `{:?}`", e),
4647
ImpossibleToReconstructBlock(e) => write!(f, "Error in reconstructing the block: {:?}", e),
48+
NoLastDeclaredJob => write!(f, "Last declared job not found"),
4749
}
4850
}
4951
}

roles/jd-server/src/lib/job_declarator/message_handler.rs

+45-107
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ use roles_logic_sv2::{
66
DeclareMiningJobError, DeclareMiningJobSuccess, IdentifyTransactionsSuccess,
77
ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd,
88
},
9-
parsers::JobDeclaration, utils::Mutex,
9+
parsers::JobDeclaration,
1010
};
11-
use std::{convert::TryInto, io::Cursor, sync::Arc};
12-
use stratum_common::bitcoin::{Transaction, Txid};
11+
use std::{convert::TryInto, io::Cursor};
12+
use stratum_common::bitcoin::Transaction;
1313
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
1414
use super::{signed_token, TransactionState};
15-
use crate::mempool::{self, error::JdsMempoolError, JDsMempool};
16-
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
15+
use crate::mempool;
16+
use roles_logic_sv2::errors::Error;
1717
use stratum_common::bitcoin::consensus::Decodable;
1818
use tracing::info;
1919

@@ -78,7 +78,6 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
7878
.unwrap()
7979
.unwrap();
8080
let mut transactions_with_state = vec![TransactionState::Missing; short_hash_list.len()];
81-
let mut txs_to_retrieve: Vec<(String, usize)> = vec![];
8281
let mut missing_txs: Vec<u16> = Vec::new();
8382

8483
for (i, sid) in short_hash_list.iter().enumerate() {
@@ -99,10 +98,10 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
9998
},
10099
}
101100
}
102-
self.declared_mining_job = Some((
103-
message.clone().into_static(),
101+
self.declared_mining_job = (Some(
102+
message.clone().into_static()),
104103
transactions_with_state
105-
));
104+
);
106105

107106
//let self_mutex = Arc::new(Mutex::new(self));
108107
//add_tx_data_to_job(self_mutex);
@@ -153,59 +152,51 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
153152
&mut self,
154153
message: ProvideMissingTransactionsSuccess,
155154
) -> Result<SendTo, Error> {
156-
match &mut self.declared_mining_job {
157-
Some((_, ref mut transactions_with_state)) => {
158-
for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() {
159-
for tx_with_state in transactions_with_state.clone() {
160-
match tx_with_state {
161-
TransactionState::Present(_) => continue,
162-
TransactionState::ToBeRetrievedFromMempool(_) => continue,
163-
TransactionState::Missing => {
164-
let mut cursor = Cursor::new(tx);
165-
// TODO remove this unwrap
166-
let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor).unwrap();
167-
transactions_with_state[i] = TransactionState::Present(transaction.clone());
168-
mempool::JDsMempool::add_tx_data_to_mempool(
169-
self.mempool.clone(),
170-
transaction.txid(),
171-
Some(transaction),
172-
);
173-
break;
174-
}
175-
176-
}
177-
}
178-
}
179-
// if there still a missing transaction return an error
180-
for tx_with_state in transactions_with_state {
155+
let (_, ref mut transactions_with_state) = &mut self.declared_mining_job;
156+
for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() {
157+
for tx_with_state in transactions_with_state.clone() {
181158
match tx_with_state {
182159
TransactionState::Present(_) => continue,
183160
TransactionState::ToBeRetrievedFromMempool(_) => continue,
184161
TransactionState::Missing => {
185-
return Err(Error::JDSMissingTransactions);
186-
162+
let mut cursor = Cursor::new(tx);
163+
// TODO remove this unwrap
164+
let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor).unwrap();
165+
transactions_with_state[i] = TransactionState::Present(transaction.clone());
166+
mempool::JDsMempool::add_tx_data_to_mempool(
167+
self.mempool.clone(),
168+
transaction.txid(),
169+
Some(transaction),
170+
);
171+
break;
187172
}
173+
188174
}
189175
}
190-
// TODO check it
191-
let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static();
192-
let message_success = DeclareMiningJobSuccess {
193-
request_id: message.request_id,
194-
new_mining_job_token: signed_token(
195-
tx_hash_list_hash,
196-
&self.public_key.clone(),
197-
&self.private_key.clone(),
198-
),
199-
};
200-
let message_enum_success = JobDeclaration::DeclareMiningJobSuccess(message_success);
201-
Ok(SendTo::Respond(message_enum_success))
202176
}
203-
None => Err(Error::LogicErrorMessage(Box::new(
204-
AllMessages::JobDeclaration(JobDeclaration::ProvideMissingTransactionsSuccess(
205-
message.clone().into_static(),
206-
)),
207-
))),
208-
}
177+
// if there still a missing transaction return an error
178+
for tx_with_state in transactions_with_state {
179+
match tx_with_state {
180+
TransactionState::Present(_) => continue,
181+
TransactionState::ToBeRetrievedFromMempool(_) => continue,
182+
TransactionState::Missing => {
183+
return Err(Error::JDSMissingTransactions)
184+
185+
}
186+
}
187+
}
188+
// TODO check it
189+
let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static();
190+
let message_success = DeclareMiningJobSuccess {
191+
request_id: message.request_id,
192+
new_mining_job_token: signed_token(
193+
tx_hash_list_hash,
194+
&self.public_key.clone(),
195+
&self.private_key.clone(),
196+
),
197+
};
198+
let message_enum_success = JobDeclaration::DeclareMiningJobSuccess(message_success);
199+
Ok(SendTo::Respond(message_enum_success))
209200
}
210201

211202
fn handle_submit_solution(&mut self, message: SubmitSolutionJd<'_>) -> Result<SendTo, Error> {
@@ -215,56 +206,3 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
215206
}
216207
}
217208

218-
//fn add_tx_data_to_job(jdd: Arc<Mutex<&mut JobDeclaratorDownstream>>) {
219-
// tokio::task::spawn(async move {
220-
// let mut tx_list: Vec<Txid> = Vec::new();
221-
// let mut new_transactions: Vec<Transaction> = Vec::new();
222-
// let mempool = jdd.safe_lock(|a| a.mempool.clone()).unwrap();
223-
// jdd.safe_lock(|a| for tx in a.declared_mining_job.clone().unwrap().1 {
224-
// match tx {
225-
// TransactionState::Present(_) => continue,
226-
// TransactionState::Missing => continue,
227-
// TransactionState::ToBeRetrievedFromMempool(m) => tx_list.push(m),
228-
// }
229-
// });
230-
// for txid in tx_list {
231-
// let new_tx_data: Result<Transaction, JdsMempoolError> = mempool
232-
// .safe_lock(|x| x.get_client())
233-
// .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))?
234-
// .ok_or(JdsMempoolError::NoClient)?
235-
// .get_raw_transaction(&txid.to_string(), None)
236-
// .await
237-
// .map_err(JdsMempoolError::Rpc);
238-
// if let Ok(transaction) = new_tx_data {
239-
// new_transactions.push(transaction);
240-
// //this unwrap is safe
241-
// } else {
242-
// // TODO propagate error
243-
// todo!()
244-
// };
245-
// };
246-
//
247-
// //for tx in tx_list.iter().enumerate() {
248-
// // match tx.1 {
249-
// // &TransactionState::Missing | &TransactionState::Present(_) => continue,
250-
// // &TransactionState::ToBeRetrievedFromMempool(txid) => {
251-
// // let new_tx_data: Result<Transaction, JdsMempoolError> = mempool
252-
// // .safe_lock(|x| x.get_client())
253-
// // .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))?
254-
// // .ok_or(JdsMempoolError::NoClient)?
255-
// // .get_raw_transaction(&txid.to_string(), None)
256-
// // .await
257-
// // .map_err(JdsMempoolError::Rpc);
258-
// // if let Ok(transaction) = new_tx_data {
259-
// // new_transactions_mutex.safe_lock(|a| a.push(transaction));
260-
// // //this unwrap is safe
261-
// // } else {
262-
// // // TODO propagate error
263-
// // todo!()
264-
// // };
265-
// // }
266-
// // };
267-
// //}
268-
// Ok::<(), JdsMempoolError>(())
269-
// });
270-
//}

roles/jd-server/src/lib/job_declarator/mod.rs

+49-39
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
pub mod message_handler;
2+
use crate::mempool;
3+
24
use super::{error::JdsError, mempool::JDsMempool, status, Configuration, EitherFrame, StdFrame};
35
use async_channel::{Receiver, Sender};
46
use binary_sv2::{B0255, U256};
@@ -59,7 +61,7 @@ pub struct JobDeclaratorDownstream {
5961
// this should be (Option<DeclareMiningJob<'static>>, Vec<TransactionState>)
6062
// TODO call the vector with TransactionState in with the same name everywhere, also in the
6163
// block creator in utils
62-
declared_mining_job: Option<(DeclareMiningJob<'static>, Vec<TransactionState>)>,
64+
declared_mining_job: (Option<DeclareMiningJob<'static>>, Vec<TransactionState>),
6365
tx_hash_list_hash: Option<U256<'static>>,
6466
}
6567

@@ -87,44 +89,38 @@ impl JobDeclaratorDownstream {
8789
public_key: config.authority_public_key,
8890
private_key: config.authority_secret_key,
8991
mempool,
90-
declared_mining_job: None,
92+
declared_mining_job: (None, Vec::new()),
9193
tx_hash_list_hash: None,
9294
}
9395
}
9496

95-
async fn get_job_transactions_from_mempool(self_mutex: Arc<Mutex<JobDeclaratorDownstream>>) {
97+
// This only errors that are returned are PoisonLock, Custom, MempoolError
98+
// this function is called in JobDeclaratorDowenstream::start(), if different errors are
99+
// returned, change also the error management there
100+
async fn retrieve_transactions_via_rpc(self_mutex: Arc<Mutex<JobDeclaratorDownstream>>) -> Result<(), JdsError> {
96101
let mut transactions_to_be_retrieved: Vec<Txid> = Vec::new();
97102
let mut new_transactions: Vec<Transaction> = Vec::new();
98-
self_mutex.clone().safe_lock(|a| for tx_with_state in a.declared_mining_job.clone().unwrap().1 {
103+
let transactions_with_state = self_mutex.clone().safe_lock(|a| {
104+
a.declared_mining_job.clone()
105+
}).map_err(|e| JdsError::PoisonLock(e.to_string()))?.1;
106+
for tx_with_state in transactions_with_state {
99107
match tx_with_state {
100108
TransactionState::Present(_) => continue,
101109
TransactionState::ToBeRetrievedFromMempool(tx) => transactions_to_be_retrieved.push(tx),
102110
TransactionState::Missing => continue,
103111
}
104-
});
105-
let mempool_ = self_mutex.safe_lock(|a| a.mempool.clone()).unwrap();
106-
let client = mempool_.safe_lock(|a| a.get_client()).unwrap().unwrap();
112+
}
113+
let mempool_ = self_mutex.safe_lock(|a| a.mempool.clone()).map_err(|e| JdsError::PoisonLock(e.to_string()))?;
114+
let client = mempool_.clone().safe_lock(|a| a.get_client()).map_err(|e| JdsError::PoisonLock(e.to_string()))?.ok_or(JdsError::MempoolError(mempool::error::JdsMempoolError::NoClient))?;
107115
for txid in transactions_to_be_retrieved {
108-
let transaction = client.get_raw_transaction(&txid.to_string(), None).await.unwrap();
116+
let transaction = client.get_raw_transaction(&txid.to_string(), None).await.map_err(|e| JdsError::MempoolError(mempool::error::JdsMempoolError::Rpc(e)))?;
117+
let txid = transaction.txid();
118+
mempool::JDsMempool::add_tx_data_to_mempool(mempool_.clone(), txid, Some(transaction.clone()));
109119
new_transactions.push(transaction);
110-
// let new_tx_data: Result<Transaction, JdsMempoolError> = mempool
111-
// .safe_lock(|x| x.get_client())
112-
// .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))?
113-
// .ok_or(JdsMempoolError::NoClient)?
114-
// .get_raw_transaction(&txid.to_string(), None)
115-
// .await
116-
// .map_err(JdsMempoolError::Rpc);
117-
// if let Ok(transaction) = new_tx_data {
118-
// new_transactions.push(transaction);
119-
// //this unwrap is safe
120-
// } else {
121-
// // TODO propagate error
122-
// todo!()
123-
// };
124-
125120
};
121+
//TODO remove this unwrap
126122
for transaction in new_transactions {
127-
self_mutex.clone().safe_lock(|a| for transaction_with_state in &mut a.declared_mining_job.as_mut().unwrap().1 {
123+
self_mutex.clone().safe_lock(|a| for transaction_with_state in &mut a.declared_mining_job.1 {
128124
match transaction_with_state {
129125
TransactionState::Present(_) => continue,
130126
TransactionState::ToBeRetrievedFromMempool(_) => {
@@ -133,26 +129,18 @@ impl JobDeclaratorDownstream {
133129
},
134130
TransactionState::Missing => continue,
135131
}
136-
}).unwrap()
132+
}).map_err(|e| JdsError::PoisonLock(e.to_string()))?
137133
};
138-
//self_mutex.clone().safe_lock(|a| for transaction_with_state in a.declared_mining_job.unwrap().1 {
139-
// match transaction_with_state {
140-
// TransactionState::Present(_) => continue,
141-
// TransactionState::ToBeRetrievedFromMempool(_) => {transaction_with_state = TransactionState::Present(new_transactions.clone()[1])},
142-
// TransactionState::Missing => continue,
143-
// }
144-
//});
145-
146-
todo!()
147-
134+
Ok(())
148135
}
149136

150137
fn get_block_hex(self_mutex: Arc<Mutex<Self>>, message: SubmitSolutionJd) -> Result<String, JdsError> {
151138
//TODO: implement logic for success or error
152-
let (last_declare, transactions_with_state) = self_mutex
153-
.safe_lock(|x| x.declared_mining_job.take())
139+
let (last_declare_, transactions_with_state) = self_mutex
140+
.safe_lock(|x| x.declared_mining_job.clone())
154141
// TODO manage these errors
155-
.unwrap().unwrap();
142+
.map_err(|e| JdsError::PoisonLock(e.to_string()))?;
143+
let last_declare = last_declare_.ok_or(JdsError::NoLastDeclaredJob)?;
156144
let mut transactions_list: Vec<Transaction> = Vec::new();
157145
for tx_with_state in transactions_with_state.iter().enumerate() {
158146
if let TransactionState::Present(tx) = tx_with_state.1 {
@@ -276,7 +264,29 @@ impl JobDeclaratorDownstream {
276264
break;
277265
}
278266
}
279-
JobDeclaratorDownstream::get_job_transactions_from_mempool(self_mutex.clone()).await;
267+
let retrive_transactions = JobDeclaratorDownstream::retrieve_transactions_via_rpc(self_mutex.clone()).await;
268+
match retrive_transactions {
269+
Ok(_) => (),
270+
// Only expected errors from retrieve_transactions_via_rpc methods are:
271+
// PoisonLock, Custom, MempoolError
272+
Err(error) =>
273+
match error {
274+
JdsError::Io(e) => error!("Unexpected error while retrieving transactions from node{:?}", e),
275+
JdsError::ChannelSend(e) => error!("unexpected error while retrieving transactions from node{:?}", e),
276+
JdsError::ChannelRecv(e) => error!("unexpected error while retrieving transactions from node{:?}", e),
277+
JdsError::BinarySv2(e) => error!("unexpected error while retrieving transactions from node{:?}", e),
278+
JdsError::Codec(e) => error!("unexpected error while retrieving transactions from node{:?}", e),
279+
JdsError::Noise(e) => error!("Unexpected error while retrieving transactions from node{:?}", e),
280+
JdsError::RolesLogic(e) => error!("Unexpected error while retrieving transactions from node{:?}", e),
281+
JdsError::Framing(e) => error!("Unexpected error while retrieving transactions from node{:?}", e),
282+
JdsError::PoisonLock(e) => error!("PoisonLock error while retrieveing transactions from bitcoin node: {:?}", e),
283+
JdsError::Custom(e) => error!("Custom error while retrieveing transactions from bitcoin node: {:?}", e),
284+
JdsError::Sv2ProtocolError(_) => todo!(),
285+
JdsError::MempoolError(e) => error!("Mempool error: {:?}", e),
286+
JdsError::ImpossibleToReconstructBlock(e) => error!("Unexpected error while retrieving transactions from node{:?}", e),
287+
JdsError::NoLastDeclaredJob => error!("Last declared job not found"),
288+
},
289+
}
280290
}
281291
});
282292
}

roles/jd-server/src/lib/status.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ async fn send_status(
9494
outcome
9595
}
9696

97-
// TODO do we really want to brake at MempoolError and ImpossibleToReconstructBlock?
9897
// this is called by `error_handling::handle_result!`
9998
pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::ErrorBranch {
10099
tracing::debug!("Error: {:?}", &e);
@@ -123,7 +122,10 @@ pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::Error
123122
send_status(sender, e, error_handling::ErrorBranch::Break).await
124123
}
125124
JdsError::ImpossibleToReconstructBlock(_) => {
126-
send_status(sender, e, error_handling::ErrorBranch::Break).await
125+
send_status(sender, e, error_handling::ErrorBranch::Continue).await
126+
}
127+
JdsError::NoLastDeclaredJob => {
128+
send_status(sender, e, error_handling::ErrorBranch::Continue).await
127129
}
128130
}
129131
}

0 commit comments

Comments
 (0)