Skip to content

Commit

Permalink
review 2
Browse files Browse the repository at this point in the history
apply suggestions of the second round of review.
 - removed unnecessary Arc<Mutex<>>
 - in the handler implementation for the jds, when a submit_block is
   received, returns a SendTo::None(Some(SubmitSolution)). Accordingly
   changes on JobDeclaratorDownstream:::Start
 - close connection if a message SubmitSolution is sent but there is no
   job declared in the downstream
 - frunction roles-logic::utils::submit_solution_to_block is sobstituted
   with the implementation of From on a wrapper struct
 - removed catch-all arms in pattern match of
   handle_message_job_declaration in JobDeclaratorDownstream::start
 - removed explicit dependancy of library bytes, now relies on
   hyper::body::Bytes (that uses the same library under the hood, btw)
  • Loading branch information
lorbax committed Feb 23, 2024
1 parent 6671372 commit fd68321
Show file tree
Hide file tree
Showing 12 changed files with 2,087 additions and 75 deletions.
1,958 changes: 1,958 additions & 0 deletions benches/Cargo.lock

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions protocols/v2/roles-logic-sv2/src/handlers/job_declaration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ where
Ok(JobDeclaration::SubmitSolution(message)) => {
info!("Received SubmitSolution");
debug!("SubmitSolution: {:?}", message);
Self::handle_submit_solution(self_, message)
self_
.safe_lock(|x| x.handle_submit_solution(message))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?
}

Ok(_) => todo!(),
Expand All @@ -182,8 +184,5 @@ where
&mut self,
message: ProvideMissingTransactionsSuccess,
) -> Result<SendTo, Error>;
fn handle_submit_solution(
self_: Arc<Mutex<Self>>,
message: SubmitSolutionJd,
) -> Result<SendTo, Error>;
fn handle_submit_solution(&mut self, message: SubmitSolutionJd) -> Result<SendTo, Error>;
}
93 changes: 57 additions & 36 deletions protocols/v2/roles-logic-sv2/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,46 +741,67 @@ fn tx_hash_list_hash_builder(txid_list: Vec<bitcoin::Txid>) -> U256<'static> {
hash.to_vec().try_into().unwrap()
}

pub fn submit_solution_to_block(
last_declare: DeclareMiningJob,
mut tx_list: Vec<bitcoin::Transaction>,
message: SubmitSolutionJd,
) -> bitcoin::Block {
let coinbase_pre = last_declare.coinbase_prefix.to_vec();
let extranonce = message.extranonce.to_vec();
let coinbase_suf = last_declare.coinbase_suffix.to_vec();
let mut path: Vec<Vec<u8>> = vec![];
for tx in &tx_list {
let id = tx.txid();
let id = id.as_ref().to_vec();
path.push(id);
pub struct BlockCreator<'a> {
last_declare: DeclareMiningJob<'a>,
tx_list: Vec<bitcoin::Transaction>,
message: SubmitSolutionJd<'a>,
}
impl<'a> BlockCreator<'a> {
pub fn new(
last_declare: DeclareMiningJob<'a>,
tx_list: Vec<bitcoin::Transaction>,
message: SubmitSolutionJd<'a>,
) -> BlockCreator<'a> {
BlockCreator {
last_declare,
tx_list,
message,
}
}
let merkle_root =
merkle_root_from_path(&coinbase_pre[..], &coinbase_suf[..], &extranonce[..], &path)
.expect("Invalid coinbase");
let merkle_root = Hash::from_inner(merkle_root.try_into().unwrap());

let prev_blockhash = u256_to_block_hash(message.prev_hash.into_static());
let header = stratum_common::bitcoin::blockdata::block::BlockHeader {
version: last_declare.version as i32,
prev_blockhash,
merkle_root,
time: message.ntime,
bits: message.nbits,
nonce: message.nonce,
};
}

