Skip to content

Commit 8158398

Browse files
committed
Move code out of main.rs to lib
..This would allow us to use the crate in other enviornements.
1 parent 0f0ee1e commit 8158398

File tree

3 files changed

+187
-98
lines changed

3 files changed

+187
-98
lines changed

roles/pool/src/lib/mining_pool/mod.rs

+76
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,15 @@ pub struct CoinbaseOutput {
6666
output_script_value: String,
6767
}
6868

69+
impl CoinbaseOutput {
70+
pub fn new(output_script_type: String, output_script_value: String) -> Self {
71+
Self {
72+
output_script_type,
73+
output_script_value,
74+
}
75+
}
76+
}
77+
6978
impl TryFrom<&CoinbaseOutput> for CoinbaseOutput_ {
7079
type Error = Error;
7180

@@ -96,6 +105,73 @@ pub struct Configuration {
96105
pub test_only_listen_adress_plain: String,
97106
}
98107

108+
pub struct TemplateProviderConfig {
109+
address: String,
110+
authority_public_key: Option<Secp256k1PublicKey>,
111+
}
112+
113+
impl TemplateProviderConfig {
114+
pub fn new(address: String, authority_public_key: Option<Secp256k1PublicKey>) -> Self {
115+
Self {
116+
address,
117+
authority_public_key,
118+
}
119+
}
120+
}
121+
122+
pub struct AuthorityConfig {
123+
pub public_key: Secp256k1PublicKey,
124+
pub secret_key: Secp256k1SecretKey,
125+
}
126+
127+
impl AuthorityConfig {
128+
pub fn new(public_key: Secp256k1PublicKey, secret_key: Secp256k1SecretKey) -> Self {
129+
Self {
130+
public_key,
131+
secret_key,
132+
}
133+
}
134+
}
135+
136+
pub struct ConnectionConfig {
137+
listen_address: String,
138+
cert_validity_sec: u64,
139+
signature: String,
140+
}
141+
142+
impl ConnectionConfig {
143+
pub fn new(listen_address: String, cert_validity_sec: u64, signature: String) -> Self {
144+
Self {
145+
listen_address,
146+
cert_validity_sec,
147+
signature,
148+
}
149+
}
150+
}
151+
152+
impl Configuration {
153+
pub fn new(
154+
pool_connection: ConnectionConfig,
155+
template_provider: TemplateProviderConfig,
156+
authority_config: AuthorityConfig,
157+
coinbase_outputs: Vec<CoinbaseOutput>,
158+
#[cfg(feature = "test_only_allow_unencrypted")] test_only_listen_adress_plain: String,
159+
) -> Self {
160+
Self {
161+
listen_address: pool_connection.listen_address,
162+
tp_address: template_provider.address,
163+
tp_authority_public_key: template_provider.authority_public_key,
164+
authority_public_key: authority_config.public_key,
165+
authority_secret_key: authority_config.secret_key,
166+
cert_validity_sec: pool_connection.cert_validity_sec,
167+
coinbase_outputs,
168+
pool_signature: pool_connection.signature,
169+
#[cfg(feature = "test_only_allow_unencrypted")]
170+
test_only_listen_adress_plain,
171+
}
172+
}
173+
}
174+
99175
#[derive(Debug)]
100176
pub struct Downstream {
101177
// Either group or channel id

roles/pool/src/lib/mod.rs

+108
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,111 @@ pub mod error;
22
pub mod mining_pool;
33
pub mod status;
44
pub mod template_receiver;
5+
6+
use async_channel::{bounded, unbounded};
7+
8+
use mining_pool::{get_coinbase_output, Configuration, Pool};
9+
use template_receiver::TemplateRx;
10+
use tracing::{error, info, warn};
11+
12+
use tokio::select;
13+
14+
pub struct PoolSv2 {
15+
config: Configuration,
16+
}
17+
18+
impl PoolSv2 {
19+
pub fn new(config: Configuration) -> PoolSv2 {
20+
PoolSv2 { config }
21+
}
22+
pub async fn start(self) {
23+
let config = self.config.clone();
24+
let (status_tx, status_rx) = unbounded();
25+
let (s_new_t, r_new_t) = bounded(10);
26+
let (s_prev_hash, r_prev_hash) = bounded(10);
27+
let (s_solution, r_solution) = bounded(10);
28+
let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
29+
let coinbase_output_result = get_coinbase_output(&config);
30+
let coinbase_output_len = match coinbase_output_result {
31+
Ok(coinbase_output) => coinbase_output.len() as u32,
32+
Err(err) => {
33+
error!("Failed to get Coinbase output: {:?}", err);
34+
return;
35+
}
36+
};
37+
let tp_authority_public_key = config.tp_authority_public_key;
38+
let template_rx_res = TemplateRx::connect(
39+
config.tp_address.parse().unwrap(),
40+
s_new_t,
41+
s_prev_hash,
42+
r_solution,
43+
r_message_recv_signal,
44+
status::Sender::Upstream(status_tx.clone()),
45+
coinbase_output_len,
46+
tp_authority_public_key,
47+
)
48+
.await;
49+
50+
if let Err(e) = template_rx_res {
51+
error!("Could not connect to Template Provider: {}", e);
52+
return;
53+
}
54+
55+
let pool = Pool::start(
56+
config.clone(),
57+
r_new_t,
58+
r_prev_hash,
59+
s_solution,
60+
s_message_recv_signal,
61+
status::Sender::DownstreamListener(status_tx),
62+
);
63+
64+
// Start the error handling loop
65+
// See `./status.rs` and `utils/error_handling` for information on how this operates
66+
loop {
67+
let task_status = select! {
68+
task_status = status_rx.recv() => task_status,
69+
interrupt_signal = tokio::signal::ctrl_c() => {
70+
match interrupt_signal {
71+
Ok(()) => {
72+
info!("Interrupt received");
73+
},
74+
Err(err) => {
75+
error!("Unable to listen for interrupt signal: {}", err);
76+
// we also shut down in case of error
77+
},
78+
}
79+
break;
80+
}
81+
};
82+
let task_status: status::Status = task_status.unwrap();
83+
84+
match task_status.state {
85+
// Should only be sent by the downstream listener
86+
status::State::DownstreamShutdown(err) => {
87+
error!(
88+
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
89+
err
90+
);
91+
break;
92+
}
93+
status::State::TemplateProviderShutdown(err) => {
94+
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
95+
break;
96+
}
97+
status::State::Healthy(msg) => {
98+
info!("HEALTHY message: {}", msg);
99+
}
100+
status::State::DownstreamInstanceDropped(downstream_id) => {
101+
warn!("Dropping downstream instance {} from pool", downstream_id);
102+
if pool
103+
.safe_lock(|p| p.remove_downstream(downstream_id))
104+
.is_err()
105+
{
106+
break;
107+
}
108+
}
109+
}
110+
}
111+
}
112+
}

roles/pool/src/main.rs

+3-98
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,9 @@
11
#![allow(special_module_name)]
2-
use async_channel::{bounded, unbounded};
32

4-
use tracing::{error, info, warn};
53
mod lib;
6-
use lib::{
7-
mining_pool::{get_coinbase_output, Configuration, Pool},
8-
status,
9-
template_receiver::TemplateRx,
10-
};
11-
124
use ext_config::{Config, File, FileFormat};
13-
use tokio::select;
5+
pub use lib::{mining_pool::Configuration, status, PoolSv2};
6+
use tracing::error;
147

158
mod args {
169
use std::path::PathBuf;
@@ -106,93 +99,5 @@ async fn main() {
10699
return;
107100
}
108101
};
109-
110-
let (status_tx, status_rx) = unbounded();
111-
let (s_new_t, r_new_t) = bounded(10);
112-
let (s_prev_hash, r_prev_hash) = bounded(10);
113-
let (s_solution, r_solution) = bounded(10);
114-
let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
115-
info!("Pool INITIALIZING with config: {:?}", &args.config_path);
116-
let coinbase_output_result = get_coinbase_output(&config);
117-
let coinbase_output_len = match coinbase_output_result {
118-
Ok(coinbase_output) => coinbase_output.len() as u32,
119-
Err(err) => {
120-
error!("Failed to get coinbase output: {:?}", err);
121-
return;
122-
}
123-
};
124-
let tp_authority_public_key = config.tp_authority_public_key;
125-
let template_rx_res = TemplateRx::connect(
126-
config.tp_address.parse().unwrap(),
127-
s_new_t,
128-
s_prev_hash,
129-
r_solution,
130-
r_message_recv_signal,
131-
status::Sender::Upstream(status_tx.clone()),
132-
coinbase_output_len,
133-
tp_authority_public_key,
134-
)
135-
.await;
136-
137-
if let Err(e) = template_rx_res {
138-
error!("Could not connect to Template Provider: {}", e);
139-
return;
140-
}
141-
142-
let pool = Pool::start(
143-
config.clone(),
144-
r_new_t,
145-
r_prev_hash,
146-
s_solution,
147-
s_message_recv_signal,
148-
status::Sender::DownstreamListener(status_tx),
149-
);
150-
151-
// Start the error handling loop
152-
// See `./status.rs` and `utils/error_handling` for information on how this operates
153-
loop {
154-
let task_status = select! {
155-
task_status = status_rx.recv() => task_status,
156-
interrupt_signal = tokio::signal::ctrl_c() => {
157-
match interrupt_signal {
158-
Ok(()) => {
159-
info!("Interrupt received");
160-
},
161-
Err(err) => {
162-
error!("Unable to listen for interrupt signal: {}", err);
163-
// we also shut down in case of error
164-
},
165-
}
166-
break;
167-
}
168-
};
169-
let task_status: status::Status = task_status.unwrap();
170-
171-
match task_status.state {
172-
// Should only be sent by the downstream listener
173-
status::State::DownstreamShutdown(err) => {
174-
error!(
175-
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
176-
err
177-
);
178-
break;
179-
}
180-
status::State::TemplateProviderShutdown(err) => {
181-
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
182-
break;
183-
}
184-
status::State::Healthy(msg) => {
185-
info!("HEALTHY message: {}", msg);
186-
}
187-
status::State::DownstreamInstanceDropped(downstream_id) => {
188-
warn!("Dropping downstream instance {} from pool", downstream_id);
189-
if pool
190-
.safe_lock(|p| p.remove_downstream(downstream_id))
191-
.is_err()
192-
{
193-
break;
194-
}
195-
}
196-
}
197-
}
102+
PoolSv2::new(config).start().await;
198103
}

0 commit comments

Comments
 (0)