Skip to content

Commit

Permalink
feat: wake up the sending task during retransmission
Browse files Browse the repository at this point in the history
  • Loading branch information
eareimu authored and huster-zhangpeng committed Jan 23, 2025
1 parent c3f615b commit 6ef0f0c
Show file tree
Hide file tree
Showing 16 changed files with 115 additions and 96 deletions.
6 changes: 3 additions & 3 deletions gm-quic/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use rustls::{
ClientConfig as TlsClientConfig, ConfigBuilder, WantsVerifier,
};
use tokio::sync::mpsc;
use tracing::{debug_span, Instrument};
use tracing::{trace_span, Instrument};

use crate::{
interfaces::Interfaces,
Expand Down Expand Up @@ -220,7 +220,7 @@ impl QuicClient {
}
}
}
.instrument(debug_span!("client_connection_driver")),
.instrument(trace_span!("client_connection_driver")),
);

connection.add_path(socket, pathway)?;
Expand Down Expand Up @@ -263,7 +263,7 @@ impl QuicClient {
server_addr: SocketAddr,
) -> io::Result<Arc<Connection>> {
let server_name = server_name.into();
debug_span!("connect", %server_name,%server_addr).in_scope(|| {
trace_span!("connect", %server_name,%server_addr).in_scope(|| {
if self.reuse_connection {
REUSEABLE_CONNECTIONS
.entry((server_name.clone(), server_addr))
Expand Down
4 changes: 2 additions & 2 deletions gm-quic/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use rustls::{
ConfigBuilder, ServerConfig as TlsServerConfig, WantsVerifier,
};
use tokio::sync::mpsc;
use tracing::{debug_span, Instrument};
use tracing::{trace_span, Instrument};

use crate::{
interfaces::Interfaces,
Expand Down Expand Up @@ -202,7 +202,7 @@ impl QuicServer {
}
}
}
.instrument(debug_span!("server_connection_driver")),
.instrument(trace_span!("server_connection_driver")),
);
}

Expand Down
7 changes: 4 additions & 3 deletions h3-shim/examples/h3-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use clap::Parser;
use futures::future;
use rustls::pki_types::{pem::PemObject, CertificateDer};
use tokio::io::AsyncWriteExt;
use tracing::{error, info, trace};
use tracing::{error, info, info_span, trace, Instrument};

static ALPN: &[u8] = b"h3";

Expand Down Expand Up @@ -120,7 +120,7 @@ pub async fn run(opt: Opt) -> Result<(), Box<dyn core::error::Error + Send + Syn
// So we "move" it.
// vvvv
let request = async move {
info!("sending request ...");
info!(%uri,"sending request ...");

let req = http::Request::builder().uri(uri).body(())?;

Expand Down Expand Up @@ -149,7 +149,8 @@ pub async fn run(opt: Opt) -> Result<(), Box<dyn core::error::Error + Send + Syn

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok::<_, Box<dyn std::error::Error + 'static + Send + Sync>>(())
};
}
.instrument(info_span!("request"));

let derive = tokio::spawn(driver);
let request = tokio::spawn(request);
Expand Down
57 changes: 30 additions & 27 deletions h3-shim/examples/h3-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use h3::{error::ErrorLevel, quic::BidiStream, server::RequestStream};
use http::{Request, StatusCode};
use qbase::param::ServerParameters;
use tokio::{fs::File, io::AsyncReadExt};
use tracing::{error, info};
use tracing::{error, info, info_span, Instrument};

#[derive(Parser, Debug)]
#[structopt(name = "server")]
Expand Down Expand Up @@ -111,38 +111,40 @@ pub async fn run(opt: Opt) -> Result<(), Box<dyn std::error::Error + Send + Sync
else {
continue;
};
tokio::spawn(async move {
info!("new connection established");
loop {
match h3_conn.accept().await {
Ok(Some((req, stream))) => {
info!("new request: {:#?}", req);

let root = root.clone();

tokio::spawn(async {
if let Err(e) = handle_request(req, stream, root).await {
error!("handling request failed: {}", e);
}
info!("request handled");
});
}
tokio::spawn(
async move {
info!("new connection established");
loop {
match h3_conn.accept().await {
Ok(Some((req, stream))) => {
info!("new request: {:#?}", req);

let root = root.clone();

tokio::spawn(async {
if let Err(e) = handle_request(req, stream, root).await {
error!("handling request failed: {}", e);
}
});
}

// indicating no more streams to be received
Ok(None) => {
break;
}
// indicating no more streams to be received
Ok(None) => {
break;
}

Err(err) => {
error!("error on accept connection: {}", err);
match err.get_error_level() {
ErrorLevel::ConnectionError => break,
ErrorLevel::StreamError => continue,
Err(err) => {
error!("error on accept connection: {}", err);
match err.get_error_level() {
ErrorLevel::ConnectionError => break,
ErrorLevel::StreamError => continue,
}
}
}
}
}
});
.instrument(info_span!("handle_connection")),
);
}

// shut down gracefully
Expand All @@ -151,6 +153,7 @@ pub async fn run(opt: Opt) -> Result<(), Box<dyn std::error::Error + Send + Sync
Ok(())
}

#[tracing::instrument(skip_all)]
async fn handle_request<T>(
req: Request<()>,
mut stream: RequestStream<T, Bytes>,
Expand Down
2 changes: 1 addition & 1 deletion h3-shim/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod server_example {
async fn h3_test() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
// .with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
.with_max_level(tracing::Level::DEBUG)
.with_writer(std::io::stdout)
// .pretty()
Expand Down
26 changes: 14 additions & 12 deletions qconnection/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use qinterface::{
};
pub use rustls::crypto::CryptoProvider;
use tokio::sync::Notify;
use tracing::{debug_span, Instrument};
use tracing::{trace_span, Instrument};

use crate::{
events::{ArcEventBroker, EmitEvent, Event},
Expand Down Expand Up @@ -270,13 +270,14 @@ impl ProtoReady<ClientFoundation, Arc<rustls::ClientConfig>> {
);

let spaces = Spaces::new(
InitialSpace::new(initial_keys, self.foundation.token),
HandshakeSpace::new(),
InitialSpace::new(initial_keys, self.foundation.token, sendable.clone()),
HandshakeSpace::new(sendable.clone()),
DataSpace::new(
sid::Role::Client,
reliable_frames.clone(),
client_params,
self.streams_ctrl,
sendable.clone(),
),
);

Expand Down Expand Up @@ -347,13 +348,14 @@ impl ProtoReady<ServerFoundation, Arc<rustls::ServerConfig>> {
);

let spaces = Spaces::new(
InitialSpace::new(initial_keys, Vec::with_capacity(0)),
HandshakeSpace::new(),
InitialSpace::new(initial_keys, Vec::with_capacity(0), sendable.clone()),
HandshakeSpace::new(sendable.clone()),
DataSpace::new(
sid::Role::Server,
reliable_frames.clone(),
server_params,
self.streams_ctrl,
sendable.clone(),
),
);

Expand Down Expand Up @@ -443,11 +445,11 @@ fn accpet_transport_parameters(components: &Components) -> impl Future<Output =
}
}
}
.instrument(debug_span!("accept_transport_parameters"))
.instrument(trace_span!("accept_transport_parameters"))
}

impl Components {
#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub fn get_or_create_path(
&self,
socket: Socket,
Expand Down Expand Up @@ -502,7 +504,7 @@ impl Components {
tracing::trace!(reason, "path inactive");
// same as [`Components::del_path`]
paths.remove(&pathway);
}.instrument(debug_span!("path_task", ?pathway,?socket,is_probed,do_validate))
}.instrument(trace_span!("path_task", ?pathway,?socket,is_probed,do_validate))
});

vacant_entry.insert(PathContext::new(path.clone(), task.abort_handle()));
Expand All @@ -514,7 +516,7 @@ impl Components {

impl Components {
// 对于server,第一条路径也通过add_path添加
#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub fn enter_closing(self, ccf: ConnectionCloseFrame) -> Termination {
let error = ccf.clone().into();
self.spaces.data().on_conn_error(&error);
Expand All @@ -537,15 +539,15 @@ impl Components {
local_cids.clear();
event_broker.emit(Event::Terminated);
}
.instrument(debug_span!("termination_timer", ?pto_duration))
.instrument(trace_span!("termination_timer", ?pto_duration))
});

self.spaces.close(closing_state.clone(), &self.event_broker);

Termination::closing(error, self.cid_registry.local, closing_state)
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub fn enter_draining(self, error: Error) -> Termination {
self.spaces.data().on_conn_error(&error);
self.flow_ctrl.on_conn_error(&error);
Expand All @@ -566,7 +568,7 @@ impl Components {
local_cids.clear();
event_broker.emit(Event::Terminated);
}
.instrument(debug_span!("termination_timer", ?pto_duration))
.instrument(trace_span!("termination_timer", ?pto_duration))
});

