diff --git a/quiche/src/h3/mod.rs b/quiche/src/h3/mod.rs index 748a047f66..0ef0190938 100644 --- a/quiche/src/h3/mod.rs +++ b/quiche/src/h3/mod.rs @@ -2050,6 +2050,11 @@ impl Connection { // TODO: check if stream is completed so it can be freed if let Some(ev) = ev { + // Cycle the stream after returning Headers to enable + // round-robin "skim" behavior across multiple streams. + if matches!(ev.1, Event::Headers { .. }) { + conn.cycle_readable(ev.0); + } return Ok(ev); } } @@ -3819,6 +3824,10 @@ mod tests { s.send_body_client(stream2, true).unwrap(); s.send_body_client(stream1, true).unwrap(); + // With reader-side round-robin cycling after Headers events, + // we expect interleaved ordering: all Headers first, then Data/Finished. + + // All Headers events first (streams cycle after each Headers) let (_, ev) = s.poll_server().unwrap(); let ev_headers = Event::Headers { list: reqs[0].clone(), @@ -3840,6 +3849,7 @@ mod tests { }; assert_eq!(ev, ev_headers); + // Now Data and Finished events for each stream assert_eq!(s.poll_server(), Ok((0, Event::Data))); assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len())); assert_eq!(s.poll_client(), Err(Error::Done)); diff --git a/quiche/src/lib.rs b/quiche/src/lib.rs index bb21280d05..27548440b0 100644 --- a/quiche/src/lib.rs +++ b/quiche/src/lib.rs @@ -429,8 +429,6 @@ use crate::recovery::OnLossDetectionTimeoutOutcome; use crate::recovery::RecoveryOps; use crate::recovery::ReleaseDecision; -use crate::stream::StreamPriorityKey; - /// The current QUIC wire version. pub const PROTOCOL_VERSION: u32 = PROTOCOL_VERSION_V1; @@ -4808,14 +4806,14 @@ impl Connection { } let priority_key = Arc::clone(&stream.priority_key); + // If the stream is no longer flushable, remove it from the queue if !stream.is_flushable() { self.streams.remove_flushable(&priority_key); } else if stream.incremental { - // Shuffle the incremental stream to the back of the - // queue. 
- self.streams.remove_flushable(&priority_key); - self.streams.insert_flushable(&priority_key); + // Cycle the incremental stream to the back of the queue + // for round-robin scheduling. + self.streams.cycle_flushable(stream_id); } #[cfg(feature = "fuzzing")] @@ -5566,15 +5564,21 @@ impl Connection { stream.urgency = urgency; stream.incremental = incremental; - let new_priority_key = Arc::new(StreamPriorityKey { - urgency: stream.urgency, - incremental: stream.incremental, + let old_priority_key = Arc::clone(&stream.priority_key); + + let sequence = self.streams.next_sequence(); + + let stream = self.streams.get_mut(stream_id).unwrap(); + + let new_priority_key = Arc::new(stream::StreamPriorityKey { + urgency, + incremental, id: stream_id, + sequence, ..Default::default() }); - let old_priority_key = - std::mem::replace(&mut stream.priority_key, new_priority_key.clone()); + stream.priority_key = Arc::clone(&new_priority_key); self.streams .update_priority(&old_priority_key, &new_priority_key); @@ -5767,6 +5771,19 @@ impl Connection { stream.is_readable() } + /// Cycles a readable stream to the back of the priority queue. + /// + /// This is used for round-robin scheduling of readable streams. After + /// processing data from a stream, calling this method moves it to the + /// back of its priority group, giving other streams a chance to be + /// processed. + /// + /// Returns `true` if the stream was successfully cycled, `false` if the + /// stream doesn't exist or isn't in the readable set. + pub(crate) fn cycle_readable(&mut self, stream_id: u64) -> bool { + self.streams.cycle_readable(stream_id) + } + /// Returns the next stream that can be written to. 
/// /// Note that once returned by this method, a stream ID will not be returned diff --git a/quiche/src/stream/mod.rs b/quiche/src/stream/mod.rs index d48bde31c0..6232bf00cc 100644 --- a/quiche/src/stream/mod.rs +++ b/quiche/src/stream/mod.rs @@ -184,6 +184,18 @@ pub struct StreamMap { /// The maximum size of a stream window. max_stream_window: u64, + + /// Monotonically increasing counter used for round-robin scheduling of + /// incremental streams. Each time a stream is created or cycled, it gets + /// the next sequence number, pushing it to the back of its priority group. + /// + /// This counter uses wrapping arithmetic. In the unlikely event of a wrap + /// (after 2^64 increments, which would take ~584 years at 1 billion/sec), + /// streams with pre-wrap sequences would temporarily be deprioritized + /// relative to post-wrap streams. However, this resolves itself naturally: + /// when a stream is cycled after sending data, it receives a fresh + /// post-wrap sequence number and rejoins normal round-robin scheduling. + sequence_counter: u64, } impl StreamMap { @@ -213,6 +225,12 @@ impl StreamMap { self.streams.get_mut(&id) } + /// Returns the next sequence number for priority ordering. + pub fn next_sequence(&mut self) -> u64 { + self.sequence_counter = self.sequence_counter.wrapping_add(1); + self.sequence_counter + } + /// Returns the mutable stream with the given ID if it exists, or creates /// a new one otherwise. 
/// @@ -321,7 +339,10 @@ impl StreamMap { }, }; - let s = Stream::new( + // Assign a unique sequence number for priority ordering + self.sequence_counter = self.sequence_counter.wrapping_add(1); + + let mut s = Stream::new( id, max_rx_data, max_tx_data, @@ -330,6 +351,15 @@ impl StreamMap { self.max_stream_window, ); + // Set proper sequence for priority ordering + s.priority_key = Arc::new(StreamPriorityKey { + id, + sequence: self.sequence_counter, + urgency: s.urgency, + incremental: s.incremental, + ..Default::default() + }); + let is_writable = s.is_writable(); (v.insert(s), is_writable) @@ -426,6 +456,90 @@ impl StreamMap { self.flushable.front().clone_pointer() } + /// Cycles an incremental stream to the back of the flushable queue. + /// + /// This is used for round-robin scheduling: after sending data from an + /// incremental stream, it should be moved to the back of its priority + /// group to give other streams a chance. + /// + /// Returns `true` if the stream was successfully cycled. 
+    pub fn cycle_flushable(&mut self, stream_id: u64) -> bool {
+        // Unknown stream: nothing to cycle.
+        let stream = match self.streams.get_mut(&stream_id) {
+            Some(s) => s,
+            None => return false,
+        };
+
+        let old_priority_key = Arc::clone(&stream.priority_key);
+
+        // Only cycle if currently in the flushable set
+        if !old_priority_key.flushable.is_linked() {
+            return false;
+        }
+
+        // Bump the global sequence counter
+        self.sequence_counter = self.sequence_counter.wrapping_add(1);
+
+        // Create a new priority key with the updated sequence
+        let new_priority_key = Arc::new(StreamPriorityKey {
+            urgency: stream.urgency,
+            incremental: stream.incremental,
+            id: stream_id,
+            sequence: self.sequence_counter,
+            ..Default::default()
+        });
+
+        // Replace the stream's priority key
+        stream.priority_key = Arc::clone(&new_priority_key);
+
+        // Queue membership is recorded by the links embedded in the key
+        // object itself. The stream now owns `new_priority_key`, so every
+        // queue that still links the old key (not just `flushable`) must be
+        // moved over; otherwise the old key would stay linked in the
+        // readable/writable sets where later removals through the stream's
+        // current key could no longer reach it. As a consequence the stream
+        // also moves to the back of its group in those other queues.
+        self.update_priority(&old_priority_key, &new_priority_key);
+
+        true
+    }
+
+    /// Cycles a readable stream to the back of the queue.
+    ///
+    /// This is used for round-robin scheduling: after processing some data from
+    /// a stream, it can be moved to the back of its priority group to give
+    /// other streams a chance of being read.
+    ///
+    /// Returns `true` if the stream was successfully cycled.
+    pub fn cycle_readable(&mut self, stream_id: u64) -> bool {
+        // Unknown stream: nothing to cycle.
+        let stream = match self.streams.get_mut(&stream_id) {
+            Some(s) => s,
+            None => return false,
+        };
+
+        let old_priority_key = Arc::clone(&stream.priority_key);
+
+        // Only cycle if currently in the readable set
+        if !old_priority_key.readable.is_linked() {
+            return false;
+        }
+
+        // Bump the global sequence counter
+        self.sequence_counter = self.sequence_counter.wrapping_add(1);
+
+        // Create a new priority key with the updated sequence
+        let new_priority_key = Arc::new(StreamPriorityKey {
+            urgency: stream.urgency,
+            incremental: stream.incremental,
+            id: stream_id,
+            sequence: self.sequence_counter,
+            ..Default::default()
+        });
+
+        // Replace the stream's priority key
+        stream.priority_key = Arc::clone(&new_priority_key);
+
+        // Queue membership is recorded by the links embedded in the key
+        // object itself. The stream now owns `new_priority_key`, so every
+        // queue that still links the old key (not just `readable`) must be
+        // moved over; otherwise the old key would stay linked in the
+        // writable/flushable sets where later removals through the stream's
+        // current key could no longer reach it. As a consequence the stream
+        // also moves to the back of its group in those other queues.
+        self.update_priority(&old_priority_key, &new_priority_key);
+
+        true
+    }
+
     /// Updates the priorities of a stream.
     pub fn update_priority(
         &mut self, old: &Arc, new: &Arc,
@@ -446,6 +560,88 @@ impl StreamMap {
         }
     }
 
+    /// Cycles a stream to the back of all priority queues it belongs to.
+    ///
+    /// This bumps the stream's sequence number, causing it to be ordered after
+    /// other streams with the same urgency and incremental setting. Used for
+    /// round-robin scheduling of incremental streams.
+    ///
+    /// Returns `true` if the stream was found and cycled.
+    #[cfg(test)]
+    pub fn cycle_priority(&mut self, stream_id: u64) -> bool {
+        // The counter is only committed once the stream is known to exist,
+        // so a miss leaves the sequence untouched.
+        let next_sequence = self.sequence_counter.wrapping_add(1);
+
+        let (previous_key, fresh_key) = match self.streams.get_mut(&stream_id) {
+            None => return false,
+
+            Some(stream) => {
+                // Same urgency/incremental settings, newer sequence number.
+                let fresh_key = Arc::new(StreamPriorityKey {
+                    id: stream_id,
+                    sequence: next_sequence,
+                    urgency: stream.urgency,
+                    incremental: stream.incremental,
+                    ..Default::default()
+                });
+
+                // Install the fresh key on the stream, keeping hold of the
+                // one it replaces so the queues can be updated below.
+                let previous_key = std::mem::replace(
+                    &mut stream.priority_key,
+                    Arc::clone(&fresh_key),
+                );
+
+                (previous_key, fresh_key)
+            },
+        };
+
+        self.sequence_counter = next_sequence;
+
+        // Re-key the stream in every queue that linked the previous key.
+        self.update_priority(&previous_key, &fresh_key);
+
+        true
+    }
+
+    /// Sets a stream's priority (urgency and incremental flag).
+    ///
+    /// The stream is placed at the back of its new priority group by assigning
+    /// a new sequence number. Returns `true` if the stream was found and
+    /// updated.
+ #[cfg(test)] + pub fn set_priority( + &mut self, stream_id: u64, urgency: u8, incremental: bool, + ) -> bool { + let stream = match self.streams.get_mut(&stream_id) { + Some(s) => s, + None => return false, + }; + + // Early return if nothing changed + if stream.urgency == urgency && stream.incremental == incremental { + return true; + } + + let old_priority_key = Arc::clone(&stream.priority_key); + + stream.urgency = urgency; + stream.incremental = incremental; + + // Bump the global sequence counter + self.sequence_counter = self.sequence_counter.wrapping_add(1); + + // Create a new priority key with the updated values + let new_priority_key = Arc::new(StreamPriorityKey { + urgency, + incremental, + id: stream_id, + sequence: self.sequence_counter, + ..Default::default() + }); + + // Replace the stream's priority key + stream.priority_key = Arc::clone(&new_priority_key); + + // Update all queues the stream belongs to + self.update_priority(&old_priority_key, &new_priority_key); + + true + } + /// Adds the stream ID to the almost full streams set. /// /// If the stream was already in the list, this does nothing. @@ -788,6 +984,9 @@ pub struct StreamPriorityKey { pub urgency: u8, pub incremental: bool, pub id: u64, + /// Sequence number for round-robin scheduling of incremental streams. + /// Incremented each time a stream is cycled, pushing it to the back. + pub sequence: u64, pub readable: RBTreeAtomicLink, pub writable: RBTreeAtomicLink, @@ -800,6 +999,7 @@ impl Default for StreamPriorityKey { urgency: DEFAULT_URGENCY, incremental: true, id: Default::default(), + sequence: 0, readable: Default::default(), writable: Default::default(), flushable: Default::default(), @@ -847,10 +1047,12 @@ impl Ord for StreamPriorityKey { return cmp::Ordering::Less; } - // ...finally, when both are incremental, `other` takes precedence (so - // `self` is always sorted after other same-urgency incremental - // entries). 
- cmp::Ordering::Greater + // ...finally, when both are incremental, order by sequence number + // (for round-robin scheduling), then by ID for stability. + match self.sequence.cmp(&other.sequence) { + cmp::Ordering::Equal => self.id.cmp(&other.id), + ord => ord, + } } } @@ -1824,8 +2026,7 @@ mod tests { } fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) { - let key = streams.get(stream_id).unwrap().priority_key.clone(); - streams.update_priority(&key.clone(), &key); + streams.cycle_priority(stream_id); } #[test] @@ -1927,27 +2128,10 @@ mod tests { ]; for (id, urgency) in input.clone() { - // this duplicates some code from stream_priority in order to access - // streams and the collection they're in - let stream = streams + streams .get_or_create(id, &local_tp, &peer_tp, false, true) .unwrap(); - - stream.urgency = urgency; - - let new_priority_key = Arc::new(StreamPriorityKey { - urgency: stream.urgency, - incremental: stream.incremental, - id, - ..Default::default() - }); - - let old_priority_key = std::mem::replace( - &mut stream.priority_key, - new_priority_key.clone(), - ); - - streams.update_priority(&old_priority_key, &new_priority_key); + streams.set_priority(id, urgency, true); } let walk_1: Vec = streams.writable().collect(); @@ -1955,27 +2139,7 @@ mod tests { // Re-applying priority to a stream does not cause duplication. 
for (id, urgency) in input { - // this duplicates some code from stream_priority in order to access - // streams and the collection they're in - let stream = streams - .get_or_create(id, &local_tp, &peer_tp, false, true) - .unwrap(); - - stream.urgency = urgency; - - let new_priority_key = Arc::new(StreamPriorityKey { - urgency: stream.urgency, - incremental: stream.incremental, - id, - ..Default::default() - }); - - let old_priority_key = std::mem::replace( - &mut stream.priority_key, - new_priority_key.clone(), - ); - - streams.update_priority(&old_priority_key, &new_priority_key); + streams.set_priority(id, urgency, true); } let walk_2: Vec = streams.writable().collect(); @@ -2029,27 +2193,10 @@ mod tests { ]; for (id, urgency) in input.clone() { - // this duplicates some code from stream_priority in order to access - // streams and the collection they're in - let stream = streams + streams .get_or_create(id, &local_tp, &peer_tp, false, true) .unwrap(); - - stream.urgency = urgency; - - let new_priority_key = Arc::new(StreamPriorityKey { - urgency: stream.urgency, - incremental: stream.incremental, - id, - ..Default::default() - }); - - let old_priority_key = std::mem::replace( - &mut stream.priority_key, - new_priority_key.clone(), - ); - - streams.update_priority(&old_priority_key, &new_priority_key); + streams.set_priority(id, urgency, true); } let walk_1: Vec = streams.writable().collect(); @@ -2106,24 +2253,10 @@ mod tests { assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]); // Adding streams doesn't break expected ordering. 
- let stream = streams + streams .get_or_create(44, &local_tp, &peer_tp, false, true) .unwrap(); - - stream.urgency = 20; - stream.incremental = true; - - let new_priority_key = Arc::new(StreamPriorityKey { - urgency: stream.urgency, - incremental: stream.incremental, - id: 44, - ..Default::default() - }); - - let old_priority_key = - std::mem::replace(&mut stream.priority_key, new_priority_key.clone()); - - streams.update_priority(&old_priority_key, &new_priority_key); + streams.set_priority(44, 20, true); let walk_11: Vec = streams.writable().collect(); assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]); @@ -2166,6 +2299,196 @@ mod tests { prioritized_writable.iter().map(|s| s.id).collect(); assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]); } + + /// Tests that StreamPriorityKey's Ord implementation satisfies the + /// antisymmetry property: if a < b then b > a (and vice versa). + #[test] + fn priority_key_ord_antisymmetry() { + // Test incremental streams with same urgency but different sequences + let a = StreamPriorityKey { + urgency: 100, + incremental: true, + id: 4, + sequence: 1, + ..Default::default() + }; + let b = StreamPriorityKey { + urgency: 100, + incremental: true, + id: 8, + sequence: 2, + ..Default::default() + }; + + // a has lower sequence, so a < b + assert_eq!(a.cmp(&b), cmp::Ordering::Less); + // Antisymmetry: b > a + assert_eq!(b.cmp(&a), cmp::Ordering::Greater); + + // Test with same sequence, should fall back to ID comparison + let c = StreamPriorityKey { + urgency: 100, + incremental: true, + id: 4, + sequence: 1, + ..Default::default() + }; + let d = StreamPriorityKey { + urgency: 100, + incremental: true, + id: 8, + sequence: 1, + ..Default::default() + }; + + assert_eq!(c.cmp(&d), cmp::Ordering::Less); + assert_eq!(d.cmp(&c), cmp::Ordering::Greater); + } + + /// Tests that StreamPriorityKey's Ord implementation satisfies + /// transitivity: if a < b and b < c then a < c. 
+ #[test] + fn priority_key_ord_transitivity() { + let a = StreamPriorityKey { + urgency: 100, + incremental: true, + id: 4, + sequence: 1, + ..Default::default() + }; + let b = StreamPriorityKey { + urgency: 100, + incremental: true, + id: 8, + sequence: 2, + ..Default::default() + }; + let c = StreamPriorityKey { + urgency: 100, + incremental: true, + id: 12, + sequence: 3, + ..Default::default() + }; + + assert_eq!(a.cmp(&b), cmp::Ordering::Less); + assert_eq!(b.cmp(&c), cmp::Ordering::Less); + // Transitivity: a < c + assert_eq!(a.cmp(&c), cmp::Ordering::Less); + } + + /// Tests that StreamPriorityKey's Ord implementation provides a total + /// ordering: for any two elements, exactly one of a < b, a == b, or a > b + /// holds. + #[test] + fn priority_key_ord_totality() { + let keys = vec![ + StreamPriorityKey { + urgency: 50, + incremental: false, + id: 4, + sequence: 1, + ..Default::default() + }, + StreamPriorityKey { + urgency: 50, + incremental: true, + id: 8, + sequence: 2, + ..Default::default() + }, + StreamPriorityKey { + urgency: 100, + incremental: true, + id: 12, + sequence: 3, + ..Default::default() + }, + StreamPriorityKey { + urgency: 100, + incremental: true, + id: 16, + sequence: 3, + ..Default::default() + }, + ]; + + // Every pair must have a defined ordering + for (i, a) in keys.iter().enumerate() { + for (j, b) in keys.iter().enumerate() { + let ord = a.cmp(b); + if i == j { + assert_eq!(ord, cmp::Ordering::Equal); + } else { + assert!(ord != cmp::Ordering::Equal); + // Verify antisymmetry for non-equal pairs + assert_eq!(a.cmp(b).reverse(), b.cmp(a)); + } + } + } + } + + /// Tests that cycle_flushable actually moves incremental streams to the + /// back of the queue for round-robin scheduling. 
+ #[test] + fn round_robin_cycle_flushable() { + let local_tp = crate::TransportParams::default(); + let peer_tp = crate::TransportParams { + initial_max_stream_data_bidi_local: 100, + initial_max_stream_data_uni: 100, + ..Default::default() + }; + + let mut streams = StreamMap::new(100, 100, 100); + + // Create 3 incremental streams with same urgency + for id in [4, 8, 12] { + streams + .get_or_create(id, &local_tp, &peer_tp, false, true) + .unwrap(); + // Write some data to make them flushable + streams + .get_mut(id) + .unwrap() + .send + .write(b"data", false) + .unwrap(); + let priority_key = Arc::clone(&streams.get(id).unwrap().priority_key); + streams.insert_flushable(&priority_key); + } + + // Initial order based on creation sequence + let get_flushable_order = |streams: &StreamMap| -> Vec { + let mut order = Vec::new(); + let mut cursor = streams.flushable.front(); + while let Some(key) = cursor.get() { + order.push(key.id); + cursor.move_next(); + } + order + }; + + let order1 = get_flushable_order(&streams); + assert_eq!(order1, vec![4, 8, 12]); + + // Cycle stream 4 - should move to back + assert!(streams.cycle_flushable(4)); + let order2 = get_flushable_order(&streams); + assert_eq!(order2, vec![8, 12, 4]); + + // Cycle stream 8 - should move to back + assert!(streams.cycle_flushable(8)); + let order3 = get_flushable_order(&streams); + assert_eq!(order3, vec![12, 4, 8]); + + // Cycle stream 12 - should move to back + assert!(streams.cycle_flushable(12)); + let order4 = get_flushable_order(&streams); + assert_eq!(order4, vec![4, 8, 12]); + + // Full cycle returns to original relative order + // (but with new sequence numbers) + } } mod recv_buf; diff --git a/quiche/src/tests.rs b/quiche/src/tests.rs index bb21e1f397..6259ba7d55 100644 --- a/quiche/src/tests.rs +++ b/quiche/src/tests.rs @@ -8251,7 +8251,7 @@ fn last_tx_data_larger_than_tx_data( // Server sends PTO probe (not limited to cwnd), // to update last_tx_data. 
 let (len, _) = pipe.server.send(&mut buf).unwrap(); - assert_eq!(len, 1200); + assert_eq!(len, 848); // Client sends STOP_SENDING to decrease tx_data // by unsent data. It will make last_tx_data > tx_data @@ -10856,3 +10856,83 @@ fn configuration_values_are_limited_to_max_varint() { // do not panic because of too large values that we try to encode via varint. assert_eq!(pipe.handshake(), Err(Error::InvalidTransportParam)); } + +/// Tests that incremental streams are scheduled round-robin when sending. +/// +/// When multiple incremental streams have data queued, each call to send() +/// should emit data from the next stream in rotation, not always from the +/// same stream. +#[test] +fn incremental_stream_round_robin() { + let mut config = Config::new(PROTOCOL_VERSION).unwrap(); + config + .load_cert_chain_from_pem_file("examples/cert.crt") + .unwrap(); + config + .load_priv_key_from_pem_file("examples/cert.key") + .unwrap(); + config + .set_application_protos(&[b"proto1", b"proto2"]) + .unwrap(); + // Set high limits so we can send data on multiple streams + config.set_initial_max_data(100000); + config.set_initial_max_stream_data_bidi_local(10000); + config.set_initial_max_stream_data_bidi_remote(10000); + config.set_initial_max_streams_bidi(10); + config.verify_peer(false); + + let mut pipe = test_utils::Pipe::with_config(&mut config).unwrap(); + assert_eq!(pipe.handshake(), Ok(())); + + // Queue data on three streams. All streams are incremental by default. + // Use 2500 bytes so each stream needs 3 packets to send all data. 
