Skip to content

Commit

Permalink
feat: add config for remote-to-local unbind timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
erebe committed Feb 1, 2025
1 parent 28554a4 commit ddac606
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 12 deletions.
17 changes: 15 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,11 @@ pub struct Server {
)
)]
pub http_proxy_password: Option<String>,

/// Configure how much time a remote-to-local server is going to wait idle (without any new ws clients) before unbinding itself/stopping the server
/// Default is 190 seconds/3min
#[cfg_attr(feature = "clap", arg(long, value_name = "seconds", default_value = "3m", value_parser = parsers::parse_duration_sec, verbatim_doc_comment,))]
pub remote_to_local_server_idle_timeout_sec: Duration,
}

#[derive(Clone, Debug, PartialEq)]
Expand All @@ -355,6 +360,7 @@ mod parsers {
use crate::tunnel::LocalProtocol;
use base64::Engine;
use hyper::http::{HeaderName, HeaderValue};
use std::cmp::max;
use std::collections::BTreeMap;
use std::io;
use std::io::ErrorKind;
Expand All @@ -368,14 +374,21 @@ mod parsers {
pub fn parse_duration_sec(arg: &str) -> Result<Duration, io::Error> {
use std::io::Error;

let (arg, multiplier) = match &arg[max(0, arg.len() - 1)..] {
"s" => (&arg[..arg.len() - 1], 1),
"m" => (&arg[..arg.len() - 1], 60),
"h" => (&arg[..arg.len() - 1], 3600),
_ => (arg, 1),
};

let Ok(secs) = arg.parse::<u64>() else {
return Err(Error::new(
ErrorKind::InvalidInput,
format!("cannot duration of seconds from {}", arg),
format!("cannot parse duration of seconds from {}", arg),
));
};

Ok(Duration::from_secs(secs))
Ok(Duration::from_secs(secs * multiplier))
}

pub fn parse_local_bind(arg: &str) -> Result<(SocketAddr, &str), io::Error> {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ pub async fn run_server(args: Server) -> anyhow::Result<()> {
.expect("Cannot create DNS resolver"),
restriction_config: args.restrict_config,
http_proxy,
remote_server_idle_timeout: args.remote_to_local_server_idle_timeout_sec,
};
let server = WsServer::new(server_config);

Expand Down
1 change: 1 addition & 0 deletions src/test_integrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ fn server_no_tls(dns_resolver: DnsResolver) -> WsServer {
dns_resolver,
restriction_config: None,
http_proxy: None,
remote_server_idle_timeout: Duration::from_secs(30),
};
WsServer::new(server_config)
}
Expand Down
10 changes: 5 additions & 5 deletions src/tunnel/server/reverse_tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl<T: TunnelListener> ReverseTunnelServer<T> {
pub async fn run_listening_server(
&self,
bind_addr: SocketAddr,
idle_timeout: Duration,
gen_listening_server: impl Future<Output = anyhow::Result<T>>,
) -> anyhow::Result<((<T as TunnelListener>::Reader, <T as TunnelListener>::Writer), RemoteAddr)>
where
Expand All @@ -52,7 +53,6 @@ impl<T: TunnelListener> ReverseTunnelServer<T> {
listening_server
} else {
let listening_server = gen_listening_server.await?;
let send_timeout = Duration::from_secs(60 * 3);
let (tx, rx) = async_channel::bounded(10);
let nb_seen_clients = Arc::new(AtomicUsize::new(0));
let seen_clients = nb_seen_clients.clone();
Expand All @@ -64,7 +64,7 @@ impl<T: TunnelListener> ReverseTunnelServer<T> {
server.lock().remove(&local_srv2);
});

let mut timer = time::interval(send_timeout);
let mut timer = time::interval(idle_timeout);
pin_mut!(listening_server);
loop {
select! {
Expand All @@ -77,8 +77,8 @@ impl<T: TunnelListener> ReverseTunnelServer<T> {
continue;
}
Some(Ok(cnx)) => {
if time::timeout(send_timeout, tx.send(cnx)).await.is_err() {
info!("New reverse connection failed to be picked by client after {}s. Closing reverse tunnel server", send_timeout.as_secs());
if time::timeout(idle_timeout, tx.send(cnx)).await.is_err() {
info!("New reverse connection failed to be picked by client after {}s. Closing reverse tunnel server", idle_timeout.as_secs());
break;
}
}
Expand All @@ -89,7 +89,7 @@ impl<T: TunnelListener> ReverseTunnelServer<T> {
// if no client connected to the reverse tunnel server, close it
// <= 1 because the server itself has a receiver
if seen_clients.swap(0, Ordering::Relaxed) == 0 && tx.receiver_count() <= 1 {
info!("No client connected to reverse tunnel server for {}s. Closing reverse tunnel server", send_timeout.as_secs());
info!("No client connected to reverse tunnel server for {}s. Closing reverse tunnel server", idle_timeout.as_secs());
break;
}
},
Expand Down
22 changes: 17 additions & 5 deletions src/tunnel/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct WsServerConfig {
pub dns_resolver: DnsResolver,
pub restriction_config: Option<PathBuf>,
pub http_proxy: Option<Url>,
pub remote_server_idle_timeout: Duration,
}

#[derive(Clone)]
Expand Down Expand Up @@ -196,7 +197,9 @@ impl WsServer {
let local_srv = (remote.host, remote_port);
let bind = try_to_sock_addr(local_srv.clone())?;
let listening_server = async { TcpTunnelListener::new(bind, local_srv.clone(), false).await };
let ((local_rx, local_tx), remote) = SERVERS.run_listening_server(bind, listening_server).await?;
let ((local_rx, local_tx), remote) = SERVERS
.run_listening_server(bind, self.config.remote_server_idle_timeout, listening_server)
.await?;

Ok((remote, Box::pin(local_rx), Box::pin(local_tx)))
}
Expand All @@ -208,7 +211,9 @@ impl WsServer {
let local_srv = (remote.host, remote_port);
let bind = try_to_sock_addr(local_srv.clone())?;
let listening_server = async { UdpTunnelListener::new(bind, local_srv.clone(), timeout).await };
let ((local_rx, local_tx), remote) = SERVERS.run_listening_server(bind, listening_server).await?;
let ((local_rx, local_tx), remote) = SERVERS
.run_listening_server(bind, self.config.remote_server_idle_timeout, listening_server)
.await?;
Ok((remote, Box::pin(local_rx), Box::pin(local_tx)))
}
LocalProtocol::ReverseSocks5 { timeout, credentials } => {
Expand All @@ -219,7 +224,9 @@ impl WsServer {
let local_srv = (remote.host, remote_port);
let bind = try_to_sock_addr(local_srv.clone())?;
let listening_server = async { Socks5TunnelListener::new(bind, timeout, credentials).await };
let ((local_rx, local_tx), remote) = SERVERS.run_listening_server(bind, listening_server).await?;
let ((local_rx, local_tx), remote) = SERVERS
.run_listening_server(bind, self.config.remote_server_idle_timeout, listening_server)
.await?;

Ok((remote, Box::pin(local_rx), Box::pin(local_tx)))
}
Expand All @@ -231,7 +238,9 @@ impl WsServer {
let local_srv = (remote.host, remote_port);
let bind = try_to_sock_addr(local_srv.clone())?;
let listening_server = async { HttpProxyTunnelListener::new(bind, timeout, credentials, false).await };
let ((local_rx, local_tx), remote) = SERVERS.run_listening_server(bind, listening_server).await?;
let ((local_rx, local_tx), remote) = SERVERS
.run_listening_server(bind, self.config.remote_server_idle_timeout, listening_server)
.await?;

Ok((remote, Box::pin(local_rx), Box::pin(local_tx)))
}
Expand All @@ -245,7 +254,9 @@ impl WsServer {
let local_srv = (remote.host, remote_port);
let bind = try_to_sock_addr(local_srv.clone())?;
let listening_server = async { UnixTunnelListener::new(path, local_srv, false).await };
let ((local_rx, local_tx), remote) = SERVERS.run_listening_server(bind, listening_server).await?;
let ((local_rx, local_tx), remote) = SERVERS
.run_listening_server(bind, self.config.remote_server_idle_timeout, listening_server)
.await?;

Ok((remote, Box::pin(local_rx), Box::pin(local_tx)))
}
Expand Down Expand Up @@ -481,6 +492,7 @@ impl Debug for WsServerConfig {
.field("websocket_mask_frame", &self.websocket_mask_frame)
.field("restriction_config", &self.restriction_config)
.field("tls", &self.tls.is_some())
.field("remote_server_idle_timeout", &self.remote_server_idle_timeout)
.field(
"mTLS",
&self
Expand Down

0 comments on commit ddac606

Please sign in to comment.