Skip to content

Commit

Permalink
Keep the local fabric idx in subscriptions (project-chip#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivmarkov authored Jun 6, 2024
1 parent e82599c commit c6ae7b1
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 53 deletions.
73 changes: 46 additions & 27 deletions rs-matter/src/data_model/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ const MAX_WRITE_ATTRS_IN_ONE_TRANS: usize = 7;
pub type IMBuffer = heapless::Vec<u8, MAX_EXCHANGE_RX_BUF_SIZE>;

struct SubscriptionBuffer<B> {
node_id: u64,
id: u32,
fabric_idx: u8,
peer_node_id: u64,
subscription_id: u32,
buffer: B,
}

Expand Down Expand Up @@ -367,35 +368,43 @@ where
let req = SubscribeReq::from_tlv(&get_root_node_struct(&rx)?)?;
debug!("IM: Subscribe request: {:?}", req);

let node_id = exchange
.with_session(|sess| sess.get_peer_node_id().ok_or(ErrorCode::Invalid.into()))?;
let (fabric_idx, peer_node_id) = exchange.with_session(|sess| {
let fabric_idx = sess.get_local_fabric_idx().ok_or(ErrorCode::Invalid)?;
let peer_node_id = sess.get_peer_node_id().ok_or(ErrorCode::Invalid)?;

Ok((fabric_idx, peer_node_id))
})?;

if !req.keep_subs {
self.subscriptions.remove(Some(node_id), None);
self.subscriptions
.remove(Some(fabric_idx), Some(peer_node_id), None);
self.subscriptions_buffers
.borrow_mut()
.retain(|sb| sb.node_id != node_id);
.retain(|sb| sb.fabric_idx != fabric_idx || sb.peer_node_id != peer_node_id);

info!("All subscriptions for node {node_id:x} removed");
info!("All subscriptions for [F:{fabric_idx:x},P:{peer_node_id:x}] removed");
}

let max_int_secs = core::cmp::max(req.max_int_ceil, 40); // Say we need at least 4 secs for potential latencies
let min_int_secs = req.min_int_floor;

let Some(id) = self.subscriptions.add(node_id, min_int_secs, max_int_secs) else {
let Some(id) = self
.subscriptions
.add(fabric_idx, peer_node_id, min_int_secs, max_int_secs)
else {
return Self::send_status(exchange, IMStatusCode::ResourceExhausted).await;
};

let subscribed = Cell::new(false);

let _guard = scopeguard::guard((), |_| {
if !subscribed.get() {
self.subscriptions.remove(None, Some(id));
self.subscriptions.remove(None, None, Some(id));
}
});

let primed = self
.report_data(id, node_id, &rx, &mut tx, exchange)
.report_data(id, fabric_idx, peer_node_id, &rx, &mut tx, exchange)
.await?;

if primed {
Expand All @@ -406,15 +415,16 @@ where
})
.await?;

info!("Subscription {node_id:x}::{id} created");
info!("Subscription [F:{fabric_idx:x},P:{peer_node_id:x}]::{id} created");

if self.subscriptions.mark_reported(id) {
let _ = self
.subscriptions_buffers
.borrow_mut()
.push(SubscriptionBuffer {
node_id,
id,
fabric_idx,
peer_node_id,
subscription_id: id,
buffer: rx,
});

Expand All @@ -436,27 +446,33 @@ where
let now = Instant::now();

{
while let Some((node_id, id)) = self.subscriptions.find_expired(now) {
self.subscriptions.remove(None, Some(id));
while let Some((fabric_idx, peer_node_id, id)) =
self.subscriptions.find_expired(now)
{
self.subscriptions.remove(None, None, Some(id));
self.subscriptions_buffers
.borrow_mut()
.retain(|sb| sb.id != id);
.retain(|sb| sb.subscription_id != id);

info!("Subscription {node_id:x}::{id} removed due to inactivity");
info!(
"Subscription [F:{fabric_idx:x},P:{peer_node_id:x}]::{id} removed due to inactivity"
);
}
}

loop {
let sub = self.subscriptions.find_report_due(now);

if let Some((node_id, id)) = sub {
info!("About to report data for subscription {node_id:x}::{id}");
if let Some((fabric_idx, peer_node_id, id)) = sub {
info!(
"About to report data for subscription [F:{fabric_idx:x},P:{peer_node_id:x}]::{id}"
);

let subscribed = Cell::new(false);

let _guard = scopeguard::guard((), |_| {
if !subscribed.get() {
self.subscriptions.remove(None, Some(id));
self.subscriptions.remove(None, None, Some(id));
}
});

Expand All @@ -466,7 +482,7 @@ where
.subscriptions_buffers
.borrow()
.iter()
.position(|sb| sb.id == id)
.position(|sb| sb.subscription_id == id)
.unwrap();
let rx = self.subscriptions_buffers.borrow_mut().remove(index).buffer;

Expand All @@ -475,11 +491,12 @@ where
// Only used when priming the subscription
req.dataver_filters = None;

let mut exchange = Exchange::initiate(matter, node_id, true).await?;
let mut exchange =
Exchange::initiate(matter, fabric_idx, peer_node_id, true).await?;

if let Some(mut tx) = self.buffers.get().await {
let primed = self
.report_data(id, node_id, &rx, &mut tx, &mut exchange)
.report_data(id, fabric_idx, peer_node_id, &rx, &mut tx, &mut exchange)
.await?;

exchange.acknowledge().await?;
Expand All @@ -489,8 +506,9 @@ where
self.subscriptions_buffers
.borrow_mut()
.push(SubscriptionBuffer {
node_id,
id,
fabric_idx,
peer_node_id,
subscription_id: id,
buffer: rx,
});
subscribed.set(true);
Expand Down Expand Up @@ -545,7 +563,8 @@ where
async fn report_data(
&self,
id: u32,
node_id: u64,
fabric_idx: u8,
peer_node_id: u64,
rx: &[u8],
tx: &mut [u8],
exchange: &mut Exchange<'_>,
Expand Down Expand Up @@ -574,7 +593,7 @@ where
exchange.send(OpCode::ReportData, wb.as_slice()).await?;

if !Self::recv_status_success(exchange).await? {
info!("Subscription {node_id:x}::{id} removed during reporting");
info!("Subscription [F:{fabric_idx:x},P:{peer_node_id:x}]::{id} removed during reporting");
return Ok(false);
}

Expand Down
3 changes: 3 additions & 0 deletions rs-matter/src/data_model/sdm/noc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ impl<'a> NocCluster<'a> {
data: &TLVElement,
encoder: CmdDataEncoder,
) -> Result<(), Error> {
// TODO: Need to remove all sessions for this fabric
// TODO: Need to remove all IM subscriptions for this fabric

cmd_enter!("Remove Fabric");
let req = RemoveFabricReq::from_tlv(data).map_err(Error::map_invalid_data_type)?;
if self
Expand Down
39 changes: 27 additions & 12 deletions rs-matter/src/data_model/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use portable_atomic::{AtomicU32, Ordering};
use crate::utils::notification::Notification;

struct Subscription {
node_id: u64,
fabric_idx: u8,
peer_node_id: u64,
id: u32,
// We use u16 instead of embassy::Duration to save some storage
min_int_secs: u16,
Expand Down Expand Up @@ -92,13 +93,20 @@ impl<const N: usize> Subscriptions<N> {
self.notification.notify();
}

pub(crate) fn add(&self, node_id: u64, min_int_secs: u16, max_int_secs: u16) -> Option<u32> {
pub(crate) fn add(
&self,
fabric_idx: u8,
peer_node_id: u64,
min_int_secs: u16,
max_int_secs: u16,
) -> Option<u32> {
let id = self.next_subscription_id.fetch_add(1, Ordering::SeqCst);

self.subscriptions
.borrow_mut()
.push(Subscription {
node_id,
fabric_idx,
peer_node_id,
id,
min_int_secs,
max_int_secs,
Expand Down Expand Up @@ -126,32 +134,39 @@ impl<const N: usize> Subscriptions<N> {
}
}

pub(crate) fn remove(&self, node_id: Option<u64>, id: Option<u32>) {
pub(crate) fn remove(
&self,
fabric_idx: Option<u8>,
peer_node_id: Option<u64>,
id: Option<u32>,
) {
let mut subscriptions = self.subscriptions.borrow_mut();
while let Some(index) = subscriptions.iter().position(|sub| {
sub.node_id == node_id.unwrap_or(sub.node_id) && sub.id == id.unwrap_or(sub.id)
sub.fabric_idx == fabric_idx.unwrap_or(sub.fabric_idx)
&& sub.peer_node_id == peer_node_id.unwrap_or(sub.peer_node_id)
&& sub.id == id.unwrap_or(sub.id)
}) {
subscriptions.swap_remove(index);
}
}

pub(crate) fn find_expired(&self, now: Instant) -> Option<(u64, u32)> {
self.subscriptions
.borrow()
.iter()
.find_map(|sub| sub.is_expired(now).then_some((sub.node_id, sub.id)))
pub(crate) fn find_expired(&self, now: Instant) -> Option<(u8, u64, u32)> {
self.subscriptions.borrow().iter().find_map(|sub| {
sub.is_expired(now)
.then_some((sub.fabric_idx, sub.peer_node_id, sub.id))
})
}

/// Note that this method has a side effect:
/// it updates the `reported_at` field of the subscription that is returned.
pub(crate) fn find_report_due(&self, now: Instant) -> Option<(u64, u32)> {
pub(crate) fn find_report_due(&self, now: Instant) -> Option<(u8, u64, u32)> {
self.subscriptions
.borrow_mut()
.iter_mut()
.find(|sub| sub.report_due(now))
.map(|sub| {
sub.reported_at = now;
(sub.node_id, sub.id)
(sub.fabric_idx, sub.peer_node_id, sub.id)
})
}
}
9 changes: 6 additions & 3 deletions rs-matter/src/transport/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,24 @@ impl<'m> TransportMgr<'m> {
pub(crate) async fn initiate<'a>(
&'a self,
matter: &'a Matter<'a>,
node_id: u64,
fabric_idx: u8,
peer_node_id: u64,
secure: bool,
) -> Result<Exchange<'_>, Error> {
let mut session_mgr = self.session_mgr.borrow_mut();

session_mgr
.get_for_node(node_id, secure)
.get_for_node(fabric_idx, peer_node_id, secure)
.ok_or(ErrorCode::NoSession)?;

let exch_id = session_mgr.get_next_exch_id();

// `unwrap` is safe because we know we have a session or else the early return from above would've triggered
// The reason why we call `get_for_node` twice is to ensure that we don't waste an `exch_id` in case
// we don't have a session in the first place
let session = session_mgr.get_for_node(node_id, secure).unwrap();
let session = session_mgr
.get_for_node(fabric_idx, peer_node_id, secure)
.unwrap();

let exch_index = session
.add_exch(exch_id, Role::Initiator(Default::default()))
Expand Down
16 changes: 11 additions & 5 deletions rs-matter/src/transport/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,18 +764,24 @@ impl<'a> Exchange<'a> {
self.matter
}

/// Create a new initiator exchange on the provided Matter stack for the provided Node ID
/// Create a new initiator exchange on the provided Matter stack for the provided peer Node ID
///
/// This method will fail if there is no existing session in the provided Matter satack for the provided Node ID.
/// This method will fail if there is no existing session in the provided Matter stack for the provided peer Node ID.
///
// TODO: This signature will change in future
// TODO: This signature will change in future, once we are able to do mDNS lookups and thus create a
// new session on our own (currently we can't do it because - in the absence of mDNS lookups - we cannot
// find the IP address and port corresponding to the peer Node ID with which we are trying to initiate an exchange).
#[inline(always)]
pub async fn initiate(
matter: &'a Matter<'a>,
node_id: u64,
fabric_idx: u8,
peer_node_id: u64,
secure: bool,
) -> Result<Self, Error> {
matter.transport_mgr.initiate(matter, node_id, secure).await
matter
.transport_mgr
.initiate(matter, fabric_idx, peer_node_id, secure)
.await
}

/// Accepts a new responder exchange pending on the provided Matter stack.
Expand Down
16 changes: 12 additions & 4 deletions rs-matter/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,11 @@ impl Session {
&self.att_challenge
}

pub(crate) fn is_for_node(&self, node_id: u64, secure: bool) -> bool {
self.peer_nodeid == Some(node_id) && self.is_encrypted() == secure && !self.reserved
pub(crate) fn is_for_node(&self, fabric_idx: u8, peer_node_id: u64, secure: bool) -> bool {
self.get_local_fabric_idx() == Some(fabric_idx)
&& self.peer_nodeid == Some(peer_node_id)
&& self.is_encrypted() == secure
&& !self.reserved
}

pub(crate) fn is_for_rx(&self, rx_peer: &Address, rx_plain: &PlainHdr) -> bool {
Expand Down Expand Up @@ -658,11 +661,16 @@ impl SessionMgr {
session
}

pub(crate) fn get_for_node(&mut self, node_id: u64, secure: bool) -> Option<&mut Session> {
pub(crate) fn get_for_node(
&mut self,
fabric_idx: u8,
peer_node_id: u64,
secure: bool,
) -> Option<&mut Session> {
let mut session = self
.sessions
.iter_mut()
.find(|sess| sess.is_for_node(node_id, secure));
.find(|sess| sess.is_for_node(fabric_idx, peer_node_id, secure));

if let Some(session) = session.as_mut() {
session.update_last_used(self.epoch);
Expand Down
9 changes: 7 additions & 2 deletions rs-matter/tests/common/im_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,13 @@ impl<'a> ImEngine<'a> {
NetworkReceiveImpl(recv_remote),
),
join(responder.respond_once("0"), async move {
let mut exchange =
Exchange::initiate(matter_client, IM_ENGINE_REMOTE_PEER_ID, true).await?;
let mut exchange = Exchange::initiate(
matter_client,
1, /*just one fabric in tests*/
IM_ENGINE_REMOTE_PEER_ID,
true,
)
.await?;

for ip in input {
exchange
Expand Down

0 comments on commit c6ae7b1

Please sign in to comment.