Skip to content

Commit

Permalink
feat: add a lot of trace level telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
eareimu authored and huster-zhangpeng committed Jan 23, 2025
1 parent 5036828 commit c3f615b
Show file tree
Hide file tree
Showing 31 changed files with 431 additions and 304 deletions.
3 changes: 2 additions & 1 deletion gm-quic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ bytes = { workspace = true }
dashmap = { workspace = true }
deref-derive = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
qbase = { workspace = true }
qcongestion = { workspace = true }
qconnection = { workspace = true }
Expand All @@ -26,12 +25,14 @@ qunreliable = { workspace = true }
rustls = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
clap = { workspace = true }
env_logger = { workspace = true }
rustls = { workspace = true, features = ["ring"] }
url = { workspace = true }
log = { workspace = true }
tokio = { features = ["fs"], workspace = true }

[[example]]
Expand Down
2 changes: 1 addition & 1 deletion gm-quic/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn run(args: Arguments) -> Result<(), Box<dyn std::error::Error>> {
}

if content == "exit" || content == "quit" {
quic_conn.close("Client close the connection", 0);
quic_conn.close("Client close the connection".into(), 0);
break;
}

Expand Down
79 changes: 42 additions & 37 deletions gm-quic/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use rustls::{
ClientConfig as TlsClientConfig, ConfigBuilder, WantsVerifier,
};
use tokio::sync::mpsc;
use tracing::{debug_span, Instrument};

