diff --git a/rs-matter/src/error.rs b/rs-matter/src/error.rs index 6a16caf4..4e373cc7 100644 --- a/rs-matter/src/error.rs +++ b/rs-matter/src/error.rs @@ -49,8 +49,8 @@ pub enum ErrorCode { NoSpace, NoSpaceExchanges, NoSpaceSessions, - NoSpaceAckTable, - NoSpaceRetransTable, + TxTimeout, + RxTimeout, NoTagFound, NotFound, PacketPoolExhaust, diff --git a/rs-matter/src/transport/exchange.rs b/rs-matter/src/transport/exchange.rs index 76e8fcb0..9cd3b9c6 100644 --- a/rs-matter/src/transport/exchange.rs +++ b/rs-matter/src/transport/exchange.rs @@ -20,7 +20,7 @@ use core::fmt::{self, Display}; use core::pin::pin; use embassy_futures::select::{select, Either}; -use embassy_time::Timer; +use embassy_time::{Duration, Timer}; use log::{debug, error, info, warn}; @@ -78,25 +78,32 @@ impl ExchangeId { let transport_mgr = &matter.transport_mgr; - let mut packet = transport_mgr - .get_if(&transport_mgr.rx, |packet| { - if packet.buf.is_empty() { - false - } else { - let for_us = self.with_ctx(matter, |sess, exch_index| { - if sess.is_for_rx(&packet.peer, &packet.header.plain) { - let exchange = sess.exchanges[exch_index].as_ref().unwrap(); + let mut recv = pin!(transport_mgr.get_if(&transport_mgr.rx, |packet| { + if packet.buf.is_empty() { + false + } else { + let for_us = self.with_ctx(matter, |sess, exch_index| { + if sess.is_for_rx(&packet.peer, &packet.header.plain) { + let exchange = sess.exchanges[exch_index].as_ref().unwrap(); - return Ok(exchange.is_for_rx(&packet.header.proto)); - } + return Ok(exchange.is_for_rx(&packet.header.proto)); + } - Ok(false) - }); + Ok(false) + }); - for_us.unwrap_or(true) - } - }) - .await; + for_us.unwrap_or(true) + } + })); + + let mut timeout = pin!(Timer::after(Duration::from_millis( + RetransEntry::max_delay_ms() * 3 / 2 + ))); + + let Either::First(mut packet) = select(&mut recv, &mut timeout).await else { + // Timeout waiting for an answer from the other peer + return Err(ErrorCode::RxTimeout.into()); + }; packet.clear_on_drop(true); diff --git a/rs-matter/src/transport/mrp.rs b/rs-matter/src/transport/mrp.rs index 693f60be..ff3af46b 100644 --- a/rs-matter/src/transport/mrp.rs +++ b/rs-matter/src/transport/mrp.rs @@ -58,11 +58,24 @@ impl RetransEntry { .unwrap_or(true) } + /// Return how much to delay before (re)transmitting the message + /// based on the number of re-transmissions so far pub fn delay_ms(&self) -> u64 { + Self::delay_ms_counter(self.counter) + } + + /// Maximum delay before giving up on retransmitting the message + pub fn max_delay_ms() -> u64 { + Self::delay_ms_counter(MRP_MAX_TRANSMISSIONS) + } + + /// Return how much to delay before (re)transmitting the message + /// based on the provided number of re-transmissions so far + pub fn delay_ms_counter(counter: usize) -> u64 { let mut delay = MRP_BASE_RETRY_INTERVAL_MS; - if self.counter >= MRP_BACKOFF_THRESHOLD { - for _ in 0..self.counter - MRP_BACKOFF_THRESHOLD { + if counter >= MRP_BACKOFF_THRESHOLD { + for _ in 0..counter - MRP_BACKOFF_THRESHOLD { delay = delay * MRP_BACKOFF_BASE.0 / MRP_BACKOFF_BASE.1; } } @@ -76,7 +89,7 @@ impl RetransEntry { self.counter += 1; Ok(()) } else { - Err(ErrorCode::Invalid.into()) // TODO + Err(ErrorCode::TxTimeout.into()) } } else { // This indicates there was some existing entry for same sess-id/exch-id, which shouldn't happen