Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
lorbax committed Mar 9, 2024
1 parent decd186 commit 387c61c
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 76 deletions.
2 changes: 2 additions & 0 deletions protocols/v2/roles-logic-sv2/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub enum Error {
TargetError(InputError),
HashrateError(InputError),
LogicErrorMessage(std::boxed::Box<AllMessages<'static>>),
JDSMissingTransactions,
}

impl From<BinarySv2Error> for Error {
Expand Down Expand Up @@ -149,6 +150,7 @@ impl Display for Error {
TargetError(e) => write!(f, "Impossible to get Target: {:?}", e),
HashrateError(e) => write!(f, "Impossible to get Hashrate: {:?}", e),
LogicErrorMessage(e) => write!(f, "Message is well formatted but can not be handled: {:?}", e),
JDSMissingTransactions => write!(f, "JD server cannot propagate the block: missing transactions"),
}
}
}
2 changes: 2 additions & 0 deletions roles/jd-server/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub enum JdsError {
Custom(String),
Sv2ProtocolError((u32, Mining<'static>)),
MempoolError(JdsMempoolError),
ImpossibleToReconstructBlock(String),
}

impl std::fmt::Display for JdsError {
Expand All @@ -42,6 +43,7 @@ impl std::fmt::Display for JdsError {
write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e)
}
MempoolError(ref e) => write!(f, "Mempool error: `{:?}`", e),
ImpossibleToReconstructBlock(e) => write!(f, "Error in reconstructing the block: {:?}", e),
}
}
}
Expand Down
137 changes: 75 additions & 62 deletions roles/jd-server/src/lib/job_declarator/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use roles_logic_sv2::{
DeclareMiningJobError, DeclareMiningJobSuccess, IdentifyTransactionsSuccess,
ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd,
},
parsers::JobDeclaration,
parsers::JobDeclaration, utils::Mutex,
};
use std::{convert::TryInto, io::Cursor};
use std::{convert::TryInto, io::Cursor, sync::Arc};
use stratum_common::bitcoin::Transaction;
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
use super::signed_token;
use crate::mempool::{self, error::JdsMempoolError};
use super::{signed_token, TransactionState};
use crate::mempool::{self, error::JdsMempoolError, JDsMempool};
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
use stratum_common::bitcoin::consensus::Decodable;
use tracing::info;
Expand Down Expand Up @@ -77,7 +77,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
.safe_lock(|x| x.to_short_ids(nonce))
.unwrap()
.unwrap();
let mut txs_in_job = vec![];
let mut transactions_with_state = vec![TransactionState::Missing; short_hash_list.len()];
let mut txs_to_retrieve: Vec<(String, usize)> = vec![];
let mut missing_txs = vec![];

Expand All @@ -86,27 +86,31 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
match short_id_mempool.get(&sid_) {
Some(tx_data) => match &tx_data.tx {
Some(tx) => {
if i >= txs_in_job.len() {
txs_in_job.resize(i + 1, tx.clone());
}
txs_in_job.insert(i, tx.clone())
transactions_with_state[i] = TransactionState::Present(tx.clone());
}
None => {
txs_to_retrieve.push(((tx_data.id.to_string()), i));
transactions_with_state[i] = TransactionState::ToBeRetrievedFromMempool(tx_data.id);
}
},
None => missing_txs.push(i as u16),
None => {
transactions_with_state[i] = TransactionState::Missing;
// TODO remove this, the the ids of missing transactions from the vector
missing_txs.push(i as u16);
},
}
}
self.declared_mining_job = Some((
message.clone().into_static(),
txs_in_job,
missing_txs.clone(),
transactions_with_state
));

if !txs_to_retrieve.is_empty() {
add_tx_data_to_job(txs_to_retrieve, self);
}
// TODO handle as a task on main
let transaction_list = self.declared_mining_job.as_ref().unwrap().1.clone();
// TODO call add_tx_data_to_mempool
let new_transactions = add_tx_data_to_job(transaction_list, self.mempool.clone());
//if !txs_to_retrieve.is_empty() {
// add_tx_data_to_job(txs_to_retrieve, self);
//}