impl<'a> From<BlockCreator<'a>> for bitcoin::Block {
fn from(block_creator: BlockCreator<'a>) -> bitcoin::Block {
let last_declare = block_creator.last_declare;
let mut tx_list = block_creator.tx_list;
let message = block_creator.message;

let coinbase_pre = last_declare.coinbase_prefix.to_vec();
let extranonce = message.extranonce.to_vec();
let coinbase_suf = last_declare.coinbase_suffix.to_vec();
let mut path: Vec<Vec<u8>> = vec![];
for tx in &tx_list {
let id = tx.txid();
let id = id.as_ref().to_vec();
path.push(id);
}
let merkle_root =
merkle_root_from_path(&coinbase_pre[..], &coinbase_suf[..], &extranonce[..], &path)
.expect("Invalid coinbase");
let merkle_root = Hash::from_inner(merkle_root.try_into().unwrap());

let prev_blockhash = u256_to_block_hash(message.prev_hash.into_static());
let header = stratum_common::bitcoin::blockdata::block::BlockHeader {
version: last_declare.version as i32,
prev_blockhash,
merkle_root,
time: message.ntime,
bits: message.nbits,
nonce: message.nonce,
};

let coinbase = [coinbase_pre, extranonce, coinbase_suf].concat();
let coinbase = Transaction::deserialize(&coinbase[..]).unwrap();
tx_list.insert(0, coinbase);
let coinbase = [coinbase_pre, extranonce, coinbase_suf].concat();
let coinbase = Transaction::deserialize(&coinbase[..]).unwrap();
tx_list.insert(0, coinbase);

let mut block = Block {
header,
txdata: tx_list.clone(),
};
let mut block = Block {
header,
txdata: tx_list.clone(),
};

block.header.merkle_root = block.compute_merkle_root().unwrap();
block
block.header.merkle_root = block.compute_merkle_root().unwrap();
block
}
}

#[cfg(test)]
Expand Down
1 change: 0 additions & 1 deletion roles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions roles/jd-server/src/lib/job_declarator/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use roles_logic_sv2::{
ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd,
},
parsers::JobDeclaration,
utils::Mutex,
};
use std::{convert::TryInto, io::Cursor, sync::Arc};
use std::{convert::TryInto, io::Cursor};
use stratum_common::bitcoin::Transaction;
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
Expand Down Expand Up @@ -177,12 +176,9 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
}
}