self.rcvd_pkt_q.close_all();
Expand Down
22 changes: 11 additions & 11 deletions qconnection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,15 @@ impl Components {
pub struct Connection(RwLock<Result<Components, Termination>>);

impl Connection {
#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub fn enter_closing(&self, ccf: ConnectionCloseFrame) {
let mut conn = self.0.write().unwrap();
if let Ok(core_conn) = conn.as_mut() {
*conn = Err(core_conn.clone().enter_closing(ccf));
}
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub fn enter_draining(&self, ccf: ConnectionCloseFrame) {
let error = ccf.into();
let mut conn = self.0.write().unwrap();
Expand All @@ -275,7 +275,7 @@ impl Connection {
}
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub fn close(&self, reason: Cow<'static, str>, code: u64) {
let error_code = code.try_into().unwrap();
self.enter_closing(ConnectionCloseFrame::new_app(error_code, reason));
Expand All @@ -289,46 +289,46 @@ impl Connection {
.map_err(|termination| termination.error().into())
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub async fn open_bi_stream(
&self,
) -> io::Result<Option<(StreamId, (StreamReader, StreamWriter))>> {
self.map(|core_conn| core_conn.open_bi_stream())?.await
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub async fn open_uni_stream(&self) -> io::Result<Option<(StreamId, StreamWriter)>> {
self.map(|core_conn| core_conn.open_uni_stream())?.await
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub async fn accept_bi_stream(
&self,
) -> io::Result<Option<(StreamId, (StreamReader, StreamWriter))>> {
self.map(|core_conn| core_conn.accept_bi_stream())?.await
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub async fn accept_uni_stream(&self) -> io::Result<Option<(StreamId, StreamReader)>> {
self.map(|core_conn| core_conn.accept_uni_stream())?.await
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub fn unreliable_reader(&self) -> io::Result<UnreliableReader> {
self.map(|core_conn| core_conn.unreliable_reader())?
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub async fn unreliable_writer(&self) -> io::Result<UnreliableWriter> {
self.map(|core_conn| core_conn.unreliable_writer())?.await
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub fn add_path(&self, socket: Socket, pathway: Pathway) -> io::Result<()> {
self.map(|core_conn| core_conn.add_path(socket, pathway))
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub fn del_path(&self, pathway: &Pathway) -> io::Result<()> {
self.map(|core_conn| core_conn.del_path(pathway))
}
Expand Down
2 changes: 1 addition & 1 deletion qconnection/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Path {
self.validated.store(true, Ordering::Release);
}

#[tracing::instrument(level = "debug", skip(self), ret)]
#[tracing::instrument(level = "trace", skip(self), ret)]
pub async fn validate(&self) -> bool {
let challenge = PathChallengeFrame::random();
for _ in 0..3 {
Expand Down
2 changes: 1 addition & 1 deletion qconnection/src/path/burst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl Burst {
Ok(filled_buffers)
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
pub async fn launch(self) -> io::Result<Infallible> {
let mut buffers = vec![];
let mut path_sendable = pin!(self.path.sendable.notified());
Expand Down
Loading

0 comments on commit 6ef0f0c

Please sign in to comment.