10 changes: 10 additions & 0 deletions quiche/src/h3/mod.rs
@@ -2050,6 +2050,11 @@ impl Connection {

// TODO: check if stream is completed so it can be freed
if let Some(ev) = ev {
// Cycle the stream after returning Headers to enable
// round-robin "skim" behavior across multiple streams.
if matches!(ev.1, Event::Headers { .. }) {
conn.cycle_readable(ev.0);
}
return Ok(ev);
}
}
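
Note: for context on what the reader-side cycling means for an application, below is a sketch of a consumer loop against the public quiche HTTP/3 API. It is illustrative only; the buffer size and the println! placeholders are arbitrary, and a real application would dispatch these events to its own request handling.

    // Sketch: drain HTTP/3 events from a connection. With the cycling above,
    // Headers for every ready request surface before any single request's
    // body is drained.
    fn drain_h3_events(
        conn: &mut quiche::Connection,
        h3_conn: &mut quiche::h3::Connection,
    ) -> Result<(), quiche::h3::Error> {
        let mut buf = [0; 4096];

        loop {
            match h3_conn.poll(conn) {
                Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
                    // The stream is cycled to the back of the readable queue
                    // after this event, so other streams' Headers come next.
                    println!("headers on stream {stream_id}: {} fields", list.len());
                },

                Ok((stream_id, quiche::h3::Event::Data)) => {
                    while let Ok(read) = h3_conn.recv_body(conn, stream_id, &mut buf) {
                        println!("{read} body bytes on stream {stream_id}");
                    }
                },

                Ok((stream_id, quiche::h3::Event::Finished)) => {
                    println!("stream {stream_id} finished");
                },

                Ok(_) => (),

                Err(quiche::h3::Error::Done) => return Ok(()),

                Err(e) => return Err(e),
            }
        }
    }
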
@@ -3819,6 +3824,10 @@ mod tests {
s.send_body_client(stream2, true).unwrap();
s.send_body_client(stream1, true).unwrap();

// With reader-side round-robin cycling after Headers events,
// we expect interleaved ordering: all Headers first, then Data/Finished.

// All Headers events arrive first (each stream is cycled after its Headers event)
let (_, ev) = s.poll_server().unwrap();
let ev_headers = Event::Headers {
list: reqs[0].clone(),
@@ -3840,6 +3849,7 @@
};
assert_eq!(ev, ev_headers);

// Now Data and Finished events for each stream
assert_eq!(s.poll_server(), Ok((0, Event::Data)));
assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
assert_eq!(s.poll_client(), Err(Error::Done));
39 changes: 28 additions & 11 deletions quiche/src/lib.rs
@@ -429,8 +429,6 @@ use crate::recovery::OnLossDetectionTimeoutOutcome;
use crate::recovery::RecoveryOps;
use crate::recovery::ReleaseDecision;

use crate::stream::StreamPriorityKey;

/// The current QUIC wire version.
pub const PROTOCOL_VERSION: u32 = PROTOCOL_VERSION_V1;

@@ -4808,14 +4806,14 @@ impl<F: BufFactory> Connection<F> {
}

let priority_key = Arc::clone(&stream.priority_key);

// If the stream is no longer flushable, remove it from the queue
if !stream.is_flushable() {
self.streams.remove_flushable(&priority_key);
} else if stream.incremental {
// Shuffle the incremental stream to the back of the
// queue.
self.streams.remove_flushable(&priority_key);
self.streams.insert_flushable(&priority_key);
// Cycle the incremental stream to the back of the queue
// for round-robin scheduling.
self.streams.cycle_flushable(stream_id);
}

#[cfg(feature = "fuzzing")]
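
Note: the scheduling effect of the new cycle_flushable call can be illustrated with a small self-contained toy. This models only the "move to the back" step, not the real flushable queue, whose implementation is not part of this diff.

    use std::collections::VecDeque;

    // Toy model of one urgency level of the flushable queue: the front stream
    // emits data and, being incremental, is cycled to the back, so streams of
    // equal urgency share send capacity round-robin instead of one stream
    // monopolizing it.
    fn emission_order(flushable: &mut VecDeque<u64>, rounds: usize) -> Vec<u64> {
        let mut order = Vec::with_capacity(rounds);

        for _ in 0..rounds {
            if let Some(stream_id) = flushable.pop_front() {
                order.push(stream_id);

                // The equivalent of `cycle_flushable`: the stream is still
                // flushable, so it goes to the back of its priority group.
                flushable.push_back(stream_id);
            }
        }

        order
    }

    // With streams 0, 4 and 8 queued, six rounds yield [0, 4, 8, 0, 4, 8].
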
@@ -5566,15 +5564,21 @@
stream.urgency = urgency;
stream.incremental = incremental;

let new_priority_key = Arc::new(StreamPriorityKey {
urgency: stream.urgency,
incremental: stream.incremental,
let old_priority_key = Arc::clone(&stream.priority_key);

let sequence = self.streams.next_sequence();

let stream = self.streams.get_mut(stream_id).unwrap();

let new_priority_key = Arc::new(stream::StreamPriorityKey {
urgency,
incremental,
id: stream_id,
sequence,
..Default::default()
});

let old_priority_key =
std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
stream.priority_key = Arc::clone(&new_priority_key);

self.streams
.update_priority(&old_priority_key, &new_priority_key);
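
Note: the stream is looked up again via get_mut here presumably so that the mutable borrow of the stream does not overlap the call to self.streams.next_sequence(). For reference, this path is driven by the existing public Connection::stream_priority API; a typical application call looks like the following (the stream ID and urgency values are arbitrary examples).

    // Put stream 4 at urgency 3 and mark it incremental, so it shares
    // bandwidth round-robin with other incremental streams at that urgency.
    if let Err(e) = conn.stream_priority(4, 3, true) {
        eprintln!("failed to reprioritize stream: {e:?}");
    }
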
@@ -5767,6 +5771,19 @@
stream.is_readable()
}

/// Cycles a readable stream to the back of the priority queue.
///
/// This is used for round-robin scheduling of readable streams. After
/// processing data from a stream, calling this method moves it to the
/// back of its priority group, giving other streams a chance to be
/// processed.
///
/// Returns `true` if the stream was successfully cycled, `false` if the
/// stream doesn't exist or isn't in the readable set.
pub(crate) fn cycle_readable(&mut self, stream_id: u64) -> bool {
self.streams.cycle_readable(stream_id)
}
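
Note: the StreamMap::cycle_readable that this forwards to is not shown in the diff. The self-contained sketch below illustrates the behavior the doc comment describes, assuming readable streams are grouped by urgency; the type and field names are illustrative assumptions, not the actual internals.

    use std::collections::{BTreeMap, VecDeque};

    // Sketch: readable streams grouped by urgency, FIFO within each group.
    struct ReadableSet {
        groups: BTreeMap<u8, VecDeque<u64>>,
    }

    impl ReadableSet {
        // Moves the stream to the back of its urgency group; returns false
        // if the stream isn't currently tracked as readable.
        fn cycle_readable(&mut self, urgency: u8, stream_id: u64) -> bool {
            let Some(group) = self.groups.get_mut(&urgency) else {
                return false;
            };

            if let Some(pos) = group.iter().position(|&id| id == stream_id) {
                if let Some(id) = group.remove(pos) {
                    group.push_back(id);
                }
                return true;
            }

            false
        }
    }
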

/// Returns the next stream that can be written to.
///
/// Note that once returned by this method, a stream ID will not be returned