fn handle_submit_solution(
_self: Arc<Mutex<Self>>,
message: SubmitSolutionJd<'_>,
) -> Result<SendTo, Error> {
fn handle_submit_solution(&mut self, message: SubmitSolutionJd<'_>) -> Result<SendTo, Error> {
let m = JobDeclaration::SubmitSolution(message.clone().into_static());

Ok(SendTo::RelayNewMessage(m))
Ok(SendTo::None(Some(m)))
}
}
81 changes: 61 additions & 20 deletions roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use roles_logic_sv2::{
use secp256k1::{Keypair, Message as SecpMessage, Secp256k1};
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use tokio::net::TcpListener;
use tracing::{error, info, warn};
use tracing::{error, info};

use stratum_common::bitcoin::{
consensus::{encode::serialize, Encodable},
Expand Down Expand Up @@ -70,22 +70,18 @@ impl JobDeclaratorDownstream {
}
}

fn get_block_hex(self_mutex: Arc<Mutex<Self>>, message: SubmitSolutionJd) -> String {
fn get_block_hex(self_mutex: Arc<Mutex<Self>>, message: SubmitSolutionJd) -> Option<String> {
//TODO: implement logic for success or error
let (last_declare, tx_list, _) = match self_mutex
.safe_lock(|x| x.declared_mining_job.take())
.unwrap()
{
Some((last_declare, tx_list, _x)) => (last_declare, tx_list, _x),
None => {
warn!("Received solution but no job available");
todo!()
}
None => return None,
};
let block: Block =
roles_logic_sv2::utils::submit_solution_to_block(last_declare, tx_list, message);
let serialized_block = serialize(&block);
hex::encode(serialized_block)
roles_logic_sv2::utils::BlockCreator::new(last_declare, tx_list, message).into();
Some(hex::encode(serialize(&block)))
}

pub async fn send(
Expand Down Expand Up @@ -124,17 +120,63 @@ impl JobDeclaratorDownstream {
Ok(SendTo::Respond(message)) => {
Self::send(self_mutex.clone(), message).await.unwrap();
}
Ok(SendTo::None(_)) => (),
Ok(SendTo::RelayNewMessage(JobDeclaration::SubmitSolution(
message,
))) => {
let hexdata = JobDeclaratorDownstream::get_block_hex(
self_mutex.clone(),
message,
);

let _ = submit_solution_sender.send(hexdata).await;
Ok(SendTo::RelayNewMessage(message)) => {
error!("JD Server: unexpected relay new message {:?}", message);
}
Ok(SendTo::RelayNewMessageToRemote(remote, message)) => {
error!("JD Server: unexpected relay new message to remote. Remote: {:?}, Message: {:?}", remote, message);
}
Ok(SendTo::RelaySameMessageToRemote(remote)) => {
error!("JD Server: unexpected relay same message to remote. Remote: {:?}", remote);
}
Ok(SendTo::Multiple(multiple)) => {
error!("JD Server: unexpected multiple messages: {:?}", multiple);
}
Ok(SendTo::None(m)) => match m {
Some(JobDeclaration::SubmitSolution(message)) => {
let hexdata = match JobDeclaratorDownstream::get_block_hex(
self_mutex.clone(),
message,
) {
Some(inner) => inner,
None => {
error!("Received solution but no job available");
recv.close();
break;
}
};

let _ = submit_solution_sender.send(hexdata).await;
}
Some(JobDeclaration::DeclareMiningJob(_)) => {
error!("JD Server received an unexpected message {:?}", m);
}
Some(JobDeclaration::DeclareMiningJobSuccess(_)) => {
error!("JD Server received an unexpected message {:?}", m);
}
Some(JobDeclaration::DeclareMiningJobError(_)) => {
error!("JD Server received an unexpected message {:?}", m);
}
Some(JobDeclaration::IdentifyTransactions(_)) => {
error!("JD Server received an unexpected message {:?}", m);
}
Some(JobDeclaration::IdentifyTransactionsSuccess(_)) => {
error!("JD Server received an unexpected message {:?}", m);
}
Some(JobDeclaration::AllocateMiningJobToken(_)) => {
error!("JD Server received an unexpected message {:?}", m);
}
Some(JobDeclaration::AllocateMiningJobTokenSuccess(_)) => {
error!("JD Server received an unexpected message {:?}", m);
}
Some(JobDeclaration::ProvideMissingTransactions(_)) => {
error!("JD Server received an unexpected message {:?}", m);
}
Some(JobDeclaration::ProvideMissingTransactionsSuccess(_)) => {
error!("JD Server received an unexpected message {:?}", m);
}
None => (),
},
Err(e) => {
error!("{:?}", e);
handle_result!(
Expand All @@ -144,7 +186,6 @@ impl JobDeclaratorDownstream {
recv.close();
break;
}
_ => unreachable!(),
}
}
Err(err) => {
Expand Down
2 changes: 0 additions & 2 deletions roles/jd-server/src/lib/mempool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use rpc::mini_rpc_client::RpcError;
use tokio::task::JoinError;
use tracing::{error, warn};

// TODO this should be includede in JdsError

#[derive(Debug)]
pub enum JdsMempoolError {
EmptyMempool,
Expand Down
1 change: 1 addition & 0 deletions roles/jd-server/src/lib/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl JDsMempool {

/// This function is used only for debug purposes and should not be used
/// in production code.
#[cfg(debug_assertions)]
pub fn _get_transaction_list(self_: Arc<Mutex<Self>>) -> Vec<Txid> {
let tx_list = self_.safe_lock(|x| x.mempool.clone()).unwrap();
let tx_list_: Vec<Txid> = tx_list.iter().map(|n| n.id).collect();
Expand Down
2 changes: 2 additions & 0 deletions roles/jd-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ async fn main() {
}
}
tokio::time::sleep(mempool_update_interval).await;
// DO NOT REMOVE THIS LINE
//let _transactions = mempool::JDsMempool::_get_transaction_list(mempool_cloned_.clone());
}
});
};
Expand Down
1 change: 0 additions & 1 deletion utils/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion utils/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ base64 = "0.21.5"
hyper = { version = "1.1.0", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
http-body-util = "0.1"
bytes = "1"
3 changes: 1 addition & 2 deletions utils/rpc/src/mini_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
// - use https for security reasons
// - manage id in RpcResult messages
use base64::Engine;
use bytes::Bytes;
use hex::decode;
use http_body_util::{BodyExt, Full};
use hyper::{
body::Bytes,
header::{AUTHORIZATION, CONTENT_TYPE},
Request,
};
Expand All @@ -24,7 +24,6 @@ use super::BlockHash;
#[derive(Clone, Debug)]
pub struct MiniRpcClient {
client: Client<HttpConnector, Full<Bytes>>,
//url: &'a str,
url: String,
auth: Auth,
}
Expand Down

0 comments on commit fd68321

Please sign in to comment.