Skip to content

Commit

Permalink
TProxy restart if gests disconnected by upstream
Browse files Browse the repository at this point in the history
When gets disconnected from upstream, the TProxy kills all spawned tasks
and reconnects.
Add start function and put starting logic there
Call this function when receive an UpstreamShutdown, after waiting a
random amount of time (if there are more TProxy for the same upstream
and all of them connect at the same time, it is bad)
Add a cancellation token, which clones are propagated to all
downstreams. If one of this clones gets cancelled, all of them get
cancelled.
After each spwaned task,  there is a second task that checks if the
token get canceled. In this case, it kills all the task above.
use tokio::task in favor of async_std::task
  • Loading branch information
lorbax committed Jun 25, 2024
1 parent d6c42d8 commit c8d8472
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 72 deletions.
2 changes: 1 addition & 1 deletion protocols/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion roles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion roles/translator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ error_handling = { version = "1.0.0", path = "../../utils/error-handling" }
key-utils = { version = "^1.0.0", path = "../../utils/key-utils" }
tokio-util = { version = "0.7.10", features = ["codec"] }
async-compat = "0.2.1"
rand = "0.8.4"



[dev-dependencies]
rand = "0.8.4"
sha2 = "0.10.6"

[features]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ min_extranonce2_size = 8
# Difficulty params
[downstream_difficulty_config]
# hashes/s of the weakest miner that will be connecting (e.g.: 10 Th/s = 10_000_000_000_000.0)
min_individual_miner_hashrate=10_000_000.0
min_individual_miner_hashrate=10_000_000_000_000.0
# target number of shares per minute the miner should be sending
shares_per_minute = 6.0

[upstream_difficulty_config]
# interval in seconds to elapse before updating channel hashrate with the pool
channel_diff_update_interval = 60
# estimated accumulated hashrate of all downstream miners (e.g.: 10 Th/s = 10_000_000_000_000.0)
channel_nominal_hashrate = 10_000_000.0
channel_nominal_hashrate = 10_000_000_000_000.0
41 changes: 37 additions & 4 deletions roles/translator/src/lib/downstream_sv1/downstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use futures::select;
use tokio_util::codec::{FramedRead, LinesCodec};

