Skip to content

Commit

Permalink
Merge pull request project-chip#179 from ivmarkov/timeout-recv
Browse files Browse the repository at this point in the history
Timeout exchange recv
  • Loading branch information
kedars authored Jun 5, 2024
2 parents 723179c + 023d7ba commit 69e6ee7
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 22 deletions.
4 changes: 2 additions & 2 deletions rs-matter/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ pub enum ErrorCode {
NoSpace,
NoSpaceExchanges,
NoSpaceSessions,
NoSpaceAckTable,
NoSpaceRetransTable,
TxTimeout,
RxTimeout,
NoTagFound,
NotFound,
PacketPoolExhaust,
Expand Down
41 changes: 24 additions & 17 deletions rs-matter/src/transport/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use core::fmt::{self, Display};
use core::pin::pin;

use embassy_futures::select::{select, Either};
use embassy_time::Timer;
use embassy_time::{Duration, Timer};

use log::{debug, error, info, warn};

Expand Down Expand Up @@ -81,25 +81,32 @@ impl ExchangeId {

let transport_mgr = &matter.transport_mgr;

let mut packet = transport_mgr
.get_if(&transport_mgr.rx, |packet| {
if packet.buf.is_empty() {
false
} else {
let for_us = self.with_ctx(matter, |sess, exch_index| {
if sess.is_for_rx(&packet.peer, &packet.header.plain) {
let exchange = sess.exchanges[exch_index].as_ref().unwrap();
let mut recv = pin!(transport_mgr.get_if(&transport_mgr.rx, |packet| {
if packet.buf.is_empty() {
false
} else {
let for_us = self.with_ctx(matter, |sess, exch_index| {
if sess.is_for_rx(&packet.peer, &packet.header.plain) {
let exchange = sess.exchanges[exch_index].as_ref().unwrap();

return Ok(exchange.is_for_rx(&packet.header.proto));
}
return Ok(exchange.is_for_rx(&packet.header.proto));
}

Ok(false)
});
Ok(false)
});

for_us.unwrap_or(true)
}
})
.await;
for_us.unwrap_or(true)
}
}));

let mut timeout = pin!(Timer::after(Duration::from_millis(
RetransEntry::max_delay_ms() * 3 / 2
)));

let Either::First(mut packet) = select(&mut recv, &mut timeout).await else {
// Timeout waiting for an answer from the other peer
return Err(ErrorCode::RxTimeout.into());
};

packet.clear_on_drop(true);

Expand Down
19 changes: 16 additions & 3 deletions rs-matter/src/transport/mrp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,24 @@ impl RetransEntry {
.unwrap_or(true)
}

/// Return how much to delay before (re)transmitting the message
/// based on the number of re-transmissions so far
pub fn delay_ms(&self) -> u64 {
Self::delay_ms_counter(self.counter)
}

/// Maximum delay before giving up on retransmitting the message
pub fn max_delay_ms() -> u64 {
Self::delay_ms_counter(MRP_MAX_TRANSMISSIONS)
}

/// Return how much to delay before (re)transmitting the message
/// based on the provided number of re-transmissions so far
pub fn delay_ms_counter(counter: usize) -> u64 {
let mut delay = MRP_BASE_RETRY_INTERVAL_MS;

if self.counter >= MRP_BACKOFF_THRESHOLD {
for _ in 0..self.counter - MRP_BACKOFF_THRESHOLD {
if counter >= MRP_BACKOFF_THRESHOLD {
for _ in 0..counter - MRP_BACKOFF_THRESHOLD {
delay = delay * MRP_BACKOFF_BASE.0 / MRP_BACKOFF_BASE.1;
}
}
Expand All @@ -76,7 +89,7 @@ impl RetransEntry {
self.counter += 1;
Ok(())
} else {
Err(ErrorCode::Invalid.into()) // TODO
Err(ErrorCode::TxTimeout.into())
}
} else {
// This indicates there was some existing entry for same sess-id/exch-id, which shouldn't happen
Expand Down

0 comments on commit 69e6ee7

Please sign in to comment.