Skip to content

Commit

Permalink
Refactor PoolSv2
Browse files Browse the repository at this point in the history
Split library and binary code.
  • Loading branch information
jbesraa committed Aug 19, 2024
1 parent 69d9077 commit 6a9218c
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 98 deletions.
36 changes: 36 additions & 0 deletions roles/pool/src/lib/mining_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ pub struct CoinbaseOutput {
output_script_value: String,
}

impl CoinbaseOutput {
pub fn new(output_script_type: String, output_script_value: String) -> Self {
Self {
output_script_type,
output_script_value,
}
}
}

impl TryFrom<&CoinbaseOutput> for CoinbaseOutput_ {
type Error = Error;

Expand Down Expand Up @@ -96,6 +105,33 @@ pub struct Configuration {
pub test_only_listen_adress_plain: String,
}

impl Configuration {
pub fn new(
listen_address: String,
tp_address: String,
tp_authority_public_key: Option<Secp256k1PublicKey>,
authority_public_key: Secp256k1PublicKey,
authority_secret_key: Secp256k1SecretKey,
cert_validity_sec: u64,
coinbase_outputs: Vec<CoinbaseOutput>,
pool_signature: String,
#[cfg(feature = "test_only_allow_unencrypted")] test_only_listen_adress_plain: String,
) -> Self {
Self {
listen_address,
tp_address,
tp_authority_public_key,
authority_public_key,
authority_secret_key,
cert_validity_sec,
coinbase_outputs,
pool_signature,
#[cfg(feature = "test_only_allow_unencrypted")]
test_only_listen_adress_plain,
}
}
}

#[derive(Debug)]
pub struct Downstream {
// Either group or channel id
Expand Down
102 changes: 102 additions & 0 deletions roles/pool/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,105 @@ pub mod error;
pub mod mining_pool;
pub mod status;
pub mod template_receiver;

use async_channel::{bounded, unbounded};

use mining_pool::{get_coinbase_output, Configuration, Pool};
use template_receiver::TemplateRx;
use tracing::{error, info, warn};

use tokio::select;

pub struct PoolSv2;

impl PoolSv2 {
pub async fn start(config: Configuration) {
let (status_tx, status_rx) = unbounded();
let (s_new_t, r_new_t) = bounded(10);
let (s_prev_hash, r_prev_hash) = bounded(10);
let (s_solution, r_solution) = bounded(10);
let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
let coinbase_output_result = get_coinbase_output(&config);
let coinbase_output_len = match coinbase_output_result {
Ok(coinbase_output) => coinbase_output.len() as u32,
Err(err) => {
error!("Failed to get Coinbase output: {:?}", err);
return;
}
};
let tp_authority_public_key = config.tp_authority_public_key;
let template_rx_res = TemplateRx::connect(
config.tp_address.parse().unwrap(),
s_new_t,
s_prev_hash,
r_solution,
r_message_recv_signal,
status::Sender::Upstream(status_tx.clone()),
coinbase_output_len,
tp_authority_public_key,
)
.await;

if let Err(e) = template_rx_res {
error!("Could not connect to Template Provider: {}", e);
return;
}

let pool = Pool::start(
config.clone(),
r_new_t,
r_prev_hash,
s_solution,
s_message_recv_signal,
status::Sender::DownstreamListener(status_tx),
);

// Start the error handling loop
// See `./status.rs` and `utils/error_handling` for information on how this operates
loop {
let task_status = select! {
task_status = status_rx.recv() => task_status,
interrupt_signal = tokio::signal::ctrl_c() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!(
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
err
);
break;
}
status::State::TemplateProviderShutdown(err) => {
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
status::State::DownstreamInstanceDropped(downstream_id) => {
warn!("Dropping downstream instance {} from pool", downstream_id);
if pool
.safe_lock(|p| p.remove_downstream(downstream_id))
.is_err()
{
break;
}
}
}
}
}
}
102 changes: 4 additions & 98 deletions roles/pool/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
#![allow(special_module_name)]
use async_channel::{bounded, unbounded};

use tracing::{error, info, warn};
mod lib;
use lib::{
mining_pool::{get_coinbase_output, Configuration, Pool},
status,
template_receiver::TemplateRx,
};

use ext_config::{Config, File, FileFormat};
use tokio::select;
pub use lib::status;
use lib::{mining_pool::Configuration, PoolSv2};
use tracing::error;

mod args {
use std::path::PathBuf;
Expand Down Expand Up @@ -106,93 +100,5 @@ async fn main() {
return;
}
};

let (status_tx, status_rx) = unbounded();
let (s_new_t, r_new_t) = bounded(10);
let (s_prev_hash, r_prev_hash) = bounded(10);
let (s_solution, r_solution) = bounded(10);
let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
info!("Pool INITIALIZING with config: {:?}", &args.config_path);
let coinbase_output_result = get_coinbase_output(&config);
let coinbase_output_len = match coinbase_output_result {
Ok(coinbase_output) => coinbase_output.len() as u32,
Err(err) => {
error!("Failed to get coinbase output: {:?}", err);
return;
}
};
let tp_authority_public_key = config.tp_authority_public_key;
let template_rx_res = TemplateRx::connect(
config.tp_address.parse().unwrap(),
s_new_t,
s_prev_hash,
r_solution,
r_message_recv_signal,
status::Sender::Upstream(status_tx.clone()),
coinbase_output_len,
tp_authority_public_key,
)
.await;

if let Err(e) = template_rx_res {
error!("Could not connect to Template Provider: {}", e);
return;
}

let pool = Pool::start(
config.clone(),
r_new_t,
r_prev_hash,
s_solution,
s_message_recv_signal,
status::Sender::DownstreamListener(status_tx),
);

// Start the error handling loop
// See `./status.rs` and `utils/error_handling` for information on how this operates
loop {
let task_status = select! {
task_status = status_rx.recv() => task_status,
interrupt_signal = tokio::signal::ctrl_c() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!(
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
err
);
break;
}
status::State::TemplateProviderShutdown(err) => {
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
status::State::DownstreamInstanceDropped(downstream_id) => {
warn!("Dropping downstream instance {} from pool", downstream_id);
if pool
.safe_lock(|p| p.remove_downstream(downstream_id))
.is_err()
{
break;
}
}
}
}
PoolSv2::start(config).await;
}

0 comments on commit 6a9218c

Please sign in to comment.