Skip to content

Commit

Permalink
fix: congestion pto timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
metah3m committed Jan 21, 2025
1 parent 65b2b39 commit c0155e9
Showing 1 changed file with 31 additions and 48 deletions.
79 changes: 31 additions & 48 deletions qcongestion/src/congestion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ impl CongestionController {
if let Some(last_pn) = self.sent_packets[space].back() {
assert!(pn > last_pn.pn);
}
self.sent_packets[space].push_back(sent);

if ack_eliciting {
self.sent_packets[space].push_back(sent);
}
self.pacer.on_sent(sent_bytes as u64);
}

Expand Down Expand Up @@ -275,6 +278,7 @@ impl CongestionController {
// Client sends an anti-deadlock packet: Initial is padded
// to earn more anti-amplification credit,
// a Handshake packet proves address ownership.
debug!("Anti-deadlock packet sent");
if self.handshake.is_getting_keys() {
Epoch::Handshake
} else {
Expand All @@ -289,9 +293,11 @@ impl CongestionController {

self.pto_count += 1;
log::debug!(
"PTO timeout, epoch: {:?}, pto_count: {}",
"{:?} PTO timeout, epoch: {:?}, pto_count: {} inflight: {:?}",
self.handshake.role(),
pto_epoch,
self.pto_count
self.pto_count,
self.sent_packets[pto_epoch].iter().map(|pkt| pkt.pn).collect::<Vec<_>>()
);
// Retransmit frames from the oldest sent packet. However,
// these packets are not actually declared lost, so have no effect on
Expand Down Expand Up @@ -338,7 +344,7 @@ impl CongestionController {

let mut pto_time = None;
for &space in Epoch::iter() {
if self.time_of_last_ack_eliciting_packet[space].is_none() {
if self.sent_packets[space].is_empty() {
continue;
}
if space == Epoch::Data {
Expand Down Expand Up @@ -407,21 +413,13 @@ impl CongestionController {
}

fn slide_sent_packets(&mut self, space: Epoch) {
while let Some(sent) = self.sent_packets[space].front() {
if !sent.is_acked {
break;
}
while self.sent_packets[space].front().map_or(false, |sent| sent.is_acked) {
self.sent_packets[space].pop_front();
}
}

fn no_ack_eliciting_in_flight(&self) -> bool {
for space in Epoch::iter() {
if self.time_of_last_ack_eliciting_packet[*space].is_some() {
return false;
}
}
true
Epoch::iter().all(|space| self.sent_packets[*space].is_empty())
}

fn server_completed_address_validation(&mut self) -> bool {
Expand All @@ -441,6 +439,20 @@ impl CongestionController {
fn send_quota(&mut self, now: Instant) -> usize {
self.pacer.schedule(self.rtt.smoothed_rtt(), self.algorithm.cwnd(), MSS, now, self.algorithm.pacing_rate())
}

#[inline]
fn send_elapsed(&self, now: Instant) -> Duration{
now.saturating_duration_since(self.last_sent_time)
}

#[inline]
fn ready_to_send(&mut self, now: Instant) -> Option<usize>{
let token = self.send_quota(now);
if token >= MSS || self.requires_ack() || self.send_elapsed(now) >= MAX_SENT_DELAY{
return Some(token);
}
None
}
}

/// Shared congestion controller
Expand Down Expand Up @@ -475,53 +487,24 @@ impl super::CongestionControl for ArcCC {
if guard.loss_timer.is_timeout(now) {
guard.on_loss_timeout(now);
}
let requires_ack = guard.requires_ack();
// Can send some data
if guard.send_quota(now) > MSS || requires_ack {
if guard.ready_to_send(now).is_some() {
if let Some(waker) = guard.send_waker.take() {
waker.wake();
}
}
// require send ack
if requires_ack {
if guard.requires_ack() {
notify.notify_waiters();
}
}
}).abort_handle()
}
fn poll_send(&self, cx: &mut Context<'_>) -> Poll<usize> {
let mut guard = self.0.lock().unwrap();
guard.send_waker = Some(cx.waker().clone());
let now = Instant::now();
if guard.loss_timer.is_timeout(now) {
guard.on_loss_timeout(now);
}

let srtt = guard.rtt.smoothed_rtt();
let cwnd = guard.algorithm.cwnd();
let mtu = MSS;
let rate = guard.algorithm.pacing_rate();
let tokens = guard.pacer.schedule(srtt, cwnd, mtu, now, rate);
if tokens >= mtu {
return Poll::Ready(tokens);
}

let mut need_ack = false;
for &epoch in Epoch::iter() {
if guard.rcvd_records[epoch]
.requires_ack(guard.max_ack_delay)
.is_some()
{
need_ack = true;
break;
}
}
// 1. 有 ack 要发送, 且距离上次发送时间大于 max ack dely
// 2. 距离上次发送时间大于 max sent delay
let elapsed = now.saturating_duration_since(guard.last_sent_time);
if (need_ack && elapsed >= guard.max_ack_delay) || elapsed >= MAX_SENT_DELAY {
if let Some(tokens) = guard.ready_to_send(now) {
return Poll::Ready(tokens);
}
guard.send_waker = Some(cx.waker().clone());
Poll::Pending
}

Expand Down Expand Up @@ -666,7 +649,7 @@ impl RcvdRecords {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct AckedPkt {
pub pn: u64,
pub time_sent: Instant,
Expand Down

0 comments on commit c0155e9

Please sign in to comment.