Skip to content

Commit

Permalink
refactor(s2n-quic-transport): update PTO timer once per transmission …
Browse files Browse the repository at this point in the history
…burst (#1884)

* refactor(s2n-quic-transport): update PTO timer once per transmission burst

* remove congestion_controlled check from ack_eliciting_packets_in_flight

* fix formatting

* Add debug assert

* correct comment
  • Loading branch information
WesleyRosenblum authored Jul 26, 2023
1 parent 8460c72 commit 2ff55c7
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 58 deletions.
7 changes: 7 additions & 0 deletions quic/s2n-quic-transport/src/connection/connection_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,13 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
packet_interceptor,
);

// If anything was transmitted, notify the space manager
// that a burst of packets has completed transmission
if count > 0 {
self.space_manager
.on_transmit_burst_complete(self.path_manager.active_path(), timestamp);
}

let mut publisher = self.event_context.publisher(timestamp, subscriber);
if outcome.bytes_progressed > 0 {
publisher.on_tx_stream_progress(TxStreamProgress {
Expand Down
42 changes: 32 additions & 10 deletions quic/s2n-quic-transport/src/recovery/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ pub struct Manager<Config: endpoint::Config> {

// The total ecn counts for outstanding (unacknowledged) packets
sent_packet_ecn_counts: EcnCounts,

// An update to the PTO timer is needed.
//
// Used for updating the PTO timer at the end of a transmission burst.
pto_update_pending: bool,
}

//= https://www.rfc-editor.org/rfc/rfc9002#section-6.1.1
Expand Down Expand Up @@ -124,6 +129,7 @@ impl<Config: endpoint::Config> Manager<Config> {
time_of_last_ack_eliciting_packet: None,
baseline_ecn_counts: EcnCounts::default(),
sent_packet_ecn_counts: EcnCounts::default(),
pto_update_pending: false,
}
}

Expand Down Expand Up @@ -161,6 +167,8 @@ impl<Config: endpoint::Config> Manager<Config> {
context: &mut Ctx,
publisher: &mut Pub,
) {
debug_assert!(!self.pto_update_pending);

if self.loss_timer.is_armed() {
if self.loss_timer.poll_expiration(timestamp).is_ready() {
self.detect_and_remove_lost_packets(
Expand Down Expand Up @@ -251,16 +259,27 @@ impl<Config: endpoint::Config> Manager<Config> {
.on_packet_sent(ecn, path_event!(path, path_id), publisher);
self.sent_packet_ecn_counts.increment(ecn);

if outcome.is_congestion_controlled {
if outcome.ack_elicitation.is_ack_eliciting() {
self.time_of_last_ack_eliciting_packet = Some(time_sent);
}
if outcome.ack_elicitation.is_ack_eliciting() {
self.time_of_last_ack_eliciting_packet = Some(time_sent);
//= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
//# A sender SHOULD restart its PTO timer every time an ack-eliciting
//# packet is sent or acknowledged,
let is_handshake_confirmed = context.is_handshake_confirmed();
let path = context.path_mut_by_id(context.path_id());
self.update_pto_timer(path, time_sent, is_handshake_confirmed);
self.pto_update_pending = true;
}
}

/// Invoked after a burst of packets has completed transmitting
pub fn on_transmit_burst_complete(
&mut self,
active_path: &Path<Config>,
now: Timestamp,
is_handshake_confirmed: bool,
) {
debug_assert!(active_path.is_active());
if self.pto_update_pending {
// Update the PTO timer once per transmission burst to reduce CPU cost
self.update_pto_timer(active_path, now, is_handshake_confirmed);
debug_assert!(!self.pto_update_pending);
}
}

Expand All @@ -271,6 +290,8 @@ impl<Config: endpoint::Config> Manager<Config> {
now: Timestamp,
is_handshake_confirmed: bool,
) {
self.pto_update_pending = false;

//= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1
//# If no additional data can be sent, the server's PTO timer MUST NOT be
//# armed until datagrams have been received from the client, because
Expand All @@ -281,9 +302,10 @@ impl<Config: endpoint::Config> Manager<Config> {
return;
}

let ack_eliciting_packets_in_flight = self.sent_packets.iter().any(|(_, sent_info)| {
sent_info.congestion_controlled && sent_info.ack_elicitation.is_ack_eliciting()
});
let ack_eliciting_packets_in_flight = self
.sent_packets
.iter()
.any(|(_, sent_info)| sent_info.ack_elicitation.is_ack_eliciting());

//= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1
//# it is the client's responsibility to send packets to unblock the server
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
source: quic/s2n-quic-transport/src/recovery/manager/tests.rs
expression: ""
---

122 changes: 74 additions & 48 deletions quic/s2n-quic-transport/src/recovery/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ fn one_second_pto_when_no_previous_rtt_available() {
//= type=test
#[test]
fn on_packet_sent() {
let max_ack_delay = Duration::from_millis(100);
let now = time::now();
let mut time_sent = now;
let ecn = ExplicitCongestionNotification::Ect0;
Expand All @@ -88,14 +87,11 @@ fn on_packet_sent() {
// simulate receiving a handshake packet to force path validation
context.path_mut().on_handshake_packet();

// PTO = smoothed_rtt + max(4*rttvar, kGranularity) + max_ack_delay
// PTO = DEFAULT_INITIAL_RTT + 4*DEFAULT_INITIAL_RTT/2 + 10
let expected_pto_duration = DEFAULT_INITIAL_RTT + 2 * DEFAULT_INITIAL_RTT + max_ack_delay;
let mut expected_bytes_in_flight = 0;

for i in 1..=10 {
// Reset the timer so we can confirm it was set correctly
manager.pto.timer.cancel();
// Reset pto_update_pending so we can confirm it was set correctly
manager.pto_update_pending = false;

let sent_packet = space.new_packet_number(VarInt::from_u8(i));
let ack_elicitation = if i % 2 == 0 {
Expand Down Expand Up @@ -138,36 +134,23 @@ fn on_packet_sent() {

if outcome.is_congestion_controlled {
assert_eq!(actual_sent_packet.sent_bytes as usize, outcome.bytes_sent);

if outcome.ack_elicitation.is_ack_eliciting() {
//= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
//= type=test
//# A sender SHOULD restart its PTO timer every time an ack-eliciting
//# packet is sent
assert!(manager.pto.timer.is_armed());
assert_eq!(
Some(time_sent + expected_pto_duration),
manager.pto.timer.next_expiration()
);
} else if let Some(time_of_last_ack_eliciting_packet) =
manager.time_of_last_ack_eliciting_packet
{
assert!(manager.pto.timer.is_armed());
assert_eq!(
Some(time_of_last_ack_eliciting_packet + expected_pto_duration),
manager.pto.timer.next_expiration()
);
} else {
// No ack eliciting packets have been sent yet
assert!(!manager.pto.timer.is_armed());
assert_eq!(None, manager.pto.timer.next_expiration());
}

expected_bytes_in_flight += outcome.bytes_sent;
} else {
assert_eq!(actual_sent_packet.sent_bytes, 0);
}

if outcome.ack_elicitation.is_ack_eliciting() {
assert_eq!(Some(time_sent), manager.time_of_last_ack_eliciting_packet);
//= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
//= type=test
//# A sender SHOULD restart its PTO timer every time an ack-eliciting
//# packet is sent
assert!(manager.pto_update_pending);
} else {
// No ack eliciting packets have been sent yet
assert!(!manager.pto_update_pending);
}

time_sent += Duration::from_millis(10);
}

Expand Down Expand Up @@ -212,9 +195,6 @@ fn on_packet_sent() {
// - bytes sent, congestion_controlled, time_sent match that of sent
// - pto is armed
fn on_packet_sent_across_multiple_paths() {
// let space = PacketNumberSpace::ApplicationData;
let max_ack_delay = Duration::from_millis(100);
// let mut manager = Manager::new(space, max_ack_delay);
let now = time::now();
let ecn = ExplicitCongestionNotification::default();
let mut time_sent = now;
Expand All @@ -231,12 +211,8 @@ fn on_packet_sent_across_multiple_paths() {
// simulate receiving a handshake packet to force path validation
context.path_mut().on_handshake_packet();

// PTO = smoothed_rtt + max(4*rttvar, kGranularity) + max_ack_delay
// PTO = DEFAULT_INITIAL_RTT + 4*DEFAULT_INITIAL_RTT/2 + 10
let expected_pto_duration = DEFAULT_INITIAL_RTT + 2 * DEFAULT_INITIAL_RTT + max_ack_delay;

// Reset the timer so we can confirm it was set correctly
manager.pto.timer.cancel();
// Reset pto_update_pending so we can confirm it was set correctly
manager.pto_update_pending = false;

// Trigger 1:
let sent_packet = space.new_packet_number(VarInt::from_u8(1));
Expand Down Expand Up @@ -276,9 +252,7 @@ fn on_packet_sent_across_multiple_paths() {
//= type=test
//# A sender SHOULD restart its PTO timer every time an ack-eliciting
//# packet is sent
let expected_pto = time_sent + expected_pto_duration;
assert!(manager.pto.timer.is_armed());
assert_eq!(Some(expected_pto), manager.pto.timer.next_expiration());
assert!(manager.pto_update_pending);

// Setup 2:
// send 2nd packet on path 2nd path
Expand All @@ -291,8 +265,8 @@ fn on_packet_sent_across_multiple_paths() {
bytes_progressed: 0,
};

// Reset the timer so we can confirm it was set correctly
manager.pto.timer.cancel();
// Reset pto_update_pending so we can confirm it was set correctly
manager.pto_update_pending = false;

// Trigger 2:
context.set_path_id(second_path_id);
Expand Down Expand Up @@ -323,9 +297,7 @@ fn on_packet_sent_across_multiple_paths() {
//= type=test
//# A sender SHOULD restart its PTO timer every time an ack-eliciting
//# packet is sent
let expected_pto = time_sent + expected_pto_duration;
assert!(manager.pto.timer.is_armed());
assert_eq!(Some(expected_pto), manager.pto.timer.next_expiration());
assert!(manager.pto_update_pending);
}

//= https://www.rfc-editor.org/rfc/rfc9002#appendix-A.7
Expand Down Expand Up @@ -2477,6 +2449,7 @@ fn update_pto_timer() {
context.path_mut().on_bytes_transmitted((1200 * 3) + 1);
// Arm the PTO so we can verify it is cancelled
manager.pto.timer.set(now + Duration::from_secs(10));
manager.pto_update_pending = true;
manager.update_pto_timer(context.path(), now, is_handshake_confirmed);

//= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1
Expand All @@ -2485,9 +2458,11 @@ fn update_pto_timer() {
//# armed until datagrams have been received from the client, because
//# packets sent on PTO count against the anti-amplification limit.
assert!(!manager.pto.timer.is_armed());
assert!(!manager.pto_update_pending);

// Arm the PTO so we can verify it is cancelled
manager.pto.timer.set(now + Duration::from_secs(10));
manager.pto_update_pending = true;
// Validate the path so it is not at the anti-amplification limit
//
// simulate receiving a handshake packet to force path validation
Expand All @@ -2497,6 +2472,7 @@ fn update_pto_timer() {

// Since the path is peer validated and sent packets is empty, PTO is cancelled
assert!(!manager.pto.timer.is_armed());
assert!(!manager.pto_update_pending);

// Reset the path back to not peer validated
context.path_manager[unsafe { path::Id::new(0) }] = Path::new(
Expand All @@ -2511,6 +2487,7 @@ fn update_pto_timer() {
// simulate receiving a handshake packet to force path validation
context.path_mut().on_handshake_packet();
context.path_mut().pto_backoff = 2;
manager.pto_update_pending = true;
let is_handshake_confirmed = false;
manager.update_pto_timer(context.path(), now, is_handshake_confirmed);

Expand All @@ -2519,13 +2496,16 @@ fn update_pto_timer() {
//# An endpoint MUST NOT set its PTO timer for the Application Data
//# packet number space until the handshake is confirmed.
assert!(!manager.pto.timer.is_armed());
assert!(!manager.pto_update_pending);

// Set is handshake confirmed back to true
let is_handshake_confirmed = true;
manager.pto_update_pending = true;
manager.update_pto_timer(context.path(), now, is_handshake_confirmed);

// Now the PTO is armed
assert!(manager.pto.timer.is_armed());
assert!(!manager.pto_update_pending);

// Send a packet to validate behavior when sent_packets is not empty
manager.on_packet_sent(
Expand Down Expand Up @@ -2554,6 +2534,7 @@ fn update_pto_timer() {
true,
space,
);
manager.pto_update_pending = true;
manager.update_pto_timer(context.path(), now, is_handshake_confirmed);

//= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
Expand All @@ -2568,6 +2549,7 @@ fn update_pto_timer() {
manager.pto.timer.next_expiration().unwrap(),
expected_pto_base_timestamp + Duration::from_millis(12020)
);
assert!(!manager.pto_update_pending);
}

//= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1
Expand Down Expand Up @@ -2682,6 +2664,7 @@ fn on_timeout() {
&mut context,
&mut publisher,
);
manager.on_transmit_burst_complete(context.path(), now, true);

// Loss timer is armed and expired, on_packet_loss is called
manager.loss_timer.set(now - Duration::from_secs(1));
Expand Down Expand Up @@ -3105,6 +3088,49 @@ fn packet_declared_lost_less_than_1_ms_from_loss_threshold() {
assert_eq!(1, context.on_packet_loss_count);
}

#[test]
fn on_transmit_burst_complete() {
let space = PacketNumberSpace::ApplicationData;
let mut manager = Manager::new(space);
let now = time::now() + Duration::from_secs(10);
let is_handshake_confirmed = true;
let mut path_manager = helper_generate_path_manager(Duration::from_millis(10));
let ecn = ExplicitCongestionNotification::default();
let mut context = MockContext::new(&mut path_manager);
let mut publisher = Publisher::snapshot();

// Send an ack-eliciting packet to trigger a PTO timer update
manager.on_packet_sent(
space.new_packet_number(VarInt::from_u8(1)),
transmission::Outcome {
ack_elicitation: AckElicitation::Eliciting,
is_congestion_controlled: true,
bytes_sent: 1,
bytes_progressed: 0,
},
now,
ecn,
transmission::Mode::Normal,
None,
&mut context,
&mut publisher,
);

// Validate the path so the PTO timer can be set
context.path_mut().on_handshake_packet();
context.path_mut().on_peer_validated();

assert!(manager.pto_update_pending);
manager.on_transmit_burst_complete(path_manager.active_path(), now, is_handshake_confirmed);
assert!(manager.pto.timer.is_armed());
assert!(!manager.pto_update_pending);

// Cancel the PTO timer to validate it isn't re-armed when not needed
manager.pto.timer.cancel();
manager.on_transmit_burst_complete(path_manager.active_path(), now, is_handshake_confirmed);
assert!(!manager.pto.timer.is_armed());
}

fn helper_generate_multi_path_manager(
space: PacketNumberSpace,
publisher: &mut Publisher,
Expand Down
13 changes: 13 additions & 0 deletions quic/s2n-quic-transport/src/space/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,19 @@ impl<Config: endpoint::Config> ApplicationSpace<Config> {
Ok((outcome, buffer))
}

pub(super) fn on_transmit_burst_complete(
&mut self,
active_path: &Path<Config>,
timestamp: Timestamp,
is_handshake_confirmed: bool,
) {
self.recovery_manager.on_transmit_burst_complete(
active_path,
timestamp,
is_handshake_confirmed,
);
}

pub(super) fn on_transmit_close<'a>(
&mut self,
context: &mut ConnectionTransmissionContext<Config>,
Expand Down
Loading

0 comments on commit 2ff55c7

Please sign in to comment.