Skip to content

Commit 5d569bb

Browse files
jbesraaplebhash
authored andcommitted
Add Job Declarator Client struct
With the goal to write more tests, this commit moves JDC initialisation logic from `main.rs` to `lib.rs` so its easier to test and also separate the actual `lib` code from the binary.
1 parent 9d62c72 commit 5d569bb

File tree

2 files changed

+322
-311
lines changed

2 files changed

+322
-311
lines changed

roles/jd-client/src/lib/mod.rs

+317
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,22 @@ pub mod upstream_sv2;
88

99
use std::{sync::atomic::AtomicBool, time::Duration};
1010

11+
use job_declarator::JobDeclarator;
12+
use proxy_config::ProxyConfig;
13+
use template_receiver::TemplateRx;
14+
15+
use async_channel::{bounded, unbounded};
16+
use futures::{select, FutureExt};
17+
use roles_logic_sv2::utils::Mutex;
18+
use std::{
19+
net::{IpAddr, SocketAddr},
20+
str::FromStr,
21+
sync::Arc,
22+
};
23+
use tokio::task::AbortHandle;
24+
25+
use tracing::{error, info};
26+
1127
/// Is used by the template receiver and the downstream. When a NewTemplate is received the context
1228
/// that is running the template receiver set this value to false and then the message is sent to
1329
/// the context that is running the Downstream that do something and then set it back to true.
@@ -31,6 +47,307 @@ use std::{sync::atomic::AtomicBool, time::Duration};
3147
/// between all the contexts is not necessary.
3248
pub static IS_NEW_TEMPLATE_HANDLED: AtomicBool = AtomicBool::new(true);
3349

