Skip to content

Commit

Permalink
Add proxy protocol support, bump to 0.15.24
Browse files Browse the repository at this point in the history
  • Loading branch information
loriopatrick committed Sep 23, 2024
1 parent 9a46da9 commit ebbe973
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 24 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ members = [
]

[workspace.package]
version = "0.15.23"
version = "0.15.24"

[workspace.dependencies]
tokio = { version = "1.39", features = ["full"] }
Expand Down
14 changes: 10 additions & 4 deletions packages/agent_cli/src/autorun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use playit_agent_core::{
network::address_lookup::{AddressLookup, AddressValue},
network::address_lookup::{AddressLookup, AddressValue, HostOrigin},
playit_agent::PlayitAgent,
utils::now_milli,
};
Expand Down Expand Up @@ -195,9 +195,9 @@ pub struct LocalLookup {
}

impl AddressLookup for LocalLookup {
type Value = SocketAddr;
type Value = HostOrigin;

fn lookup(&self, ip: IpAddr, port: u16, proto: PortType) -> Option<AddressValue<SocketAddr>> {
fn lookup(&self, ip: IpAddr, port: u16, proto: PortType) -> Option<AddressValue<HostOrigin>> {
let values = self.data.lock().unwrap();

for tunnel in &*values {
Expand All @@ -211,7 +211,11 @@ impl AddressLookup for LocalLookup {

if tunnel.from_port <= port && port < tunnel.to_port {
return Some(AddressValue {
value: tunnel.local_start_address,
value: HostOrigin {
host_addr: tunnel.local_start_address,
use_special_lan: None,
proxy_protocol: tunnel.proxy_protocol,
},
from_port: tunnel.from_port,
to_port: tunnel.to_port,
});
Expand All @@ -238,6 +242,7 @@ impl LocalLookup {
from_port: tunnel.port.from,
to_port: tunnel.port.to,
local_start_address: SocketAddr::new(tunnel.local_ip, tunnel.local_port),
proxy_protocol: tunnel.proxy_protocol,
});
}

Expand All @@ -253,4 +258,5 @@ pub struct TunnelEntry {
pub from_port: u16,
pub to_port: u16,
pub local_start_address: SocketAddr,
pub proxy_protocol: Option<ProxyProtocol>,
}
1 change: 1 addition & 0 deletions packages/agent_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ async fn main() -> Result<std::process::ExitCode, CliError> {
version: AgentVersion {
platform,
version: env!("CARGO_PKG_VERSION").to_string(),
has_expired: false,
},
official: true,
details_website: None,
Expand Down
2 changes: 1 addition & 1 deletion packages/agent_core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "playit-agent-core"
version = "0.17.6"
version = "0.17.7"
edition = "2021"
description = "Contains the logic to create a playit.gg agent"
license = "BSD-2-Clause"
Expand Down
1 change: 1 addition & 0 deletions packages/agent_core/src/agent_control/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub fn get_version() -> PlayitAgentVersion {
version: AgentVersion {
platform: get_platform(),
version: env!("CARGO_PKG_VERSION").to_string(),
has_expired: false,
},
official: true,
details_website: None,
Expand Down
33 changes: 31 additions & 2 deletions packages/agent_core/src/network/address_lookup.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::net::IpAddr;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;

use playit_api_client::api::PortType;
use playit_api_client::api::{PortType, ProxyProtocol};

#[derive(Debug)]
pub struct AddressValue<V> {
Expand All @@ -23,3 +23,32 @@ impl<T: AddressLookup> AddressLookup for Arc<T> {
T::lookup(&*self, ip, port, proto)
}
}

#[derive(Clone, Debug)]
pub struct HostOrigin {
pub host_addr: SocketAddr,
pub use_special_lan: Option<bool>,
pub proxy_protocol: Option<ProxyProtocol>,
}

impl std::fmt::Display for HostOrigin {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "HostOrigin({}, special: {:?}, proxy: {:?})", self.host_addr, self.use_special_lan, self.proxy_protocol)
}
}

impl Into<SocketAddr> for HostOrigin {
fn into(self) -> SocketAddr {
self.host_addr
}
}

impl From<SocketAddr> for HostOrigin {
fn from(value: SocketAddr) -> Self {
HostOrigin {
host_addr: value,
use_special_lan: None,
proxy_protocol: None,
}
}
}
3 changes: 2 additions & 1 deletion packages/agent_core/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ pub mod tcp_clients;
pub mod address_lookup;
pub mod lan_address;
pub mod tcp_pipe;
pub mod tcp_tunnel;
pub mod tcp_tunnel;
pub mod proxy_protocol;
66 changes: 66 additions & 0 deletions packages/agent_core/src/network/proxy_protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::net::{Ipv4Addr, Ipv6Addr};

use tokio::io::{AsyncWrite, AsyncWriteExt};

pub enum ProxyProtocolHeader {
Tcp4 {
client_ip: Ipv4Addr,
proxy_ip: Ipv4Addr,
client_port: u16,
proxy_port: u16,
},
Tcp6 {
client_ip: Ipv6Addr,
proxy_ip: Ipv6Addr,
client_port: u16,
proxy_port: u16,
},
}

impl std::fmt::Display for ProxyProtocolHeader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Tcp4 { client_ip, proxy_ip, client_port, proxy_port } => {
write!(f, "PROXY TCP4 {client_ip} {proxy_ip} {client_port} {proxy_port}\r\n")
}
Self::Tcp6 { client_ip, proxy_ip, client_port, proxy_port } => {
write!(f, "PROXY TCP6 {client_ip} {proxy_ip} {client_port} {proxy_port}\r\n")
}
}
}
}

const PROXY_PROTOCOL_V2_HEADER: &'static [u8] = &[
0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A,
/* version 2 + proxy connection byte */ 0x21
];

