Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(s2n-quic-core): always wake application on available datagram capacity #2249

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 92 additions & 25 deletions quic/s2n-quic-core/src/datagram/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ pub struct Sender {
capacity: usize,
min_packet_space: usize,
max_packet_space: usize,
dropped_datagrams: u64,
smoothed_packet_size: f64,
waker: Option<Waker>,
max_datagram_payload: u64,
Expand Down Expand Up @@ -449,43 +450,64 @@ impl Sender {
pub fn smoothed_packet_space(&self) -> usize {
self.smoothed_packet_size as usize
}

/// Returns the number of datagrams that have been dropped by the sender
///
/// The cause of drops is due to the datagrams being larger than the current path MTU. If this
/// number is non-zero, applications should try to send smaller datagrams.
#[inline]
pub fn dropped_datagrams(&self) -> u64 {
self.dropped_datagrams
}
}

impl super::Sender for Sender {
#[inline]
fn on_transmit<P: Packet>(&mut self, packet: &mut P) {
// Cede space to stream data when datagrams are not prioritized
if packet.has_pending_streams() && !packet.datagrams_prioritized() {
return;
}

self.record_capacity_stats(packet.remaining_capacity());

let mut has_written = false;

while packet.remaining_capacity() > 0 {
if let Some(datagram) = self.queue.pop_front() {
// Ensure there is enough space in the packet to send a datagram
if packet.remaining_capacity() >= datagram.data.len() {
match packet.write_datagram(&datagram.data) {
Ok(()) => has_written = true,
Err(_error) => {
continue;
}
}
// Since a datagram was popped off the queue, wake the
// stored waker if we have one to let the application know
// that there is space on the queue for more datagrams.
if let Some(w) = self.waker.take() {
w.wake();
}
} else {
// This check keeps us from popping all the datagrams off the
// queue when packet space remaining is smaller than the datagram.
if has_written {
self.queue.push_front(datagram);
return;
}
let Some(datagram) = self.queue.pop_front() else {
break;
};

// Ensure there is enough space in the packet to send a datagram
if packet.remaining_capacity() < datagram.data.len() {
// This check keeps us from popping all the datagrams off the
// queue when packet space remaining is smaller than the datagram.
if has_written {
self.queue.push_front(datagram);
break;
}

// the datagram is too large for the current packet and unlikely to ever fit so
// record a metric and try the next datagram in the queue
self.dropped_datagrams += 1;
continue;
}

match packet.write_datagram(&datagram.data) {
Ok(()) => has_written = true,
Err(_error) => {
// TODO log this
self.dropped_datagrams += 1;
continue;
}
} else {
// If there are no datagrams on the queue we return
return;
}
}

// If we now have additional capacity wake the stored waker if we have one to
// let the application know that there is space on the queue for more datagrams.
if self.capacity > self.queue.len() {
if let Some(w) = self.waker.take() {
w.wake();
}
}
}
Expand Down Expand Up @@ -540,6 +562,7 @@ impl SenderBuilder {
queue: VecDeque::with_capacity(self.queue_capacity),
capacity: self.queue_capacity,
max_datagram_payload: self.max_datagram_payload,
dropped_datagrams: 0,
max_packet_space: 0,
min_packet_space: 0,
smoothed_packet_size: 0.0,
Expand Down Expand Up @@ -780,6 +803,50 @@ mod tests {
assert!(!default_sender.queue.is_empty());
}

/// Ensures the application waker is called when capacity becomes available
#[test]
fn wake_with_capacity() {
let (waker, wake_count) = new_count_waker();
let mut cx = Context::from_waker(&waker);

let conn_info = ConnectionInfo::new(100, waker.clone());

let mut default_sender = Sender::builder()
.with_capacity(1)
.with_connection_info(&conn_info)
.build()
.unwrap();

let datagram = bytes::Bytes::from_static(&[1, 2, 3]);

assert!(default_sender
.poll_send_datagram(&mut datagram.clone(), &mut cx)
.is_ready());
assert!(default_sender
.poll_send_datagram(&mut datagram.clone(), &mut cx)
.is_pending());

assert_eq!(wake_count.get(), 0);

// Packet size is just enough to write the first datagram with some
// room left over, but not enough to write the second.
let mut packet = MockPacket {
remaining_capacity: 2,
has_pending_streams: false,
datagrams_prioritized: false,
};
crate::datagram::Sender::on_transmit(&mut default_sender, &mut packet);

// Packet capacity has not changed
assert_eq!(packet.remaining_capacity, 2);
// Send queue is completely depleted
assert!(default_sender.queue.is_empty());
// The waker was called since we now have capacity
assert_eq!(wake_count.get(), 1);
// The sender should record the number of dropped datagrams
assert_eq!(default_sender.dropped_datagrams(), 1);
}

fn fake_receive_context() -> crate::datagram::ReceiveContext<'static> {
crate::datagram::ReceiveContext {
path: crate::event::api::Path {
Expand Down
Loading