diff --git a/h3i/Cargo.toml b/h3i/Cargo.toml index c47b426907..0b0774e953 100644 --- a/h3i/Cargo.toml +++ b/h3i/Cargo.toml @@ -14,7 +14,7 @@ categories = { workspace = true } readme = "README.md" [features] -async = ["dep:buffer-pool", "dep:tokio", "dep:tokio-quiche"] +async = ["dep:buffer-pool", "dep:futures", "dep:tokio", "dep:tokio-quiche"] [dependencies] clap = "3" @@ -35,5 +35,6 @@ url = { workspace = true } # Dependencies for async client buffer-pool = { optional = true, workspace = true } +futures = { version = "0.3", optional = true } tokio = { version = "1.44", features = ["net", "sync", "time"], optional = true } tokio-quiche = { features = ["capture_keylogs", "quiche_internal"], optional = true, workspace = true } diff --git a/h3i/src/client/async_client.rs b/h3i/src/client/async_client.rs index 72ba5acb0b..52b986b6ef 100644 --- a/h3i/src/client/async_client.rs +++ b/h3i/src/client/async_client.rs @@ -29,16 +29,21 @@ use buffer_pool::ConsumeBuffer; use buffer_pool::Pooled; +use futures::future::OptionFuture; use log; use quiche::PathStats; use quiche::Stats; +use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::task::Poll; use std::time::Duration; +use tokio::net::UdpSocket; use tokio::select; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::SendError; use tokio::sync::oneshot; use tokio::time::sleep; use tokio::time::sleep_until; @@ -74,6 +79,49 @@ use super::Client; use super::ConnectionCloseDetails; use super::StreamParserMap; +/// Handle for interactive action execution on an established connection. +/// +/// This handle allows submitting actions one at a time via [`do_action`], +/// rather than providing all actions upfront. Actions can be queued even +/// while a `Wait` action is pending. +/// +/// [`do_action`]: H3iHandle::do_action +pub struct H3iHandle { + action_tx: mpsc::UnboundedSender, + summary_fut: BuildingConnectionSummary, +} + +impl H3iHandle { + /// Submit a single action to the connection. + /// + /// This queues the action to be executed by the [`H3iDriver`]. + /// Actions are executed in order. [`Wait`] actions will block subsequent + /// actions until the wait condition is satisfied. + /// + /// Use [`finish`] to get the [`ConnectionSummary`] when desired. + /// + /// # Returns + /// + /// - `Ok(())` if the action was successfully queued + /// - `Err(SendError)` if the connection has closed. Call [`finish`] + /// to retrieve the [`ConnectionSummary`]. + /// + /// [`Wait`]: Action::Wait + /// [`finish`]: H3iHandle::finish + pub fn do_action(&self, action: Action) -> Result<(), SendError> { + self.action_tx.send(action) + } + + /// Finish the connection and return the summary. + /// + /// This consumes the handle. The connection will complete any pending + /// actions before returning the summary. + pub async fn finish(self) -> ConnectionSummary { + drop(self.action_tx); // Signal no more actions coming + self.summary_fut.await + } +} + /// Connect to the socket. pub async fn connect( args: &H3iConfig, frame_actions: Vec, @@ -85,26 +133,13 @@ pub async fn connect( connection_params.session = args.session.clone(); - let ParsedArgs { - connect_url, - bind_addr, - peer_addr, - } = parse_args(args); - - let socket = tokio::net::UdpSocket::bind(bind_addr).await.unwrap(); - socket.connect(peer_addr).await.unwrap(); - - log::info!( - "connecting to {:} from {:}", - peer_addr, - socket.local_addr().unwrap() - ); + let (socket, parsed_args) = create_socket(args).await; let (h3i, conn_summary_fut) = H3iDriver::new(frame_actions, close_trigger_frames); match tokio_quiche::quic::connect_with_config( - Socket::try_from(socket).unwrap(), - connect_url, + socket, + parsed_args.connect_url, &connection_params, h3i, ) @@ -115,6 +150,59 @@ pub async fn connect( } } +/// Connect to the socket in interactive mode. +/// +/// Unlike [`connect`], this function returns an [`H3iHandle`] that allows +/// submitting actions one at a time via [`H3iHandle::do_action`], rather than +/// providing all actions upfront. +/// +/// # Example +/// +/// ```ignore +/// let handle = connect_interactive(&config, None).await?; +/// +/// // Submit actions one at a time +/// handle.do_action(send_headers_frame(0, false, headers)); +/// handle.do_action(Action::Wait { +/// wait_type: WaitType::StreamEvent(StreamEvent { +/// stream_id: 0, +/// event_type: StreamEventType::Headers, +/// }) +/// }); +/// +/// // Get the connection summary when done +/// let summary = handle.finish().await; +/// ``` +pub async fn connect_interactive( + args: &H3iConfig, close_trigger_frames: Option, +) -> std::result::Result { + let quic_settings = create_config(args); + let mut connection_params = + ConnectionParams::new_client(quic_settings, None, Hooks::default()); + + connection_params.session = args.session.clone(); + + let (socket, parsed_args) = create_socket(args).await; + + let (h3i, conn_summary_fut, action_tx) = + H3iDriver::new_interactive(close_trigger_frames); + + match tokio_quiche::quic::connect_with_config( + socket, + parsed_args.connect_url, + &connection_params, + h3i, + ) + .await + { + Ok(_) => Ok(H3iHandle { + action_tx, + summary_fut: conn_summary_fut, + }), + Err(_) => Err(ClientError::HandshakeFail), + } +} + fn create_config(args: &H3iConfig) -> QuicSettings { let mut quic_settings = QuicSettings::default(); @@ -148,6 +236,29 @@ fn create_config(args: &H3iConfig) -> QuicSettings { quic_settings } +/// Creates and connects a UDP socket based on the provided config. +/// +/// Returns the socket wrapped in a `Socket` and the parsed args containing +/// the connect URL. +async fn create_socket( + args: &H3iConfig, +) -> (Socket, Arc>, ParsedArgs<'_>) { + let parsed_args = parse_args(args); + + let socket = tokio::net::UdpSocket::bind(parsed_args.bind_addr) + .await + .unwrap(); + socket.connect(parsed_args.peer_addr).await.unwrap(); + + log::info!( + "connecting to {:} from {:}", + parsed_args.peer_addr, + socket.local_addr().unwrap() + ); + + (Socket::try_from(socket).unwrap(), parsed_args) +} + /// The [`Future`] used to build a [`ConnectionSummary`]. /// /// At a high level, [`H3iDriver`] will interact with the UDP socket directly, @@ -224,16 +335,21 @@ impl Future for BuildingConnectionSummary { pub struct H3iDriver { buffer: Pooled, - actions: Vec, - actions_executed: usize, next_fire_time: Instant, waiting_for_responses: WaitingFor, record_tx: mpsc::UnboundedSender, stream_parsers: StreamParserMap, close_trigger_seen_rx: oneshot::Receiver<()>, + + /// Channel for receiving actions in interactive mode. + /// `None` in batch mode. + action_rx: Option>, + /// Queue of actions to be processed + action_queue: VecDeque, } impl H3iDriver { + /// Create a new H3iDriver with a pre-defined list of actions (batch mode). fn new( actions: Vec, close_trigger_frames: Option, ) -> (Self, BuildingConnectionSummary) { @@ -248,46 +364,106 @@ impl H3iDriver { ( Self { buffer: BufFactory::get_max_buf(), - actions, - actions_executed: 0, next_fire_time: Instant::now(), waiting_for_responses: WaitingFor::default(), record_tx, stream_parsers: StreamParserMap::default(), close_trigger_seen_rx, + action_rx: None, + action_queue: actions.into(), }, fut, ) } + /// Create a new H3iDriver for interactive mode (channel-based actions). + fn new_interactive( + close_trigger_frames: Option, + ) -> ( + Self, + BuildingConnectionSummary, + mpsc::UnboundedSender, + ) { + let (record_tx, record_rx) = mpsc::unbounded_channel(); + let (close_trigger_seen_tx, close_trigger_seen_rx) = oneshot::channel(); + let (action_tx, action_rx) = mpsc::unbounded_channel(); + let fut = BuildingConnectionSummary::new( + record_rx, + close_trigger_frames, + close_trigger_seen_tx, + ); + + ( + Self { + buffer: BufFactory::get_max_buf(), + next_fire_time: Instant::now(), + waiting_for_responses: WaitingFor::default(), + record_tx, + stream_parsers: StreamParserMap::default(), + close_trigger_seen_rx, + action_rx: Some(action_rx), + action_queue: VecDeque::new(), + }, + fut, + action_tx, + ) + } + /// If the next action should fire. fn should_fire(&self) -> bool { Instant::now() >= self.next_fire_time } - /// Insert all waits into the waiting set. + /// Register any pending wait actions from the front of the queue. + /// + /// This must be called in `process_reads` before parsing streams, so that + /// waits are registered before we check if stream events satisfy them. + /// Otherwise, if an event arrives in the same iteration that we would + /// register its wait, we'd miss it and hang. fn register_waits(&mut self) { - while self.actions_executed < self.actions.len() { - if let Action::Wait { wait_type } = - &self.actions[self.actions_executed] - { - self.actions_executed += 1; - - match wait_type { - WaitType::WaitDuration(duration) => { - self.next_fire_time = Instant::now() + *duration; - - log::debug!( - "h3i: waiting for responses: {:?}", - self.waiting_for_responses - ); - }, - WaitType::StreamEvent(event) => { - self.waiting_for_responses.add_wait(event); - }, - } - } else { - break; + while let Some(action) = self.action_queue.front() { + match action { + Action::Wait { wait_type } => { + match wait_type { + WaitType::WaitDuration(duration) => { + log::debug!("h3i: WaitDuration {:?}", duration); + self.next_fire_time = Instant::now() + *duration; + }, + WaitType::StreamEvent(event) => { + log::debug!("h3i: WaitStreamEvent {:?}", event); + self.waiting_for_responses.add_wait(event); + }, + } + + self.action_queue.pop_front(); + }, + _ => break, + } + } + } + + /// Process non-wait actions from the action queue. + /// + /// This executes actions until we hit a `Wait` or `FlushPackets` action. + /// Wait actions are handled by `register_waits()` in `process_reads`. + fn process_action_queue(&mut self, qconn: &mut QuicheConnection) { + while let Some(action) = self.action_queue.pop_front() { + match &action { + Action::Wait { .. } => { + // Put it back - waits are handled in register_waits() + self.action_queue.push_front(action); + return; + }, + Action::FlushPackets => { + log::debug!("h3i: FlushPackets"); + + // Yield control back to the IOW so it can flush packets. + return; + }, + _ => { + log::debug!("h3i: execute action {:?}", action); + execute_action(&action, qconn, self.stream_parsers_mut()); + }, } } } @@ -340,52 +516,23 @@ impl ApplicationOverQuic for H3iDriver { fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> { log::trace!("h3i: process_writes"); + // Skip if waiting for duration + if !self.should_fire() { + log::debug!("h3i: waiting for duration, skipping process_writes"); + return Ok(()); + } + + // Skip if waiting for stream events if !self.waiting_for_responses.is_empty() { log::debug!( - "awaiting responses on streams {:?}, skipping further action", + "awaiting responses on streams {:?}, skipping process_writes", self.waiting_for_responses ); - return Ok(()); } - // Re-create the iterator so we can mutably borrow the stream parser map - let iter = self.actions.clone().into_iter().skip(self.actions_executed); - - for action in iter { - match action { - Action::SendFrame { .. } | - Action::StreamBytes { .. } | - Action::SendDatagram { .. } | - Action::ResetStream { .. } | - Action::StopSending { .. } | - Action::OpenUniStream { .. } | - Action::ConnectionClose { .. } | - Action::SendHeadersFrame { .. } => { - if self.should_fire() { - // Reset the fire time such that the next action will - // still fire. - self.next_fire_time = Instant::now(); - - execute_action(&action, qconn, self.stream_parsers_mut()); - self.actions_executed += 1; - } else { - break; - } - }, - Action::Wait { .. } => { - // Break out of the write phase if we see a wait, since waits - // have to be registered in the read - // phase. The actions_executed pointer will be - // incremented there as well - break; - }, - Action::FlushPackets => { - self.actions_executed += 1; - break; - }, - } - } + // Process actions from the queue + self.process_action_queue(qconn); Ok(()) } @@ -403,6 +550,11 @@ impl ApplicationOverQuic for H3iDriver { sleep(Duration::MAX) }; + // Wrap the action receiver in OptionFuture so it resolves to None in + // batch mode and doesn't block the select. + let action_recv_fut: OptionFuture<_> = + self.action_rx.as_mut().map(|rx| rx.recv()).into(); + select! { rx = &mut self.close_trigger_seen_rx, if !self.close_trigger_seen_rx.is_terminated() => { // NOTE: wait_for_data can be called again after all close triggers have been seen, @@ -413,6 +565,31 @@ impl ApplicationOverQuic for H3iDriver { let _ = qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, b"saw all expected frames"); } } + + // Receive actions from the channel (interactive mode only). + // In batch mode action_rx is None, so OptionFuture resolves to None immediately. + Some(result) = action_recv_fut => { + match result { + Some(action) => { + self.action_queue.push_back(action); + + // Drain all additional available actions without blocking + if let Some(rx) = &mut self.action_rx { + while let Ok(action) = rx.try_recv() { + self.action_queue.push_back(action); + } + } + } + + None => { + // Action channel is closed because the user either called finish() or + // dropped H3iHandle. + let _ = qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, b"client done"); + self.action_rx = None; + } + } + } + _ = sleep_fut => {} }