impl ProxyProtocolHeader {
pub async fn write_v1<W: AsyncWrite + Unpin>(&self, out: &mut W) -> Result<(), std::io::Error> {
out.write_all(self.to_string().as_bytes()).await
}

pub async fn write_v2<W: AsyncWrite + Unpin>(&self, out: &mut W) -> Result<(), std::io::Error> {
out.write_all(PROXY_PROTOCOL_V2_HEADER).await?;

match self {
Self::Tcp4 { client_ip, proxy_ip, client_port, proxy_port } => {
out.write_all(&[ /* TCP4: AF_INET + STREAM */ 0x11 ]).await?;
out.write_all(/* length */ &12u16.to_be_bytes()).await?;
out.write_all(&client_ip.octets()).await?;
out.write_all(&proxy_ip.octets()).await?;
out.write_all(&client_port.to_be_bytes()).await?;
out.write_all(&proxy_port.to_be_bytes()).await?;
}
Self::Tcp6 { client_ip, proxy_ip, client_port, proxy_port } => {
out.write_all(&[ /* TCP6: AF_INET6 + STREAM */ 0x21 ]).await?;
out.write_all(/* length */ &36u16.to_be_bytes()).await?;
out.write_all(&client_ip.octets()).await?;
out.write_all(&proxy_ip.octets()).await?;
out.write_all(&client_port.to_be_bytes()).await?;
out.write_all(&proxy_port.to_be_bytes()).await?;
}
}
Ok(())
}
}
58 changes: 48 additions & 10 deletions packages/agent_core/src/playit_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::time::Duration;
use tracing::Instrument;

use crate::agent_control::{AuthApi, DualStackUdpSocket};
use playit_api_client::api::PortType;
use crate::network::address_lookup::AddressLookup;
use crate::network::proxy_protocol::ProxyProtocolHeader;
use playit_api_client::api::{PortType, ProxyProtocol};
use crate::network::address_lookup::{AddressLookup, HostOrigin};
use crate::network::lan_address::LanAddress;
use crate::network::tcp_clients::TcpClients;
use crate::network::tcp_pipe::pipe;
Expand All @@ -25,7 +26,7 @@ pub struct PlayitAgent<L: AddressLookup> {
keep_running: Arc<AtomicBool>,
}

