Skip to content

Commit

Permalink
Move task on main
Browse files Browse the repository at this point in the history
A task that before was blocking now is moved as a separate task on main
  • Loading branch information
lorbax committed Mar 18, 2024
1 parent b0caab4 commit 95fd8fc
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 117 deletions.
37 changes: 19 additions & 18 deletions roles/jd-server/src/lib/job_declarator/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ use roles_logic_sv2::{
parsers::JobDeclaration,
};
use std::{convert::TryInto, io::Cursor};
use stratum_common::bitcoin::Transaction;
use stratum_common::bitcoin::{Transaction, Txid};
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
use super::{signed_token, TransactionState};
use crate::mempool;
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
use stratum_common::bitcoin::consensus::Decodable;
use tracing::info;
Expand Down Expand Up @@ -62,6 +61,12 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
}

fn handle_declare_mining_job(&mut self, message: DeclareMiningJob) -> Result<SendTo, Error> {
// the transactions that are present in the mempool are stored here, that is sent to the
// mempool which use the rpc client to retrieve the whole data for each transactions.
// The unknown transactions is a vector that contains the transactions that are not in the
// jds mempool, and will be non-empty in the ProvideMissingTransactionsSuccess message
let mut known_transactions: Vec<Txid> = Vec::new();
let unknown_transactions: Vec<Transaction> = Vec::new();
self.tx_hash_list_hash = Some(message.tx_hash_list_hash.clone().into_static());
if self.verify_job(&message) {
let short_hash_list: Vec<ShortTxId> = message
Expand All @@ -84,14 +89,9 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
for (i, sid) in short_hash_list.iter().enumerate() {
let sid_: [u8; 6] = sid.to_vec().try_into().unwrap();
match short_id_mempool.get(&sid_) {
Some(tx_data) => match &tx_data.tx {
Some(tx) => {
transactions_with_state[i] = TransactionState::Present(tx.txid());
}
None => {
transactions_with_state[i] =
TransactionState::ToBeRetrievedFromNodeMempool(tx_data.id);
}
Some(tx_data) => {
transactions_with_state[i] = TransactionState::PresentInMempool(tx_data.id);
known_transactions.push(tx_data.id);
},
None => {
transactions_with_state[i] = TransactionState::Missing;
Expand All @@ -104,6 +104,8 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
transactions_with_state,
missing_txs.clone(),
);
// here we send the transactions that we want to be stored in jds mempool with full data
self.sender_add_txs_to_mempool.send(super::AddTrasactionsToMempool { known_transactions, unknown_transactions});

if missing_txs.is_empty() {
let message_success = DeclareMiningJobSuccess {
Expand Down Expand Up @@ -150,29 +152,28 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
message: ProvideMissingTransactionsSuccess,
) -> Result<SendTo, Error> {
let (_, ref mut transactions_with_state, missing_indexes) = &mut self.declared_mining_job;
let mut unknown_transactions: Vec<Transaction> = Vec::new();
let known_transactions: Vec<Txid> = Vec::new();
for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() {
let mut cursor = Cursor::new(tx);
let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor)
.map_err(|e| Error::TxDecodingError(e.to_string()))?;
&unknown_transactions.push(transaction.clone());
let index = *missing_indexes
.get(i)
.ok_or(Error::LogicErrorMessage(Box::new(
AllMessages::JobDeclaration(JobDeclaration::ProvideMissingTransactionsSuccess(
message.clone().into_static(),
)),
)))? as usize;
transactions_with_state[index] = TransactionState::Present(transaction.txid());
mempool::JDsMempool::add_tx_data_to_mempool(
self.mempool.clone(),
transaction.txid(),
Some(transaction),
);
// insert the missing transactions in the mempool
transactions_with_state[index] = TransactionState::PresentInMempool(transaction.txid());
}
self.sender_add_txs_to_mempool.send(super::AddTrasactionsToMempool { known_transactions, unknown_transactions: unknown_transactions.clone()});
// if there still a missing transaction return an error
for tx_with_state in transactions_with_state {
match tx_with_state {
TransactionState::Present(_) => continue,
TransactionState::ToBeRetrievedFromNodeMempool(_) => continue,
TransactionState::PresentInMempool(_) => continue,
TransactionState::Missing => return Err(Error::JDSMissingTransactions),
}
}
Expand Down
190 changes: 100 additions & 90 deletions roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
pub mod message_handler;
use crate::mempool;

use super::{error::JdsError, mempool::JDsMempool, status, Configuration, EitherFrame, StdFrame};
use async_channel::{Receiver, Sender};
use binary_sv2::{B0255, U256};
Expand All @@ -26,22 +24,29 @@ use stratum_common::bitcoin::{
Block, Transaction, Txid,
};

// this structure wraps each transaction with the index of the position in the job declared by the
// JD-Client. the tx field can be None if the transaction is missing. In this case, the index is
// used to retrieve the transaction from the message ProvideMissingTransactionsSuccess
//#[derive(Debug)]
//struct TransactionWithIndex {
// tx: Option<Transaction>,
// index: u16,
//}

#[derive(Clone, Debug)]
enum TransactionState {
Present(Txid),
ToBeRetrievedFromNodeMempool(Txid),
PresentInMempool(Txid),
Missing,
}

pub fn get_ids_of_known_transactions(transactions: &Vec<TransactionState>) -> Vec<Txid> {
let mut known_transactions: Vec<Txid> = Vec::new();
for transaction in transactions {
match *transaction {
TransactionState::PresentInMempool(txid) => known_transactions.push(txid.clone()),
TransactionState::Missing => continue,
}
};
known_transactions
}

#[derive(Clone, Debug)]
pub struct AddTrasactionsToMempool {
pub known_transactions: Vec<Txid>,
pub unknown_transactions: Vec<Transaction>,
}

#[derive(Debug)]
pub struct JobDeclaratorDownstream {
sender: Sender<EitherFrame>,
Expand All @@ -62,6 +67,7 @@ pub struct JobDeclaratorDownstream {
Vec<u16>,
),
tx_hash_list_hash: Option<U256<'static>>,
sender_add_txs_to_mempool: Sender<AddTrasactionsToMempool>,
}

impl JobDeclaratorDownstream {
Expand All @@ -70,6 +76,7 @@ impl JobDeclaratorDownstream {
sender: Sender<EitherFrame>,
config: &Configuration,
mempool: Arc<Mutex<JDsMempool>>,
sender_add_txs_to_mempool: Sender<AddTrasactionsToMempool>
) -> Self {
let mut coinbase_output = vec![];
// TODO: use next variables
Expand All @@ -90,74 +97,73 @@ impl JobDeclaratorDownstream {
mempool,
declared_mining_job: (None, Vec::new(), Vec::new()),
tx_hash_list_hash: None,
sender_add_txs_to_mempool,
}
}

// This only errors that are returned are PoisonLock, Custom, MempoolError
// this function is called in JobDeclaratorDowenstream::start(), if different errors are
// returned, change also the error management there
async fn retrieve_transactions_via_rpc(
self_mutex: Arc<Mutex<JobDeclaratorDownstream>>,
) -> Result<(), JdsError> {
let mut transactions_to_be_retrieved: Vec<Txid> = Vec::new();
let mut new_transactions: Vec<Transaction> = Vec::new();
let transactions_with_state = self_mutex
.clone()
.safe_lock(|a| a.declared_mining_job.clone())
.map_err(|e| JdsError::PoisonLock(e.to_string()))?
.1;
for tx_with_state in transactions_with_state {
match tx_with_state {
TransactionState::Present(_) => continue,
TransactionState::ToBeRetrievedFromNodeMempool(tx) => {
transactions_to_be_retrieved.push(tx)
}
TransactionState::Missing => continue,
}
}
let mempool_ = self_mutex
.safe_lock(|a| a.mempool.clone())
.map_err(|e| JdsError::PoisonLock(e.to_string()))?;
let client = mempool_
.clone()
.safe_lock(|a| a.get_client())
.map_err(|e| JdsError::PoisonLock(e.to_string()))?
.ok_or(JdsError::MempoolError(
mempool::error::JdsMempoolError::NoClient,
))?;
for txid in transactions_to_be_retrieved {
let transaction = client
.get_raw_transaction(&txid.to_string(), None)
.await
.map_err(|e| JdsError::MempoolError(mempool::error::JdsMempoolError::Rpc(e)))?;
let txid = transaction.txid();
mempool::JDsMempool::add_tx_data_to_mempool(
mempool_.clone(),
txid,
Some(transaction.clone()),
);
new_transactions.push(transaction);
}
for transaction in new_transactions {
self_mutex
.clone()
.safe_lock(|a| {
for transaction_with_state in &mut a.declared_mining_job.1 {
match transaction_with_state {
TransactionState::Present(_) => continue,
TransactionState::ToBeRetrievedFromNodeMempool(_) => {
*transaction_with_state =
TransactionState::Present(transaction.txid());
break;
}
TransactionState::Missing => continue,
}
}
})
.map_err(|e| JdsError::PoisonLock(e.to_string()))?
}
Ok(())
}
//// This only errors that are returned are PoisonLock, Custom, MempoolError
//// this function is called in JobDeclaratorDowenstream::start(), if different errors are
//// returned, change also the error management there
//async fn send_transactions_needed_in_mempool(
// self_mutex: Arc<Mutex<JobDeclaratorDownstream>>,
//) -> Result<(), JdsError> {
// let sender_add_txs_to_mempool = self_mutex.safe_lock(|a| a.sender_add_txs_to_mempool.clone()).unwrap();
// let mut transactions_to_be_retrieved: Vec<Txid> = Vec::new();
// let transactions_with_state = self_mutex
// .clone()
// .safe_lock(|a| a.declared_mining_job.clone())
// .map_err(|e| JdsError::PoisonLock(e.to_string()))?
// .1;
// for tx_with_state in transactions_with_state {
// match tx_with_state {
// TransactionState::PresentInMempool(txid) => transactions_to_be_retrieved.push(txid),
// TransactionState::Missing => continue,
// }
// };
// let _ = sender_add_txs_to_mempool.send(transactions_to_be_retrieved);
// //let mempool_ = self_mutex
// // .safe_lock(|a| a.mempool.clone())
// // .map_err(|e| JdsError::PoisonLock(e.to_string()))?;
// //let client = mempool_
// // .clone()
// // .safe_lock(|a| a.get_client())
// // .map_err(|e| JdsError::PoisonLock(e.to_string()))?
// // .ok_or(JdsError::MempoolError(
// // mempool::error::JdsMempoolError::NoClient,
// // ))?;
// //for txid in transactions_to_be_retrieved {
// // let transaction = client
// // .get_raw_transaction(&txid.to_string(), None)
// // .await
// // .map_err(|e| JdsError::MempoolError(mempool::error::JdsMempoolError::Rpc(e)))?;
// // let txid = transaction.txid();
// // mempool::JDsMempool::add_tx_data_to_mempool(
// // mempool_.clone(),
// // txid,
// // Some(transaction.clone()),
// // );
// // new_transactions.push(transaction);
// //}
// //for transaction in new_transactions {
// // self_mutex
// // .clone()
// // .safe_lock(|a| {
// // for transaction_with_state in &mut a.declared_mining_job.1 {
// // match transaction_with_state {
// // TransactionState::Present(_) => continue,
// // TransactionState::ToBeRetrievedFromNodeMempool(_) => {
// // *transaction_with_state =
// // TransactionState::Present(transaction.txid());
// // break;
// // }
// // TransactionState::Missing => continue,
// // }
// // }
// // })
// // .map_err(|e| JdsError::PoisonLock(e.to_string()))?
// //}
// Ok(())
//}

fn get_block_hex(
self_mutex: Arc<Mutex<Self>>,
Expand All @@ -172,7 +178,7 @@ impl JobDeclaratorDownstream {
let last_declare = last_declare_.ok_or(JdsError::NoLastDeclaredJob)?;
let mut transactions_list: Vec<Transaction> = Vec::new();
for tx_with_state in transactions_with_state.iter().enumerate() {
if let TransactionState::Present(txid) = tx_with_state.1 {
if let TransactionState::PresentInMempool(txid) = tx_with_state.1 {
let tx_ = match mempool_.safe_lock(|x| x.mempool.get(txid).cloned()) {
Ok(tx) => tx,
Err(e) => return Err(Box::new(JdsError::PoisonLock(e.to_string()))),
Expand Down Expand Up @@ -312,16 +318,16 @@ impl JobDeclaratorDownstream {
break;
}
}
let retrieve_transactions =
JobDeclaratorDownstream::retrieve_transactions_via_rpc(self_mutex.clone())
.await;
match retrieve_transactions {
Ok(_) => (),
Err(error) => {
handle_result!(tx_status, Err(error));
break;
}
}
//let retrieve_transactions =
// JobDeclaratorDownstream::retrieve_transactions_via_rpc(self_mutex.clone())
// .await;
//match retrieve_transactions {
// Ok(_) => (),
// Err(error) => {
// handle_result!(tx_status, Err(error));
// break;
// }
//}
}
});
}
Expand Down Expand Up @@ -359,17 +365,19 @@ impl JobDeclarator {
status_tx: crate::status::Sender,
mempool: Arc<Mutex<JDsMempool>>,
new_block_sender: Sender<String>,
sender_add_txs_to_mempool: Sender<AddTrasactionsToMempool>,
) {
let self_ = Arc::new(Mutex::new(Self {}));
info!("JD INITIALIZED");
Self::accept_incoming_connection(self_, config, status_tx, mempool, new_block_sender).await;
Self::accept_incoming_connection(self_, config, status_tx, mempool, new_block_sender, sender_add_txs_to_mempool).await;
}
async fn accept_incoming_connection(
_self_: Arc<Mutex<JobDeclarator>>,
config: Configuration,
status_tx: crate::status::Sender,
mempool: Arc<Mutex<JDsMempool>>,
new_block_sender: Sender<String>,
sender_add_txs_to_mempool: Sender<AddTrasactionsToMempool>,
) {
let listner = TcpListener::bind(&config.listen_jd_address).await.unwrap();
while let Ok((stream, _)) = listner.accept().await {
Expand Down Expand Up @@ -408,6 +416,8 @@ impl JobDeclarator {
sender.clone(),
&config,
mempool.clone(),
// each downstream has its own sender (multi producer single consumer)
sender_add_txs_to_mempool.clone(),
)));

JobDeclaratorDownstream::start(
Expand Down
Loading

0 comments on commit 95fd8fc

Please sign in to comment.