use crate::{
interfaces::Interfaces,
Expand Down Expand Up @@ -188,36 +189,39 @@ impl QuicClient {
.run_with(event_broker),
);

tokio::spawn({
let connection = connection.clone();
async move {
while let Some(event) = events.recv().await {
match event {
Event::Handshaked => {}
Event::ProbedNewPath(_, _) => {}
Event::PathInactivated(_pathway, socket) => {
_ = Interfaces::try_free_interface(socket.src())
}
Event::Failed(error) => {
REUSEABLE_CONNECTIONS
.remove_if(&(server_name.clone(), server_addr), |_, exist| {
Arc::ptr_eq(&connection, exist)
});
connection.enter_closing(error.into())
tokio::spawn(
{
let connection = connection.clone();
async move {
while let Some(event) = events.recv().await {
match event {
Event::Handshaked => {}
Event::ProbedNewPath(_, _) => {}
Event::PathInactivated(_pathway, socket) => {
_ = Interfaces::try_free_interface(socket.src())
}
Event::Failed(error) => {
REUSEABLE_CONNECTIONS
.remove_if(&(server_name.clone(), server_addr), |_, exist| {
Arc::ptr_eq(&connection, exist)
});
connection.enter_closing(error.into())
}
Event::Closed(ccf) => {
REUSEABLE_CONNECTIONS
.remove_if(&(server_name.clone(), server_addr), |_, exist| {
Arc::ptr_eq(&connection, exist)
});
connection.enter_draining(ccf)
}
Event::StatelessReset => {}
Event::Terminated => {}
}
Event::Closed(ccf) => {
REUSEABLE_CONNECTIONS
.remove_if(&(server_name.clone(), server_addr), |_, exist| {
Arc::ptr_eq(&connection, exist)
});
connection.enter_draining(ccf)
}
Event::StatelessReset => {}
Event::Terminated => {}
}
}
}
});
.instrument(debug_span!("client_connection_driver")),
);

connection.add_path(socket, pathway)?;
Ok(connection)
Expand Down Expand Up @@ -259,15 +263,16 @@ impl QuicClient {
server_addr: SocketAddr,
) -> io::Result<Arc<Connection>> {
let server_name = server_name.into();

if self.reuse_connection {
REUSEABLE_CONNECTIONS
.entry((server_name.clone(), server_addr))
.or_try_insert_with(|| self.new_connection(server_name, server_addr))
.map(|entry| entry.clone())
} else {
self.new_connection(server_name, server_addr)
}
debug_span!("connect", %server_name,%server_addr).in_scope(|| {
if self.reuse_connection {
REUSEABLE_CONNECTIONS
.entry((server_name.clone(), server_addr))
.or_try_insert_with(|| self.new_connection(server_name, server_addr))
.map(|entry| entry.clone())
} else {
self.new_connection(server_name, server_addr)
}
})
}
}

Expand Down Expand Up @@ -370,12 +375,12 @@ impl<T> QuicClientBuilder<T> {
Err(join_error) if join_error.is_cancelled() => return,
Err(join_error) => join_error.into(),
};
log::warn!(
tracing::warn!(
"interface on {local_addr} that client bound was closed unexpectedly: {error}"
);
bind_interfaces.remove(&local_addr);
}
log::warn!("all interfaces that client bound were closed unexpectedly, client will not be able to connect to the server");
tracing::warn!("all interfaces that client bound were closed unexpectedly, client will not be able to connect to the server");
}
});
Ok(self)
Expand Down
35 changes: 20 additions & 15 deletions gm-quic/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use rustls::{
ConfigBuilder, ServerConfig as TlsServerConfig, WantsVerifier,
};
use tokio::sync::mpsc;
use tracing::{debug_span, Instrument};

use crate::{
interfaces::Interfaces,
Expand Down Expand Up @@ -145,6 +146,7 @@ impl QuicServer {

// internal methods
impl QuicServer {
#[tracing::instrument(skip(packet))]
pub(crate) async fn try_accpet_connection(packet: Packet, pathway: Pathway, socket: Socket) {
let Some(server) = SERVER.read().unwrap().upgrade() else {
return;
Expand All @@ -162,7 +164,7 @@ impl QuicServer {
},
_ => return,
};
log::info!("accepting connection from {}", socket.src());
tracing::info!("accepting connection from {}", socket.src());

let token_provider = server
.token_provider
Expand All @@ -184,21 +186,24 @@ impl QuicServer {
PROTO.deliver(packet, pathway, socket).await;
_ = server.listener.send((connection.clone(), pathway));

tokio::spawn(async move {
while let Some(event) = events.recv().await {
match event {
Event::Handshaked => {}
Event::ProbedNewPath(..) => {}
Event::PathInactivated(_, socket) => {
_ = Interfaces::try_free_interface(socket.src())
tokio::spawn(
async move {
while let Some(event) = events.recv().await {
match event {
Event::Handshaked => {}
Event::ProbedNewPath(..) => {}
Event::PathInactivated(_, socket) => {
_ = Interfaces::try_free_interface(socket.src())
}
Event::Failed(error) => connection.enter_closing(error.into()),
Event::Closed(ccf) => connection.enter_draining(ccf),
Event::StatelessReset => {}
Event::Terminated => { /* Todo: connections set */ }
}
Event::Failed(error) => connection.enter_closing(error.into()),
Event::Closed(ccf) => connection.enter_draining(ccf),
Event::StatelessReset => {}
Event::Terminated => { /* Todo: connections set */ }
}
}
});
.instrument(debug_span!("server_connection_driver")),
);
}

fn shutdown(&self) {
Expand Down Expand Up @@ -582,7 +587,7 @@ impl QuicServerBuilder<TlsServerConfig> {
Err(join_error) => join_error.into(),
};
let local_addr = local_addrs[iface_idx];
log::error!("interface on {local_addr} that server listened was closed unexpectedly: {error}");
tracing::error!("interface on {local_addr} that server listened was closed unexpectedly: {error}");
server.shutdown();
}
});
Expand Down Expand Up @@ -682,7 +687,7 @@ impl QuicServerSniBuilder<TlsServerConfig> {
Err(join_error) => join_error.into(),
};
let local_addr = local_addrs[iface_idx];
log::error!("interface on {local_addr} that server listened was closed unexpectedly: {error}");
tracing::error!("interface on {local_addr} that server listened was closed unexpectedly: {error}");
server.shutdown();
}
});
Expand Down
2 changes: 1 addition & 1 deletion h3-shim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ gm-quic = { workspace = true }
qbase = { workspace = true }
tokio = { workspace = true }

log = { workspace = true }
[dev-dependencies]
clap = { workspace = true, features = ["derive"] }
http = { workspace = true }
log = { workspace = true }
rustls = { workspace = true, features = ["ring"] }
rustls-native-certs = { workspace = true }
tokio = { workspace = true, features = ["io-std", "fs"] }
Expand Down
6 changes: 4 additions & 2 deletions h3-shim/examples/h3-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use clap::Parser;
use futures::future;
use rustls::pki_types::{pem::PemObject, CertificateDer};
use tokio::io::AsyncWriteExt;
use tracing::{debug, error, info};
use tracing::{error, info, trace};

static ALPN: &[u8] = b"h3";

Expand Down Expand Up @@ -90,7 +90,7 @@ pub async fn run(opt: Opt) -> Result<(), Box<dyn core::error::Error + Send + Syn
params.set_initial_max_stream_data_bidi_local((1u32 << 20).into());
params.set_initial_max_stream_data_bidi_remote((1u32 << 20).into());

debug!(bind = ?opt.bind, "build QuicClient");
trace!(bind = ?opt.bind, "build QuicClient");
let quic_client = ::gm_quic::QuicClient::builder()
.with_root_certificates(roots)
.without_cert()
Expand Down Expand Up @@ -145,6 +145,8 @@ pub async fn run(opt: Opt) -> Result<(), Box<dyn core::error::Error + Send + Syn
out.flush().await?;
}

info!("all data received");

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok::<_, Box<dyn std::error::Error + 'static + Send + Sync>>(())
};
Expand Down
4 changes: 2 additions & 2 deletions h3-shim/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<B: bytes::Buf> h3::quic::OpenStreams<B> for QuicConnection {
#[inline]
fn close(&mut self, code: h3::error::Code, reason: &[u8]) {
let reason = unsafe { String::from_utf8_unchecked(reason.to_vec()) };
self.connection.close(reason, code.into());
self.connection.close(reason.into(), code.into());
}
}

Expand Down Expand Up @@ -173,7 +173,7 @@ impl<B: bytes::Buf> h3::quic::OpenStreams<B> for OpenStreams {
#[inline]
fn close(&mut self, code: h3::error::Code, reason: &[u8]) {
let reason = unsafe { String::from_utf8_unchecked(reason.to_vec()) };
self.connection.close(reason, code.into());
self.connection.close(reason.into(), code.into());
}
}

Expand Down
9 changes: 7 additions & 2 deletions h3-shim/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::path::PathBuf;

use tracing::{info_span, Instrument};

mod client_example {
use crate as h3_shim;
include!("../examples/h3-client.rs");
Expand All @@ -10,13 +12,15 @@ mod server_example {
include!("../examples/h3-server.rs");
}

#[tokio::test]
#[tokio::test(flavor = "current_thread")]
async fn h3_test() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
.with_max_level(tracing::Level::DEBUG)
.with_writer(std::io::stdout)
// .pretty()
// .with_ansi(false)
.init();
// CryptoProvider ring is installed automatically.

Expand All @@ -41,14 +45,15 @@ async fn h3_test() {

let client = async move {
client_example::run(client_opt)
.instrument(info_span!("client"))
.await
.expect("client failed");
};

let server = async move {
// give it a litte time to enter draining state...
let test_time = std::time::Duration::from_secs(2);
let run = server_example::run(server_opt);
let run = server_example::run(server_opt).instrument(info_span!("server"));
match tokio::time::timeout(test_time, run).await {
Ok(result) => result.expect("server failed"),
Err(_finish) => { /* ok */ }
Expand Down
4 changes: 3 additions & 1 deletion qbase/src/frame.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::Debug;

use bytes::{Buf, BufMut, Bytes};
use enum_dispatch::enum_dispatch;
use io::WriteFrame;
Expand Down Expand Up @@ -58,7 +60,7 @@ pub use streams_blocked::StreamsBlockedFrame;

/// Define the basic behaviors for all kinds of frames
#[enum_dispatch]
pub trait BeFrame {
pub trait BeFrame: Debug {
/// Return the type of frame
fn frame_type(&self) -> FrameType;

Expand Down
2 changes: 1 addition & 1 deletion qbase/src/sid/remote_sid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ where
} else {
let start = *cur;
*cur = unsafe { sid.next_unchecked() };
log::debug!("unallocated: {:?}", self.unallocated[idx]);
log::trace!("unallocated: {:?}", self.unallocated[idx]);
if let Some(max_streams) = self.ctrl.on_accept_streams(sid.dir(), sid.id()) {
self.max[idx] = max_streams;
self.max_tx.send_frame([MaxStreamsFrame::with(
Expand Down
5 changes: 2 additions & 3 deletions qcongestion/src/congestion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
time::{Duration, Instant},
};

use log::debug;
use qbase::{
frame::{AckFrame, EcnCounts, HandshakeDoneFrame, SendFrame},
handshake::Handshake,
Expand Down Expand Up @@ -278,7 +277,7 @@ impl CongestionController {
// Client sends an anti-deadlock packet: Initial is padded
// to earn more anti-amplification credit,
// a Handshake packet proves address ownership.
debug!("Anti-deadlock packet sent");
log::trace!("Anti-deadlock packet sent");
if self.handshake.is_getting_keys() {
Epoch::Handshake
} else {
Expand All @@ -292,7 +291,7 @@ impl CongestionController {
};

self.pto_count += 1;
log::debug!(
log::trace!(
"{:?} PTO timeout, epoch: {:?}, pto_count: {} inflight: {:?}",
self.handshake.role(),
pto_epoch,
Expand Down
Loading

0 comments on commit c3f615b

Please sign in to comment.