use std::{net::SocketAddr, sync::Arc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use v1::{
client_to_server::{self, Submit},
Expand Down Expand Up @@ -110,6 +111,7 @@ impl Downstream {
host: String,
difficulty_config: DownstreamDifficultyConfig,
upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
cancellation_token: CancellationToken,
) {
let stream = std::sync::Arc::new(stream);

Expand Down Expand Up @@ -150,11 +152,12 @@ impl Downstream {
let rx_shutdown_clone = rx_shutdown.clone();
let tx_shutdown_clone = tx_shutdown.clone();
let tx_status_reader = tx_status.clone();
let cancellation_token_mining_device = cancellation_token.clone();
// Task to read from SV1 Mining Device Client socket via `socket_reader`. Depending on the
// SV1 message received, a message response is sent directly back to the SV1 Downstream
// role, or the message is sent upwards to the Bridge for translation into a SV2 message
// and then sent to the SV2 Upstream role.
let _socket_reader_task = task::spawn(async move {
let socket_reader_task = tokio::task::spawn(async move {
let reader = BufReader::new(&*socket_reader);
let mut messages = FramedRead::new(
async_compat::Compat::new(reader),
Expand Down Expand Up @@ -205,15 +208,21 @@ impl Downstream {
kill(&tx_shutdown_clone).await;
warn!("Downstream: Shutting down sv1 downstream reader");
});
tokio::task::spawn(async move {
cancellation_token_mining_device.cancelled().await;
socket_reader_task.abort();
warn!("Shutting down sv1 downstream reader");
});

let rx_shutdown_clone = rx_shutdown.clone();
let tx_shutdown_clone = tx_shutdown.clone();
let tx_status_writer = tx_status.clone();
let host_ = host.clone();

let cancellation_token_new_sv1_message_no_transl = cancellation_token.clone();
// Task to receive SV1 message responses to SV1 messages that do NOT need translation.
// These response messages are sent directly to the SV1 Downstream role.
let _socket_writer_task = task::spawn(async move {
let socket_writer_task = tokio::task::spawn(async move {
loop {
select! {
res = receiver_outgoing.recv().fuse() => {
Expand Down Expand Up @@ -242,11 +251,20 @@ impl Downstream {
&host_
);
});
tokio::task::spawn(async move {
tokio::select! {
_ = cancellation_token_new_sv1_message_no_transl.cancelled() => {
socket_writer_task.abort();
warn!("Shutting down sv1 downstream writer");
},
}
});

let tx_status_notify = tx_status;
let self_ = downstream.clone();

let _notify_task = task::spawn(async move {
let cancellation_token_notify_task = cancellation_token.clone();
let notify_task = tokio::task::spawn(async move {
let timeout_timer = std::time::Instant::now();
let mut first_sent = false;
loop {
Expand Down Expand Up @@ -329,10 +347,16 @@ impl Downstream {
&host
);
});
tokio::task::spawn(async move {
cancellation_token_notify_task.cancelled().await;
notify_task.abort();
warn!("Shutting down sv1 downstream job notifier");
});
}

/// Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) and create a
/// new `Downstream` for each connection.
#[allow(clippy::too_many_arguments)]
pub fn accept_connections(
downstream_addr: SocketAddr,
tx_sv1_submit: Sender<DownstreamMessages>,
Expand All @@ -341,8 +365,11 @@ impl Downstream {
bridge: Arc<Mutex<crate::proxy::Bridge>>,
downstream_difficulty_config: DownstreamDifficultyConfig,
upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
cancellation_token: CancellationToken,
) {
task::spawn(async move {
let cancellation_token_downstream = cancellation_token.clone();

let task = tokio::task::spawn(async move {
let downstream_listener = TcpListener::bind(downstream_addr).await.unwrap();
let mut downstream_incoming = downstream_listener.incoming();

Expand All @@ -369,6 +396,7 @@ impl Downstream {
host,
downstream_difficulty_config.clone(),
upstream_difficulty_config.clone(),
cancellation_token_downstream.clone(),
)
.await;
}
Expand All @@ -378,6 +406,11 @@ impl Downstream {
}
}
});
tokio::task::spawn(async move {
cancellation_token.cancelled().await;
task.abort();
warn!("Shutting down accept connections task");
});
}

/// As SV1 messages come in, determines if the message response needs to be translated to SV2
Expand Down
34 changes: 30 additions & 4 deletions roles/translator/src/lib/proxy/bridge.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use async_channel::{Receiver, Sender};
use async_std::task;
use roles_logic_sv2::{
channel_logic::channel_factory::{ExtendedChannelKind, ProxyExtendedChannelFactory, Share},
mining_sv2::{
Expand All @@ -22,6 +21,7 @@ use super::super::{
};
use error_handling::handle_result;
use roles_logic_sv2::{channel_logic::channel_factory::OnNewShare, Error as RolesLogicError};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

/// Bridge between the SV2 `Upstream` and SV1 `Downstream` responsible for the following messaging
Expand Down Expand Up @@ -64,6 +64,7 @@ pub struct Bridge {
last_p_hash: Option<SetNewPrevHash<'static>>,
target: Arc<Mutex<Vec<u8>>>,
last_job_id: u32,
cancellation_token: CancellationToken,
}

impl Bridge {
Expand All @@ -79,6 +80,7 @@ impl Bridge {
extranonces: ExtendedExtranonce,
target: Arc<Mutex<Vec<u8>>>,
up_id: u32,
cancellation_token: CancellationToken,
) -> Arc<Mutex<Self>> {
let ids = Arc::new(Mutex::new(GroupId::new()));
let share_per_min = 1.0;
Expand Down Expand Up @@ -107,6 +109,7 @@ impl Bridge {
last_p_hash: None,
target,
last_job_id: 0,
cancellation_token,
}))
}

Expand Down Expand Up @@ -162,10 +165,12 @@ impl Bridge {
/// Receives a `DownstreamMessages` message from the `Downstream`, handles based on the
/// variant received.
fn handle_downstream_messages(self_: Arc<Mutex<Self>>) {
let cancellation_token_handle_downstream =
self_.safe_lock(|b| b.cancellation_token.clone()).unwrap();
let (rx_sv1_downstream, tx_status) = self_
.safe_lock(|s| (s.rx_sv1_downstream.clone(), s.tx_status.clone()))
.unwrap();
task::spawn(async move {
let task = tokio::task::spawn(async move {
loop {
let msg = handle_result!(tx_status, rx_sv1_downstream.clone().recv().await);

Expand All @@ -185,6 +190,11 @@ impl Bridge {
};
}
});
tokio::task::spawn(async move {
cancellation_token_handle_downstream.cancelled().await;
task.abort();
warn!("Shutting down handle_result task");
});
}
/// receives a `SetDownstreamTarget` and updates the downstream target for the channel
#[allow(clippy::result_large_err)]
Expand Down Expand Up @@ -367,6 +377,8 @@ impl Bridge {
/// corresponding `job_id` has already been received. If this is not the case, an error has
/// occurred on the Upstream pool role and the connection will close.
fn handle_new_prev_hash(self_: Arc<Mutex<Self>>) {
let cancellation_token_handle_prev_hash =
self_.safe_lock(|b| b.cancellation_token.clone()).unwrap();
let (tx_sv1_notify, rx_sv2_set_new_prev_hash, tx_status) = self_
.safe_lock(|s| {
(
Expand All @@ -377,7 +389,7 @@ impl Bridge {
})
.unwrap();
debug!("Starting handle_new_prev_hash task");
task::spawn(async move {
let task = tokio::task::spawn(async move {
loop {
// Receive `SetNewPrevHash` from `Upstream`
let sv2_set_new_prev_hash: SetNewPrevHash =
Expand All @@ -397,6 +409,11 @@ impl Bridge {
)
}
});
tokio::task::spawn(async move {
cancellation_token_handle_prev_hash.cancelled().await;
task.abort();
warn!("Shutting down handle_new_prev_hash");
});
}

async fn handle_new_extended_mining_job_(
Expand Down Expand Up @@ -460,6 +477,8 @@ impl Bridge {
/// `SetNewPrevHash` `job_id`, an error has occurred on the Upstream pool role and the
/// connection will close.
fn handle_new_extended_mining_job(self_: Arc<Mutex<Self>>) {
let cancellation_token_new_extended_mining_job =
self_.safe_lock(|b| b.cancellation_token.clone()).unwrap();
let (tx_sv1_notify, rx_sv2_new_ext_mining_job, tx_status) = self_
.safe_lock(|s| {
(
Expand All @@ -470,7 +489,7 @@ impl Bridge {
})
.unwrap();
debug!("Starting handle_new_extended_mining_job task");
task::spawn(async move {
let task = tokio::task::spawn(async move {
loop {
// Receive `NewExtendedMiningJob` from `Upstream`
let sv2_new_extended_mining_job: NewExtendedMiningJob = handle_result!(
Expand All @@ -494,6 +513,11 @@ impl Bridge {
.store(true, std::sync::atomic::Ordering::SeqCst);
}
});
tokio::task::spawn(async move {
cancellation_token_new_extended_mining_job.cancelled().await;
task.abort();
warn!("Task handle_new_extended_mining_job cancelled");
});
}
}
pub struct OpenSv1Downstream {
Expand Down Expand Up @@ -543,6 +567,7 @@ mod test {
rx_sv1_notify,
};

let cancellation_token = CancellationToken::new();
let b = Bridge::new(
rx_sv1_submit,
tx_sv2_submit_shares_ext,
Expand All @@ -553,6 +578,7 @@ mod test {
extranonces,
Arc::new(Mutex::new(upstream_target)),
1,
cancellation_token,
);
(b, interface)
}
Expand Down
Loading

0 comments on commit c8d8472

Please sign in to comment.