diff --git a/roles/pool/src/lib/mining_pool/mod.rs b/roles/pool/src/lib/mining_pool/mod.rs index 0db3f8a5f0..b877fbc146 100644 --- a/roles/pool/src/lib/mining_pool/mod.rs +++ b/roles/pool/src/lib/mining_pool/mod.rs @@ -66,6 +66,15 @@ pub struct CoinbaseOutput { output_script_value: String, } +impl CoinbaseOutput { + pub fn new(output_script_type: String, output_script_value: String) -> Self { + Self { + output_script_type, + output_script_value, + } + } +} + impl TryFrom<&CoinbaseOutput> for CoinbaseOutput_ { type Error = Error; @@ -96,6 +105,33 @@ pub struct Configuration { pub test_only_listen_adress_plain: String, } +impl Configuration { + pub fn new( + listen_address: String, + tp_address: String, + tp_authority_public_key: Option, + authority_public_key: Secp256k1PublicKey, + authority_secret_key: Secp256k1SecretKey, + cert_validity_sec: u64, + coinbase_outputs: Vec, + pool_signature: String, + #[cfg(feature = "test_only_allow_unencrypted")] test_only_listen_adress_plain: String, + ) -> Self { + Self { + listen_address, + tp_address, + tp_authority_public_key, + authority_public_key, + authority_secret_key, + cert_validity_sec, + coinbase_outputs, + pool_signature, + #[cfg(feature = "test_only_allow_unencrypted")] + test_only_listen_adress_plain, + } + } +} + #[derive(Debug)] pub struct Downstream { // Either group or channel id diff --git a/roles/pool/src/lib/mod.rs b/roles/pool/src/lib/mod.rs index 2d8417842d..abf57eb477 100644 --- a/roles/pool/src/lib/mod.rs +++ b/roles/pool/src/lib/mod.rs @@ -2,3 +2,105 @@ pub mod error; pub mod mining_pool; pub mod status; pub mod template_receiver; + +use async_channel::{bounded, unbounded}; + +use mining_pool::{get_coinbase_output, Configuration, Pool}; +use template_receiver::TemplateRx; +use tracing::{error, info, warn}; + +use tokio::select; + +pub struct PoolSv2; + +impl PoolSv2 { + pub async fn start(config: Configuration) { + let (status_tx, status_rx) = unbounded(); + let (s_new_t, r_new_t) = bounded(10); + let (s_prev_hash, r_prev_hash) = bounded(10); + let (s_solution, r_solution) = bounded(10); + let (s_message_recv_signal, r_message_recv_signal) = bounded(10); + let coinbase_output_result = get_coinbase_output(&config); + let coinbase_output_len = match coinbase_output_result { + Ok(coinbase_output) => coinbase_output.len() as u32, + Err(err) => { + error!("Failed to get Coinbase output: {:?}", err); + return; + } + }; + let tp_authority_public_key = config.tp_authority_public_key; + let template_rx_res = TemplateRx::connect( + config.tp_address.parse().unwrap(), + s_new_t, + s_prev_hash, + r_solution, + r_message_recv_signal, + status::Sender::Upstream(status_tx.clone()), + coinbase_output_len, + tp_authority_public_key, + ) + .await; + + if let Err(e) = template_rx_res { + error!("Could not connect to Template Provider: {}", e); + return; + } + + let pool = Pool::start( + config.clone(), + r_new_t, + r_prev_hash, + s_solution, + s_message_recv_signal, + status::Sender::DownstreamListener(status_tx), + ); + + // Start the error handling loop + // See `./status.rs` and `utils/error_handling` for information on how this operates + loop { + let task_status = select! { + task_status = status_rx.recv() => task_status, + interrupt_signal = tokio::signal::ctrl_c() => { + match interrupt_signal { + Ok(()) => { + info!("Interrupt received"); + }, + Err(err) => { + error!("Unable to listen for interrupt signal: {}", err); + // we also shut down in case of error + }, + } + break; + } + }; + let task_status: status::Status = task_status.unwrap(); + + match task_status.state { + // Should only be sent by the downstream listener + status::State::DownstreamShutdown(err) => { + error!( + "SHUTDOWN from Downstream: {}\nTry to restart the downstream listener", + err + ); + break; + } + status::State::TemplateProviderShutdown(err) => { + error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err); + break; + } + status::State::Healthy(msg) => { + info!("HEALTHY message: {}", msg); + } + status::State::DownstreamInstanceDropped(downstream_id) => { + warn!("Dropping downstream instance {} from pool", downstream_id); + if pool + .safe_lock(|p| p.remove_downstream(downstream_id)) + .is_err() + { + break; + } + } + } + } + } +} diff --git a/roles/pool/src/main.rs b/roles/pool/src/main.rs index 55d6e117a6..683b18e2cd 100644 --- a/roles/pool/src/main.rs +++ b/roles/pool/src/main.rs @@ -1,16 +1,10 @@ #![allow(special_module_name)] -use async_channel::{bounded, unbounded}; -use tracing::{error, info, warn}; mod lib; -use lib::{ - mining_pool::{get_coinbase_output, Configuration, Pool}, - status, - template_receiver::TemplateRx, -}; - use ext_config::{Config, File, FileFormat}; -use tokio::select; +pub use lib::status; +use lib::{mining_pool::Configuration, PoolSv2}; +use tracing::error; mod args { use std::path::PathBuf; @@ -106,93 +100,5 @@ async fn main() { return; } }; - - let (status_tx, status_rx) = unbounded(); - let (s_new_t, r_new_t) = bounded(10); - let (s_prev_hash, r_prev_hash) = bounded(10); - let (s_solution, r_solution) = bounded(10); - let (s_message_recv_signal, r_message_recv_signal) = bounded(10); - info!("Pool INITIALIZING with config: {:?}", &args.config_path); - let coinbase_output_result = get_coinbase_output(&config); - let coinbase_output_len = match coinbase_output_result { - Ok(coinbase_output) => coinbase_output.len() as u32, - Err(err) => { - error!("Failed to get coinbase output: {:?}", err); - return; - } - }; - let tp_authority_public_key = config.tp_authority_public_key; - let template_rx_res = TemplateRx::connect( - config.tp_address.parse().unwrap(), - s_new_t, - s_prev_hash, - r_solution, - r_message_recv_signal, - status::Sender::Upstream(status_tx.clone()), - coinbase_output_len, - tp_authority_public_key, - ) - .await; - - if let Err(e) = template_rx_res { - error!("Could not connect to Template Provider: {}", e); - return; - } - - let pool = Pool::start( - config.clone(), - r_new_t, - r_prev_hash, - s_solution, - s_message_recv_signal, - status::Sender::DownstreamListener(status_tx), - ); - - // Start the error handling loop - // See `./status.rs` and `utils/error_handling` for information on how this operates - loop { - let task_status = select! { - task_status = status_rx.recv() => task_status, - interrupt_signal = tokio::signal::ctrl_c() => { - match interrupt_signal { - Ok(()) => { - info!("Interrupt received"); - }, - Err(err) => { - error!("Unable to listen for interrupt signal: {}", err); - // we also shut down in case of error - }, - } - break; - } - }; - let task_status: status::Status = task_status.unwrap(); - - match task_status.state { - // Should only be sent by the downstream listener - status::State::DownstreamShutdown(err) => { - error!( - "SHUTDOWN from Downstream: {}\nTry to restart the downstream listener", - err - ); - break; - } - status::State::TemplateProviderShutdown(err) => { - error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err); - break; - } - status::State::Healthy(msg) => { - info!("HEALTHY message: {}", msg); - } - status::State::DownstreamInstanceDropped(downstream_id) => { - warn!("Dropping downstream instance {} from pool", downstream_id); - if pool - .safe_lock(|p| p.remove_downstream(downstream_id)) - .is_err() - { - break; - } - } - } - } + PoolSv2::start(config).await; }