impl<L: AddressLookup + Sync + Send> PlayitAgent<L> where L::Value: Into<SocketAddr> {
impl<L: AddressLookup + Sync + Send> PlayitAgent<L> where L::Value: Into<HostOrigin> + Into<SocketAddr> {
pub async fn new(api_url: String, secret_key: String, lookup: Arc<L>) -> Result<Self, SetupError> {
let io = DualStackUdpSocket::new().await?;
let auth = AuthApi {
Expand Down Expand Up @@ -84,15 +85,16 @@ impl<L: AddressLookup + Sync + Send> PlayitAgent<L> where L::Value: Into<SocketA

let clients = self.tcp_clients.clone();

let host_addr = match self.lookup.lookup(
let host_origin = match self.lookup.lookup(
new_client.connect_addr.ip(),
new_client.connect_addr.port(),
PortType::Tcp
) {
Some(found) => {
let addr = found.value.into();
let mut origin: HostOrigin = found.value.into();
let port_offset = new_client.connect_addr.port() - found.from_port;
SocketAddr::new(addr.ip(), port_offset + addr.port())
origin.host_addr = SocketAddr::new(origin.host_addr.ip(), port_offset + origin.host_addr.port());
origin
},
None => {
tracing::info!(
Expand All @@ -109,7 +111,7 @@ impl<L: AddressLookup + Sync + Send> PlayitAgent<L> where L::Value: Into<SocketA
"tcp_tunnel",
peer_addr = %new_client.peer_addr,
tunn_addr = %new_client.connect_addr,
%host_addr,
%host_origin,
sid = new_client.tunnel_server_id,
did = new_client.data_center_id,
);
Expand All @@ -131,7 +133,7 @@ impl<L: AddressLookup + Sync + Send> PlayitAgent<L> where L::Value: Into<SocketA

tracing::info!("connected to TCP tunnel");

let local_conn = match LanAddress::tcp_socket(self.tcp_clients.use_special_lan, peer_addr, host_addr).await {
let local_conn = match LanAddress::tcp_socket(self.tcp_clients.use_special_lan, peer_addr, host_origin.host_addr).await {
Ok(v) => v,
Err(error) => {
tracing::error!(?error, "failed to connect to local server");
Expand All @@ -144,12 +146,48 @@ impl<L: AddressLookup + Sync + Send> PlayitAgent<L> where L::Value: Into<SocketA
}

let (tunnel_read, tunnel_write) = tunnel_conn.into_split();
let (local_read, local_write) = local_conn.into_split();
let (local_read, mut local_write) = local_conn.into_split();

let tunn_to_local_span = tracing::info_span!("tunn2local");
let local_to_tunn_span = tracing::info_span!("local2tunn");

tokio::spawn(pipe(tunnel_read, local_write).instrument(tunn_to_local_span));
tokio::spawn(async move {
'write_proxy_header: {
let Some(protocol) = host_origin.proxy_protocol else { break 'write_proxy_header };

let header = match (new_client.peer_addr, new_client.connect_addr) {
(SocketAddr::V4(client_addr), SocketAddr::V4(proxy_addr)) => ProxyProtocolHeader::Tcp4 {
client_ip: *client_addr.ip(),
proxy_ip: *proxy_addr.ip(),
client_port: client_addr.port(),
proxy_port: proxy_addr.port(),
},
(SocketAddr::V6(client_addr), SocketAddr::V6(proxy_addr)) => ProxyProtocolHeader::Tcp6 {
client_ip: *client_addr.ip(),
proxy_ip: *proxy_addr.ip(),
client_port: client_addr.port(),
proxy_port: proxy_addr.port(),
},
_ => {
tracing::warn!("peer and connect address have different protocol version");
break 'write_proxy_header;
}
};

let result = match protocol {
ProxyProtocol::ProxyProtocolV1 => header.write_v1(&mut local_write).await,
ProxyProtocol::ProxyProtocolV2 => header.write_v2(&mut local_write).await,
};

if let Err(error) = result {
tracing::error!(?error, "failed to write proxy protocol header to location connection");
return Err(error);
}
}

pipe(tunnel_read, local_write).await
}.instrument(tunn_to_local_span));

tokio::spawn(pipe(local_read, tunnel_write).instrument(local_to_tunn_span));
}.instrument(span));
}
Expand Down
2 changes: 1 addition & 1 deletion packages/api_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "playit-api-client"
version = "0.1.0"
version = "0.1.1"
edition = "2021"
description = "Contains the logic to create a playit.gg agent"
license = "BSD-2-Clause"
Expand Down
10 changes: 10 additions & 0 deletions packages/api_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ pub struct PlayitAgentVersion {
pub struct AgentVersion {
pub platform: Platform,
pub version: String,
pub has_expired: bool,
}

#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Copy, Clone, Hash)]
Expand Down Expand Up @@ -654,6 +655,7 @@ pub struct AgentTunnel {
pub assigned_domain: String,
pub custom_domain: Option<String>,
pub disabled: Option<AgentTunnelDisabled>,
pub proxy_protocol: Option<ProxyProtocol>,
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
Expand All @@ -668,6 +670,14 @@ pub enum AgentTunnelDisabled {
BySystem,
}

#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum ProxyProtocol {
#[serde(rename = "proxy-protocol-v1")]
ProxyProtocolV1,
#[serde(rename = "proxy-protocol-v2")]
ProxyProtocolV2,
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct AgentPendingTunnel {
pub id: uuid::Uuid,
Expand Down
2 changes: 1 addition & 1 deletion packages/ping_monitor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::{HashMap, HashSet}, sync::{atomic::{AtomicBool, Ordering}, Arc}, time::Duration};

use ping_tool::PlayitPingTool;
use playit_api_client::{api::{ApiErrorNoFail, ApiResponseError, PingExperimentDetails, PingExperimentResult, PingSample, PingTarget, ReqPingSubmit}, http_client::{HttpClient, HttpClientError}, PlayitApi};
use playit_api_client::{api::{ApiErrorNoFail, ApiResponseError, PingExperimentDetails, PingExperimentResult, PingSample, PingTarget, ReqPingSubmit}, http_client::HttpClientError, PlayitApi};
use tokio::sync::Mutex;

pub mod ping_tool;
Expand Down

0 comments on commit ebbe973

Please sign in to comment.