Skip to content

Commit

Permalink
fix message generator tests
Browse files Browse the repository at this point in the history
Since the task that listen for new blocks is now performed
independently in main, if the url rpc config is the jds is left empty
the mempool the method `get_client` called in `JDsMemppol::on_submit()`
returns `NoClient` error. Since we do not want rpc connections in
message generator, the url config is left empty, resulting in a lot of
`NoCLient` errors in the JD-server, which led the MG tests involving
jd-server to fail.

Also changed some variables name to make them more intuitive
  • Loading branch information
lorbax committed Mar 4, 2024
1 parent 0a99214 commit b49fc6d
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 36 deletions.
3 changes: 3 additions & 0 deletions protocols/v2/roles-logic-sv2/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,9 @@ impl<'a> BlockCreator<'a> {
}
}

/// TODO write a test for this function that takes an already mined block, and test if the new
/// block created with the hash of the new block created with the block creator coincides with the
/// hash of the mined block
impl<'a> From<BlockCreator<'a>> for bitcoin::Block {
fn from(block_creator: BlockCreator<'a>) -> bitcoin::Block {
let last_declare = block_creator.last_declare;
Expand Down
13 changes: 6 additions & 7 deletions roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl JobDeclaratorDownstream {
pub fn start(
self_mutex: Arc<Mutex<Self>>,
tx_status: status::Sender,
submit_solution_sender: Sender<String>,
new_block_sender: Sender<String>,
) {
let recv = self_mutex.safe_lock(|s| s.receiver.clone()).unwrap();
tokio::spawn(async move {
Expand Down Expand Up @@ -146,7 +146,7 @@ impl JobDeclaratorDownstream {
}
};

let _ = submit_solution_sender.send(hexdata).await;
let _ = new_block_sender.send(hexdata).await;
}
Some(JobDeclaration::DeclareMiningJob(_)) => {
error!("JD Server received an unexpected message {:?}", m);
Expand Down Expand Up @@ -229,19 +229,18 @@ impl JobDeclarator {
config: Configuration,
status_tx: crate::status::Sender,
mempool: Arc<Mutex<JDsMempool>>,
submit_solution_sender: Sender<String>,
new_block_sender: Sender<String>,
) {
let self_ = Arc::new(Mutex::new(Self {}));
info!("JD INITIALIZED");
Self::accept_incoming_connection(self_, config, status_tx, mempool, submit_solution_sender)
.await;
Self::accept_incoming_connection(self_, config, status_tx, mempool, new_block_sender).await;
}
async fn accept_incoming_connection(
_self_: Arc<Mutex<JobDeclarator>>,
config: Configuration,
status_tx: crate::status::Sender,
mempool: Arc<Mutex<JDsMempool>>,
submit_solution_sender: Sender<String>,
new_block_sender: Sender<String>,
) {
let listner = TcpListener::bind(&config.listen_jd_address).await.unwrap();
while let Ok((stream, _)) = listner.accept().await {
Expand Down Expand Up @@ -285,7 +284,7 @@ impl JobDeclarator {
JobDeclaratorDownstream::start(
jddownstream,
status_tx.clone(),
submit_solution_sender.clone(),
new_block_sender.clone(),
);
} else {
error!("Can not connect {:?}", addr);
Expand Down
12 changes: 6 additions & 6 deletions roles/jd-server/src/lib/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct JDsMempool {
pub mempool: Vec<TransacrtionWithHash>,
auth: mini_rpc_client::Auth,
url: String,
receiver: Receiver<String>,
new_block_receiver: Receiver<String>,
}

impl JDsMempool {
Expand All @@ -45,15 +45,15 @@ impl JDsMempool {
url: String,
username: String,
password: String,
receiver: Receiver<String>,
new_block_receiver: Receiver<String>,
) -> Self {
let auth = mini_rpc_client::Auth::new(username, password);
let empty_mempool: Vec<TransacrtionWithHash> = Vec::new();
JDsMempool {
mempool: empty_mempool,
auth,
url,
receiver,
new_block_receiver,
}
}

Expand Down Expand Up @@ -97,15 +97,15 @@ impl JDsMempool {
}

pub async fn on_submit(self_: Arc<Mutex<Self>>) -> Result<(), JdsMempoolError> {
let receiver: Receiver<String> = self_
.safe_lock(|x| x.receiver.clone())
let new_block_receiver: Receiver<String> = self_
.safe_lock(|x| x.new_block_receiver.clone())
.map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))?;
let client = self_
.safe_lock(|x| x.get_client())
.map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))?
.ok_or(JdsMempoolError::NoClient)?;

while let Ok(block_hex) = receiver.recv().await {
while let Ok(block_hex) = new_block_receiver.recv().await {
match mini_rpc_client::MiniRpcClient::submit_block(&client, block_hex).await {
Ok(_) => return Ok(()),
Err(e) => JdsMempoolError::Rpc(e),
Expand Down
49 changes: 26 additions & 23 deletions roles/jd-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,12 @@ async fn main() {
let username = config.core_rpc_user.clone();
let password = config.core_rpc_pass.clone();
// TODO should we manage what to do when the limit is reaced?
let (submit_solution_sender, submit_solution_receiver): (Sender<String>, Receiver<String>) =
bounded(10);
let (new_block_sender, new_block_receiver): (Sender<String>, Receiver<String>) = bounded(10);
let mempool = Arc::new(Mutex::new(mempool::JDsMempool::new(
url.clone(),
username,
password,
submit_solution_receiver,
new_block_receiver,
)));
let mempool_update_interval = config.mempool_update_interval;
let mempool_cloned_ = mempool.clone();
Expand All @@ -121,6 +120,9 @@ async fn main() {
let mut last_empty_mempool_warning =
std::time::Instant::now().sub(std::time::Duration::from_secs(60));

// TODO if the jd-server is launched with core_rpc_url empty, the following flow is never
// taken. Consequentally new_block_receiver in JDsMempool::on_submit is never read, possibly
// reaching the channel bound. The new_block_sender is given as input to JobDeclarator::start()
if url.contains("http") {
let sender_update_mempool = sender.clone();
task::spawn(async move {
Expand Down Expand Up @@ -159,36 +161,37 @@ async fn main() {
//let _transactions = mempool::JDsMempool::_get_transaction_list(mempool_cloned_.clone());
}
});
};
let mempool_cloned = mempool.clone();
let sender_submit_solution = sender.clone();
task::spawn(async move {
loop {
let result = mempool::JDsMempool::on_submit(mempool_cloned.clone()).await;
if let Err(err) = result {
match err {
JdsMempoolError::EmptyMempool => {
if last_empty_mempool_warning.elapsed().as_secs() >= 60 {
warn!("{:?}", err);
warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)");
last_empty_mempool_warning = std::time::Instant::now();

let mempool_cloned = mempool.clone();
let sender_submit_solution = sender.clone();
task::spawn(async move {
loop {
let result = mempool::JDsMempool::on_submit(mempool_cloned.clone()).await;
if let Err(err) = result {
match err {
JdsMempoolError::EmptyMempool => {
if last_empty_mempool_warning.elapsed().as_secs() >= 60 {
warn!("{:?}", err);
warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)");
last_empty_mempool_warning = std::time::Instant::now();
}
}
_ => {
mempool::error::handle_error(&err);
handle_result!(sender_submit_solution, Err(err));
}
}
_ => {
mempool::error::handle_error(&err);
handle_result!(sender_submit_solution, Err(err));
}
}
}
}
});
});
};

info!("Jds INITIALIZING with config: {:?}", &args.config_path);

let cloned = config.clone();
let mempool_cloned = mempool.clone();
task::spawn(async move {
JobDeclarator::start(cloned, sender, mempool_cloned, submit_solution_sender).await
JobDeclarator::start(cloned, sender, mempool_cloned, new_block_sender).await
});

// Start the error handling loop
Expand Down

0 comments on commit b49fc6d

Please sign in to comment.