50+
/// Job Declarator Client (or JDC) is the role which is Miner-side, in charge of creating new
51+
/// mining jobs from the templates received by the Template Provider to which it is connected. It
52+
/// declares custom jobs to the JDS, in order to start working on them.
53+
/// JDC is also responsible for putting in action the Pool-fallback mechanism, automatically
54+
/// switching to backup Pools in case of declared custom jobs refused by JDS (which is Pool side).
55+
/// As a solution of last-resort, it is able to switch to Solo Mining until new safe Pools appear
56+
/// in the market.
57+
pub struct JobDeclaratorClient {
58+
/// Configuration of the proxy server [`JobDeclaratorClient`] is connected to.
59+
config: ProxyConfig,
60+
}
61+
62+
impl JobDeclaratorClient {
63+
pub fn new(config: ProxyConfig) -> Self {
64+
Self { config }
65+
}
66+
67+
pub async fn start(self) {
68+
let mut upstream_index = 0;
69+
let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse());
70+
71+
// Channel used to manage failed tasks
72+
let (tx_status, rx_status) = unbounded();
73+
74+
let task_collector = Arc::new(Mutex::new(vec![]));
75+
76+
let proxy_config = &self.config;
77+
78+
loop {
79+
let task_collector = task_collector.clone();
80+
let tx_status = tx_status.clone();
81+
if let Some(upstream) = proxy_config.upstreams.get(upstream_index) {
82+
self.initialize_jd(tx_status.clone(), task_collector.clone(), upstream.clone())
83+
.await;
84+
} else {
85+
self.initialize_jd_as_solo_miner(tx_status.clone(), task_collector.clone())
86+
.await;
87+
}
88+
// Check all tasks if is_finished() is true, if so exit
89+
loop {
90+
let task_status = select! {
91+
task_status = rx_status.recv().fuse() => task_status,
92+
interrupt_signal = interrupt_signal_future => {
93+
match interrupt_signal {
94+
Ok(()) => {
95+
info!("Interrupt received");
96+
},
97+
Err(err) => {
98+
error!("Unable to listen for interrupt signal: {}", err);
99+
// we also shut down in case of error
100+
},
101+
}
102+
std::process::exit(0);
103+
}
104+
};
105+
let task_status: status::Status = task_status.unwrap();
106+
107+
match task_status.state {
108+
// Should only be sent by the downstream listener
109+
status::State::DownstreamShutdown(err) => {
110+
error!("SHUTDOWN from: {}", err);
111+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
112+
task_collector
113+
.safe_lock(|s| {
114+
for handle in s {
115+
handle.abort();
116+
}
117+
})
118+
.unwrap();
119+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
120+
break;
121+
}
122+
status::State::UpstreamShutdown(err) => {
123+
error!("SHUTDOWN from: {}", err);
124+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
125+
task_collector
126+
.safe_lock(|s| {
127+
for handle in s {
128+
handle.abort();
129+
}
130+
})
131+
.unwrap();
132+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
133+
break;
134+
}
135+
status::State::UpstreamRogue => {
136+
error!("Changin Pool");
137+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
138+
task_collector
139+
.safe_lock(|s| {
140+
for handle in s {
141+
handle.abort();
142+
}
143+
})
144+
.unwrap();
145+
upstream_index += 1;
146+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
147+
break;
148+
}
149+
status::State::Healthy(msg) => {
150+
info!("HEALTHY message: {}", msg);
151+
}
152+
}
153+
}
154+
}
155+
}
156+
157+
async fn initialize_jd_as_solo_miner(
158+
&self,
159+
tx_status: async_channel::Sender<status::Status<'static>>,
160+
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
161+
) {
162+
let proxy_config = &self.config;
163+
let timeout = proxy_config.timeout;
164+
let miner_tx_out = proxy_config::get_coinbase_output(proxy_config).unwrap();
165+
166+
// When Downstream receive a share that meets bitcoin target it transformit in a
167+
// SubmitSolution and send it to the TemplateReceiver
168+
let (send_solution, recv_solution) = bounded(10);
169+
170+
// Format `Downstream` connection address
171+
let downstream_addr = SocketAddr::new(
172+
IpAddr::from_str(&proxy_config.downstream_address).unwrap(),
173+
proxy_config.downstream_port,
174+
);
175+
176+
// Wait for downstream to connect
177+
let downstream = downstream::listen_for_downstream_mining(
178+
downstream_addr,
179+
None,
180+
send_solution,
181+
proxy_config.withhold,
182+
proxy_config.authority_public_key,
183+
proxy_config.authority_secret_key,
184+
proxy_config.cert_validity_sec,
185+
task_collector.clone(),
186+
status::Sender::Downstream(tx_status.clone()),
187+
miner_tx_out.clone(),
188+
None,
189+
)
190+
.await
191+
.unwrap();
192+
193+
// Initialize JD part
194+
let mut parts = proxy_config.tp_address.split(':');
195+
let ip_tp = parts.next().unwrap().to_string();
196+
let port_tp = parts.next().unwrap().parse::<u16>().unwrap();
197+
198+
TemplateRx::connect(
199+
SocketAddr::new(IpAddr::from_str(ip_tp.as_str()).unwrap(), port_tp),
200+
recv_solution,
201+
status::Sender::TemplateReceiver(tx_status.clone()),
202+
None,
203+
downstream,
204+
task_collector,
205+
Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))),
206+
miner_tx_out.clone(),
207+
proxy_config.tp_authority_public_key,
208+
false,
209+
)
210+
.await;
211+
}
212+
213+
async fn initialize_jd(
214+
&self,
215+
tx_status: async_channel::Sender<status::Status<'static>>,
216+
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
217+
upstream_config: proxy_config::Upstream,
218+
) {
219+
let proxy_config = &self.config;
220+
let timeout = proxy_config.timeout;
221+
let test_only_do_not_send_solution_to_tp = proxy_config
222+
.test_only_do_not_send_solution_to_tp
223+
.unwrap_or(false);
224+
225+
// Format `Upstream` connection address
226+
let mut parts = upstream_config.pool_address.split(':');
227+
let address = parts
228+
.next()
229+
.unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address));
230+
let port = parts
231+
.next()
232+
.and_then(|p| p.parse::<u16>().ok())
233+
.unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address));
234+
let upstream_addr = SocketAddr::new(
235+
IpAddr::from_str(address).unwrap_or_else(|_| {
236+
panic!("Invalid pool address {}", upstream_config.pool_address)
237+
}),
238+
port,
239+
);
240+
241+
// When Downstream receive a share that meets bitcoin target it transformit in a
242+
// SubmitSolution and send it to the TemplateReceiver
243+
let (send_solution, recv_solution) = bounded(10);
244+
245+
// Instantiate a new `Upstream` (SV2 Pool)
246+
let upstream = match upstream_sv2::Upstream::new(
247+
upstream_addr,
248+
upstream_config.authority_pubkey,
249+
0, // TODO
250+
upstream_config.pool_signature.clone(),
251+
status::Sender::Upstream(tx_status.clone()),
252+
task_collector.clone(),
253+
Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))),
254+
)
255+
.await
256+
{
257+
Ok(upstream) => upstream,
258+
Err(e) => {
259+
error!("Failed to create upstream: {}", e);
260+
panic!()
261+
}
262+
};
263+
264+
// Start receiving messages from the SV2 Upstream role
265+
if let Err(e) = upstream_sv2::Upstream::parse_incoming(upstream.clone()) {
266+
error!("failed to create sv2 parser: {}", e);
267+
panic!()
268+
}
269+
270+
match upstream_sv2::Upstream::setup_connection(
271+
upstream.clone(),
272+
proxy_config.min_supported_version,
273+
proxy_config.max_supported_version,
274+
)
275+
.await
276+
{
277+
Ok(_) => info!("Connected to Upstream!"),
278+
Err(e) => {
279+
error!("Failed to connect to Upstream EXITING! : {}", e);
280+
panic!()
281+
}
282+
}
283+
284+
// Format `Downstream` connection address
285+
let downstream_addr = SocketAddr::new(
286+
IpAddr::from_str(&proxy_config.downstream_address).unwrap(),
287+
proxy_config.downstream_port,
288+
);
289+
290+
// Initialize JD part
291+
let mut parts = proxy_config.tp_address.split(':');
292+
let ip_tp = parts.next().unwrap().to_string();
293+
let port_tp = parts.next().unwrap().parse::<u16>().unwrap();
294+
295+
let mut parts = upstream_config.jd_address.split(':');
296+
let ip_jd = parts.next().unwrap().to_string();
297+
let port_jd = parts.next().unwrap().parse::<u16>().unwrap();
298+
let jd = match JobDeclarator::new(
299+
SocketAddr::new(IpAddr::from_str(ip_jd.as_str()).unwrap(), port_jd),
300+
upstream_config.authority_pubkey.into_bytes(),
301+
proxy_config.clone(),
302+
upstream.clone(),
303+
task_collector.clone(),
304+
)
305+
.await
306+
{
307+
Ok(c) => c,
308+
Err(e) => {
309+
let _ = tx_status
310+
.send(status::Status {
311+
state: status::State::UpstreamShutdown(e),
312+
})
313+
.await;
314+
return;
315+
}
316+
};
317+
318+
// Wait for downstream to connect
319+
let downstream = downstream::listen_for_downstream_mining(
320+
downstream_addr,
321+
Some(upstream),
322+
send_solution,
323+
proxy_config.withhold,
324+
proxy_config.authority_public_key,
325+
proxy_config.authority_secret_key,
326+
proxy_config.cert_validity_sec,
327+
task_collector.clone(),
328+
status::Sender::Downstream(tx_status.clone()),
329+
vec![],
330+
Some(jd.clone()),
331+
)
332+
.await
333+
.unwrap();
334+
335+
TemplateRx::connect(
336+
SocketAddr::new(IpAddr::from_str(ip_tp.as_str()).unwrap(), port_tp),
337+
recv_solution,
338+
status::Sender::TemplateReceiver(tx_status.clone()),
339+
Some(jd.clone()),
340+
downstream,
341+
task_collector,
342+
Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))),
343+
vec![],
344+
proxy_config.tp_authority_public_key,
345+
test_only_do_not_send_solution_to_tp,
346+
)
347+
.await;
348+
}
349+
}
350+
34351
#[derive(Debug)]
35352
pub struct PoolChangerTrigger {
36353
timeout: Duration,

0 commit comments

Comments
 (0)