if missing_txs.is_empty() {
let message_success = DeclareMiningJobSuccess {
Expand All @@ -122,6 +126,8 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
} else {
let message_provide_missing_transactions = ProvideMissingTransactions {
request_id: message.request_id,
// TODO here get the missing IDS from the entries of txs_in_job which are
// TransactionState::Missing
unknown_tx_position_list: missing_txs.into(),
};
let message_enum_provide_missing_transactions =
Expand Down Expand Up @@ -153,30 +159,38 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
message: ProvideMissingTransactionsSuccess,
) -> Result<SendTo, Error> {
match &mut self.declared_mining_job {
Some((_, ref mut transactions, missing_indexes)) => {
Some((_, ref mut transactions_with_state)) => {
for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() {
let mut cursor = Cursor::new(tx);
let tx = Transaction::consensus_decode_from_finite_reader(&mut cursor)
.expect("Invalid tx data from downstream");
let index =
*missing_indexes
.get(i)
.ok_or(Error::LogicErrorMessage(Box::new(
AllMessages::JobDeclaration(
JobDeclaration::ProvideMissingTransactionsSuccess(
message.clone().into_static(),
),
),
)))? as usize;
if index >= transactions.len() {
transactions.resize(index + 1, tx.clone());
for tx_with_state in transactions_with_state.clone() {
match tx_with_state {
TransactionState::Present(_) => continue,
TransactionState::ToBeRetrievedFromMempool(_) => continue,
TransactionState::Missing => {
let mut cursor = Cursor::new(tx);
// TODO remove this unwrap
let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor).unwrap();
transactions_with_state[i] = TransactionState::Present(transaction.clone());
mempool::JDsMempool::add_tx_data_to_mempool(
self.mempool.clone(),
transaction.txid(),
Some(transaction),
);
break;
}

}
}
}
// if there still a missing transaction return an error
for tx_with_state in transactions_with_state {
match tx_with_state {
TransactionState::Present(_) => continue,
TransactionState::ToBeRetrievedFromMempool(_) => continue,
TransactionState::Missing => {
return Err(Error::JDSMissingTransactions);

}
}
transactions.insert(index, tx.clone());
mempool::JDsMempool::add_tx_data_to_mempool(
self.mempool.clone(),
tx.txid(),
Some(tx),
);
}
// TODO check it
let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static();
Expand Down Expand Up @@ -206,33 +220,32 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
}
}

fn add_tx_data_to_job(tx_id_list: Vec<(String, usize)>, jdd: &mut JobDeclaratorDownstream) {
let mempool = jdd.mempool.clone();
let mut declared_mining_job = jdd.declared_mining_job.clone();
fn add_tx_data_to_job(tx_list: Vec<TransactionState>, mempool: Arc<Mutex<JDsMempool>>) -> Vec<Transaction> {
let new_transactions_mutex = Arc::new(Mutex::new(Vec::new()));
tokio::task::spawn(async move {
for tx in tx_id_list.iter().enumerate() {
let index = tx.1 .1;
let new_tx_data: Result<Transaction, JdsMempoolError> = mempool
.safe_lock(|x| x.get_client())
.map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))?
.ok_or(JdsMempoolError::NoClient)?
.get_raw_transaction(&tx.1 .0, None)
.await
.map_err(JdsMempoolError::Rpc);
if let Ok(tx) = new_tx_data {
if let Some((_, transactions, _)) = &mut declared_mining_job {
if index >= transactions.len() {
transactions.resize(index + 1, tx.clone());
}
transactions.insert(index, tx.clone());
for tx in tx_list.iter().enumerate() {
match tx.1 {
&TransactionState::Missing | &TransactionState::Present(_) => continue,
&TransactionState::ToBeRetrievedFromMempool(txid) => {
let new_tx_data: Result<Transaction, JdsMempoolError> = mempool
.safe_lock(|x| x.get_client())
.map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))?
.ok_or(JdsMempoolError::NoClient)?
.get_raw_transaction(&txid.to_string(), None)
.await
.map_err(JdsMempoolError::Rpc);
if let Ok(transaction) = new_tx_data {
new_transactions_mutex.safe_lock(|a| a.push(transaction));
//this unwrap is safe
} else {
// TODO propagate error
todo!()
};
}
mempool::JDsMempool::add_tx_data_to_mempool(
mempool.clone(),
tx.clone().txid(),
Some(tx.clone()),
);
}
};
}
Ok::<(), JdsMempoolError>(())
});
let new_transactions: Vec<Transaction> = new_transactions_mutex.clone().safe_lock(|a| a.clone()).unwrap().clone();
new_transactions
}
56 changes: 42 additions & 14 deletions roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,27 @@ use tracing::{error, info};

use stratum_common::bitcoin::{
consensus::{encode::serialize, Encodable},
Block, Transaction,
Block, Transaction, Txid,
};


// this structure wraps each transaction with the index of the position in the job declared by the
// JD-Client. the tx field can be None if the transaction is missing. In this case, the index is
// used to retrieve the transaction from the message ProvideMissingTransactionsSuccess
//#[derive(Debug)]
//struct TransactionWithIndex {
// tx: Option<Transaction>,
// index: u16,
//}

#[derive(Clone, Debug)]
enum TransactionState {
Present(Transaction),
ToBeRetrievedFromMempool(Txid),
Missing,
}


#[derive(Debug)]
pub struct JobDeclaratorDownstream {
sender: Sender<EitherFrame>,
Expand All @@ -37,7 +55,11 @@ pub struct JobDeclaratorDownstream {
public_key: Secp256k1PublicKey,
private_key: Secp256k1SecretKey,
mempool: Arc<Mutex<JDsMempool>>,
declared_mining_job: Option<(DeclareMiningJob<'static>, Vec<Transaction>, Vec<u16>)>,
// Vec<u16> is the vector of missing transactions
// this should be (Option<DeclareMiningJob<'static>>, Vec<TransactionState>)
// TODO call the vector with TransactionState in with the same name everywhere, also in the
// block creator in utils
declared_mining_job: Option<(DeclareMiningJob<'static>, Vec<TransactionState>)>,
tx_hash_list_hash: Option<U256<'static>>,
}

Expand Down Expand Up @@ -70,18 +92,23 @@ impl JobDeclaratorDownstream {
}
}

fn get_block_hex(self_mutex: Arc<Mutex<Self>>, message: SubmitSolutionJd) -> Option<String> {
fn get_block_hex(self_mutex: Arc<Mutex<Self>>, message: SubmitSolutionJd) -> Result<String, JdsError> {
//TODO: implement logic for success or error
let (last_declare, tx_list, _) = match self_mutex
let (last_declare, transactions_with_state) = self_mutex
.safe_lock(|x| x.declared_mining_job.take())
.unwrap()
{
Some((last_declare, tx_list, _x)) => (last_declare, tx_list, _x),
None => return None,
};
// TODO manage these errors
.unwrap().unwrap();
let mut transactions_list: Vec<Transaction> = Vec::new();
for tx_with_state in transactions_with_state.iter().enumerate() {
if let TransactionState::Present(tx) = tx_with_state.1 {
transactions_list.push(tx.clone());
} else {
return Err(JdsError::ImpossibleToReconstructBlock("Missing transactions".to_string()));
};
}
let block: Block =
roles_logic_sv2::utils::BlockCreator::new(last_declare, tx_list, message).into();
Some(hex::encode(serialize(&block)))
roles_logic_sv2::utils::BlockCreator::new(last_declare, transactions_list, message).into();
Ok(hex::encode(serialize(&block)))
}

pub async fn send(
Expand Down Expand Up @@ -138,10 +165,11 @@ impl JobDeclaratorDownstream {
self_mutex.clone(),
message,
) {
Some(inner) => inner,
None => {
error!("Received solution but no job available");
Ok(inner) => inner,
Err(e) => {
error!("Received solution but encountered error: {:?}", e);
recv.close();
//TODO should we brake it?
break;
}
};
Expand Down
4 changes: 4 additions & 0 deletions roles/jd-server/src/lib/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ async fn send_status(
outcome
}

// TODO do we really want to brake at MempoolError and ImpossibleToReconstructBlock?
// this is called by `error_handling::handle_result!`
pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::ErrorBranch {
tracing::debug!("Error: {:?}", &e);
Expand Down Expand Up @@ -121,5 +122,8 @@ pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::Error
JdsError::MempoolError(_) => {
send_status(sender, e, error_handling::ErrorBranch::Break).await
}
JdsError::ImpossibleToReconstructBlock(_) => {
send_status(sender, e, error_handling::ErrorBranch::Break).await
}
}
}

0 comments on commit 387c61c

Please sign in to comment.