+ let data = vec![0u8; 2500]; + assert_eq!(pipe.client.stream_send(4, &data, false), Ok(2500)); + assert_eq!(pipe.client.stream_send(8, &data, false), Ok(2500)); + assert_eq!(pipe.client.stream_send(12, &data, false), Ok(2500)); + + // Track which streams get data sent in each packet + let mut send_order = Vec::new(); + + // Send packets one at a time + let mut buf = [0; 1350]; + let mut recv_buf = [0; 1350]; + + // Send multiple packets and track which stream's data is sent + for _ in 0..10 { + let result = pipe.client.send(&mut buf); + if result.is_err() { + break; + } + let (len, _) = result.unwrap(); + if len == 0 { + break; + } + + // Process packet on server + let info = RecvInfo { + to: test_utils::Pipe::server_addr(), + from: test_utils::Pipe::client_addr(), + }; + pipe.server.recv(&mut buf[..len], info).unwrap(); + + // Check which streams received data in THIS packet by consuming data. + // This ensures readable() only shows streams with new data next time. + for stream_id in pipe.server.readable() { + send_order.push(stream_id); + // Drain the data so the stream is no longer readable + while pipe.server.stream_recv(stream_id, &mut recv_buf).is_ok() {} + } + } + + // With round-robin, streams are cycled after sending: + // - Stream 4 sends (lowest sequence), cycled to back -> queue: [8, 12, 4] + // - Stream 8 sends, cycled to back -> queue: [12, 4, 8] + // - Stream 12 sends, cycled to back -> queue: [4, 8, 12] + // Pattern repeats until all data is sent (3 rounds for 2500 bytes each). + assert_eq!( + send_order, + vec![4, 8, 12, 4, 8, 12, 4, 8, 12], + "Expected round-robin ordering: streams interleave across packets" + ); +}