diff --git a/Cargo.lock b/Cargo.lock index fc01375..9431955 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4053,8 +4053,8 @@ dependencies = [ [[package]] name = "zenoh" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-global-executor", "async-std", @@ -4117,16 +4117,16 @@ dependencies = [ [[package]] name = "zenoh-buffers" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "zenoh-collections", ] [[package]] name = "zenoh-codec" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "log", "serde", @@ -4137,13 +4137,13 @@ dependencies = [ [[package]] name = "zenoh-collections" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" [[package]] name = "zenoh-config" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "flume", "json5", @@ -4160,8 +4160,8 @@ dependencies = [ [[package]] name = "zenoh-core" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-std", "lazy_static", @@ -4170,8 +4170,8 @@ dependencies = [ [[package]] name = "zenoh-crypto" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "aes 0.8.3", "hmac 0.12.1", @@ -4183,8 +4183,8 @@ dependencies = [ [[package]] name = "zenoh-ext" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-std", "bincode", @@ -4203,8 +4203,8 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "hashbrown 0.14.0", "keyed-set", @@ -4217,8 +4217,8 @@ dependencies = [ 
[[package]] name = "zenoh-link" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-std", "async-trait", @@ -4236,8 +4236,8 @@ dependencies = [ [[package]] name = "zenoh-link-commons" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-std", "async-trait", @@ -4252,8 +4252,8 @@ dependencies = [ [[package]] name = "zenoh-link-quic" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-rustls", "async-std", @@ -4276,8 +4276,8 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-std", "async-trait", @@ -4292,8 +4292,8 @@ dependencies = [ [[package]] name = "zenoh-link-tls" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-rustls", "async-std", @@ -4315,8 +4315,8 @@ dependencies = [ [[package]] name = "zenoh-link-udp" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-std", "async-trait", @@ -4334,8 +4334,8 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-std", "async-trait", @@ -4352,8 +4352,8 @@ dependencies = [ [[package]] name = "zenoh-link-ws" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-std", "async-trait", @@ -4372,8 +4372,8 @@ dependencies = [ [[package]] name = "zenoh-macros" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "proc-macro2", "quote", @@ -4385,8 +4385,8 @@ 
dependencies = [ [[package]] name = "zenoh-plugin-rest" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "anyhow", "async-std", @@ -4442,8 +4442,8 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "libloading 0.8.0", "log", @@ -4455,8 +4455,8 @@ dependencies = [ [[package]] name = "zenoh-protocol" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "const_format", "hex", @@ -4471,16 +4471,16 @@ dependencies = [ [[package]] name = "zenoh-result" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "anyhow", ] [[package]] name = "zenoh-sync" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-std", "event-listener", @@ -4494,8 +4494,8 @@ dependencies = [ [[package]] name = "zenoh-transport" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-executor", "async-global-executor", @@ -4525,8 +4525,8 @@ dependencies = [ [[package]] name = "zenoh-util" -version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#c2bc9bd9b89b94539c6bbe12f3e1750deeda37c5" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=matching_state#f23bce57d6c7cc1fa0e6d5b5715cca72158e8812" dependencies = [ "async-std", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index de89b2e..a0bcc57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,13 +48,13 @@ regex = "1.7.1" rustc_version = "0.4" serde = "1.0.154" serde_json = "1.0.94" -zenoh = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "master", features = ["unstable"] } -zenoh-collections = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "master" } -zenoh-core = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "master" } -zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "master", features = ["unstable"] } -zenoh-plugin-rest = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "master", default-features = false } -zenoh-plugin-trait = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "master", default-features = false } -zenoh-util = { git = 
"https://github.com/eclipse-zenoh/zenoh", branch = "master", default-features = false } +zenoh = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "matching_state", features = ["unstable"] } +zenoh-collections = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "matching_state" } +zenoh-core = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "matching_state" } +zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "matching_state", features = ["unstable"] } +zenoh-plugin-rest = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "matching_state", default-features = false } +zenoh-plugin-trait = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "matching_state", default-features = false } +zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "matching_state", default-features = false } [profile.release] debug = false diff --git a/zenoh-plugin-ros2dds/src/lib.rs b/zenoh-plugin-ros2dds/src/lib.rs index ae8f053..4b26164 100644 --- a/zenoh-plugin-ros2dds/src/lib.rs +++ b/zenoh-plugin-ros2dds/src/lib.rs @@ -267,7 +267,7 @@ pub async fn run(runtime: Runtime, config: Config) { let mut ros2_plugin = ROS2PluginRuntime { config: Arc::new(config), - zsession: &zsession, + zsession, participant, _member: member, plugin_id, @@ -281,7 +281,7 @@ pub struct ROS2PluginRuntime<'a> { config: Arc, // Note: &'a Arc here to keep the ownership of Session outside this struct // and be able to store the publishers/subscribers it creates in this same struct. - zsession: &'a Arc, + zsession: Arc, participant: dds_entity_t, _member: LivelinessToken<'a>, plugin_id: OwnedKeyExpr, @@ -323,7 +323,7 @@ impl<'a> ROS2PluginRuntime<'a> { .liveliness() .declare_subscriber(ke_liveliness_all) .querying() - .with(zenoh::handlers::DefaultHandler {}) + .with(flume::unbounded()) .res_async() .await .expect("Failed to create Liveliness Subscriber"); @@ -367,7 +367,7 @@ impl<'a> ROS2PluginRuntime<'a> { let mut routes_mgr = RoutesMgr::new( self.plugin_id.clone(), self.config.clone(), - self.zsession, + self.zsession.clone(), self.participant, discovery_mgr.discovered_entities.clone(), ros_discovery_mgr, diff --git a/zenoh-plugin-ros2dds/src/ros_discovery.rs b/zenoh-plugin-ros2dds/src/ros_discovery.rs index aab0214..500122b 100644 --- a/zenoh-plugin-ros2dds/src/ros_discovery.rs +++ b/zenoh-plugin-ros2dds/src/ros_discovery.rs @@ -189,7 +189,11 @@ impl RosDiscoveryInfoMgr { _ = ros_disco_timer_rcv.recv_async() => { let (ref msg, ref mut has_changed) = *zwrite!(participant_entities_state); if *has_changed { - log::debug!("Publish update on 'ros_discovery_info': {msg:?}"); + log::debug!("Publish update on 'ros_discovery_info' with {} writers and {} readers", + msg.node_entities_info_seq.values().next().map_or(0, |n| n.writer_gid_seq.len()), + msg.node_entities_info_seq.values().next().map_or(0, |n| n.reader_gid_seq.len()) + ); + log::trace!("Publish update on 'ros_discovery_info': {msg:?}"); Self::write(writer, msg).unwrap_or_else(|e| log::error!("Failed to publish update on 'ros_discovery_info' topic: {e}") ); diff --git a/zenoh-plugin-ros2dds/src/route_action_cli.rs b/zenoh-plugin-ros2dds/src/route_action_cli.rs index d7a86e8..27c7def 100644 --- a/zenoh-plugin-ros2dds/src/route_action_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_action_cli.rs @@ -36,7 +36,7 @@ pub struct RouteActionCli<'a> { zenoh_key_expr_prefix: OwnedKeyExpr, // the context #[serde(skip)] - context: Context<'a>, + context: Context, is_active: bool, #[serde(skip)] route_send_goal: 
RouteServiceCli<'a>, @@ -73,14 +73,14 @@ impl RouteActionCli<'_> { ros2_name: String, ros2_type: String, zenoh_key_expr_prefix: OwnedKeyExpr, - context: &Context<'a>, + context: Context, ) -> Result, String> { let route_send_goal = RouteServiceCli::create( format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_SEND_GOAL), format!("{ros2_type}_SendGoal"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL, &None, - context, + context.clone(), ) .await?; @@ -89,7 +89,7 @@ impl RouteActionCli<'_> { ROS2_ACTION_CANCEL_GOAL_SRV_TYPE.to_string(), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL, &None, - context, + context.clone(), ) .await?; @@ -98,7 +98,7 @@ impl RouteActionCli<'_> { format!("{ros2_type}_GetResult"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT, &None, - context, + context.clone(), ) .await?; @@ -108,7 +108,7 @@ impl RouteActionCli<'_> { &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_FEEDBACK, true, QOS_DEFAULT_ACTION_FEEDBACK.clone(), - context, + context.clone(), ) .await?; @@ -118,7 +118,7 @@ impl RouteActionCli<'_> { &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_STATUS, true, QOS_DEFAULT_ACTION_STATUS.clone(), - context, + context.clone(), ) .await?; @@ -126,7 +126,7 @@ impl RouteActionCli<'_> { ros2_name, ros2_type, zenoh_key_expr_prefix, - context: context.clone(), + context, is_active: false, route_send_goal, route_cancel_goal, @@ -139,7 +139,7 @@ impl RouteActionCli<'_> { }) } - async fn activate<'a>(&'a mut self) -> Result<(), String> { + async fn activate(&mut self) -> Result<(), String> { self.is_active = true; // create associated LivelinessToken diff --git a/zenoh-plugin-ros2dds/src/route_action_srv.rs b/zenoh-plugin-ros2dds/src/route_action_srv.rs index ce7a7ae..a0caa25 100644 --- a/zenoh-plugin-ros2dds/src/route_action_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_action_srv.rs @@ -35,7 +35,7 @@ pub struct RouteActionSrv<'a> { zenoh_key_expr_prefix: OwnedKeyExpr, // the context #[serde(skip)] - context: Context<'a>, + context: Context, is_active: bool, #[serde(skip)] route_send_goal: RouteServiceSrv<'a>, @@ -72,14 +72,14 @@ impl RouteActionSrv<'_> { ros2_name: String, ros2_type: String, zenoh_key_expr_prefix: OwnedKeyExpr, - context: &Context<'a>, + context: Context, ) -> Result, String> { let route_send_goal = RouteServiceSrv::create( format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_SEND_GOAL), format!("{ros2_type}_SendGoal"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL, &None, - context, + context.clone(), ) .await?; @@ -88,7 +88,7 @@ impl RouteActionSrv<'_> { ROS2_ACTION_CANCEL_GOAL_SRV_TYPE.to_string(), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL, &None, - context, + context.clone(), ) .await?; @@ -97,7 +97,7 @@ impl RouteActionSrv<'_> { format!("{ros2_type}_GetResult"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT, &None, - context, + context.clone(), ) .await?; @@ -108,7 +108,7 @@ impl RouteActionSrv<'_> { &None, true, QOS_DEFAULT_ACTION_FEEDBACK.clone(), - context, + context.clone(), ) .await?; @@ -119,7 +119,7 @@ impl RouteActionSrv<'_> { &None, true, QOS_DEFAULT_ACTION_STATUS.clone(), - context, + context.clone(), ) .await?; @@ -127,7 +127,7 @@ impl RouteActionSrv<'_> { ros2_name, ros2_type, zenoh_key_expr_prefix, - context: context.clone(), + context, is_active: false, route_send_goal, route_cancel_goal, @@ -140,7 +140,7 @@ impl RouteActionSrv<'_> { }) } - async fn activate<'a>(&'a mut self) -> Result<(), String> { + async fn activate(&mut self) -> Result<(), String> { self.is_active = true; // create associated LivelinessToken 
diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs b/zenoh-plugin-ros2dds/src/route_publisher.rs index b3624d7..3d52e6d 100644 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs +++ b/zenoh-plugin-ros2dds/src/route_publisher.rs @@ -25,7 +25,7 @@ use zenoh::prelude::r#async::AsyncResolve; use zenoh::prelude::*; use zenoh::publication::Publisher; use zenoh_core::SyncResolve; -use zenoh_ext::{PublicationCache, SessionExt}; +use zenoh_ext::{ArcSessionExt, PublicationCache}; use crate::dds_types::{DDSRawSample, TypeInfo}; use crate::dds_utils::{ @@ -34,24 +34,33 @@ use crate::dds_utils::{ }; use crate::liveliness_mgt::new_ke_liveliness_pub; use crate::ros2_utils::{is_message_for_action, ros2_message_type_to_dds_type}; +use crate::ros_discovery::RosDiscoveryInfoMgr; use crate::routes_mgr::Context; use crate::{qos_helpers::*, Config}; use crate::{KE_PREFIX_PUB_CACHE, LOG_PAYLOAD}; -pub struct ZPublisher<'a> { - publisher: Publisher<'static>, - _cache: Option>, +pub struct ZPublisher { + publisher: Arc>, + _matching_listener: zenoh::publication::MatchingListener<'static, ()>, + _cache: Option>, cache_size: usize, } -impl<'a> Deref for ZPublisher<'a> { - type Target = Publisher<'static>; +impl Deref for ZPublisher { + type Target = Arc>; fn deref(&self) -> &Self::Target { &self.publisher } } +// struct InnerState<'a> { +// TODO? solution to create empty route, and then add an InnerState after containing +// ZPublisher + dds_reader Atomic + Matching Listener that create DDS Reader +// +// not sure it solves since anyway MatchingListener holds a &Publisher +// } + // a route from DDS to Zenoh #[allow(clippy::upper_case_acronyms)] #[derive(Serialize)] @@ -64,17 +73,17 @@ pub struct RoutePublisher<'a> { zenoh_key_expr: OwnedKeyExpr, // the context #[serde(skip)] - context: Context<'a>, + context: Context, // the zenoh publisher used to re-publish to zenoh the data received by the DDS Reader // `None` when route is created on a remote announcement and no local ROS2 Subscriber discovered yet #[serde( rename = "publication_cache_size", serialize_with = "serialize_pub_cache" )] - zenoh_publisher: ZPublisher<'a>, + zenoh_publisher: ZPublisher, // the local DDS Reader created to serve the route (i.e. 
re-publish to zenoh data coming from DDS) #[serde(serialize_with = "serialize_atomic_entity_guid")] - dds_reader: AtomicDDSEntity, + dds_reader: Arc, // TypeInfo for Reader creation (if available) #[serde(skip)] type_info: Option>, @@ -113,15 +122,15 @@ impl fmt::Display for RoutePublisher<'_> { impl RoutePublisher<'_> { #[allow(clippy::too_many_arguments)] - pub async fn create<'a>( + pub async fn create( ros2_name: String, ros2_type: String, zenoh_key_expr: OwnedKeyExpr, type_info: &Option>, keyless: bool, reader_qos: Qos, - context: &Context<'a>, - ) -> Result, String> { + context: Context, + ) -> Result, String> { log::debug!( "Route Publisher ({ros2_name} -> {zenoh_key_expr}): creation with type {ros2_type}" ); @@ -129,7 +138,7 @@ impl RoutePublisher<'_> { // create the zenoh Publisher // if Reader shall be TRANSIENT_LOCAL, use a PublicationCache to store historical data let transient_local = is_transient_local(&reader_qos); - let (cache, cache_size) = if transient_local { + let (cache, cache_size): (Option, usize) = if transient_local { #[allow(non_upper_case_globals)] let history_qos = get_history_or_default(&reader_qos); let durability_service_qos = get_durability_service_or_default(&reader_qos); @@ -187,25 +196,78 @@ impl RoutePublisher<'_> { _ => CongestionControl::Drop, }; - let publisher: Publisher<'static> = context + let publisher: Arc> = context .zsession .declare_publisher(zenoh_key_expr.clone()) .congestion_control(congestion_ctrl) .res_async() .await - .map_err(|e| format!("Failed create Publisher for key {zenoh_key_expr}: {e}",))?; + .map_err(|e| format!("Failed create Publisher for key {zenoh_key_expr}: {e}",))? + .into_arc(); + + // activate/deactivate DDS Reader on detection/undetection of matching Subscribers + // (copy/move all required args for the callback) + let dds_reader: Arc = Arc::new(DDS_ENTITY_NULL.into()); + + let matching_listener = { + publisher + .matching_listener() + .callback({ + let dds_reader = dds_reader.clone(); + let ros2_name = ros2_name.clone(); + let ros2_type = ros2_type.clone(); + let zenoh_key_expr = zenoh_key_expr.clone(); + let route_id = + format!("Route Publisher (ROS:{ros2_name} -> Zenoh:{zenoh_key_expr})"); + let context = context.clone(); + let reader_qos = reader_qos.clone(); + let type_info = type_info.clone(); + let publisher = publisher.clone(); + + move |status| { + if status.is_matching() { + if let Err(e) = activate_dds_reader( + &dds_reader, + &ros2_name, + &ros2_type, + &zenoh_key_expr, + &route_id, + &context, + keyless, + &reader_qos, + &type_info, + &publisher, + ) { + log::error!("{route_id}: failed to activate DDS Reader: {e}"); + } + } else { + deactivate_dds_reader( + &dds_reader, + &route_id, + &context.ros_discovery_mgr, + ) + } + } + }) + .res_async() + .await + .map_err(|e| format!("Failed to listen to matching status changes: {e}",))?
+ }; + + // Ok(route) Ok(RoutePublisher { ros2_name, ros2_type, zenoh_key_expr, - context: context.clone(), + context, zenoh_publisher: ZPublisher { publisher, + _matching_listener: matching_listener, _cache: cache, cache_size, }, - dds_reader: DDS_ENTITY_NULL.into(), + dds_reader, type_info: type_info.clone(), reader_qos, keyless, @@ -220,7 +282,7 @@ impl RoutePublisher<'_> { let type_name = ros2_message_type_to_dds_type(&self.ros2_type); let read_period = get_read_period(&self.context.config, &self.zenoh_key_expr); let route_id = self.to_string(); - let publisher = self.zenoh_publisher.deref().clone(); + let publisher = self.zenoh_publisher.clone(); // create matching DDS Reader that forwards data coming from DDS to Zenoh let dds_reader = create_dds_reader( @@ -233,12 +295,19 @@ impl RoutePublisher<'_> { read_period, move |sample: &DDSRawSample| { do_route_message( - sample, &publisher, // &ke, + sample, &publisher, + // &self.zenoh_key_expr, + // &self.context.zsession, &route_id, ); }, )?; - self.dds_reader.swap(dds_reader, Ordering::Relaxed); + let old = self.dds_reader.swap(dds_reader, Ordering::Relaxed); + if old != DDS_ENTITY_NULL { + if let Err(e) = delete_dds_entity(old) { + log::warn!("{self}: failed to delete overwritten DDS Reader: {e}"); + } + } // add reader's GID in ros_discovery_info message self.context @@ -372,7 +441,72 @@ fn get_read_period(config: &Config, ke: &keyexpr) -> Option { None } -fn do_route_message(sample: &DDSRawSample, publisher: &Publisher, route_id: &str) { +#[allow(clippy::too_many_arguments)] +fn activate_dds_reader( + dds_reader: &Arc, + ros2_name: &str, + ros2_type: &str, + zenoh_key_expr: &OwnedKeyExpr, + route_id: &str, + context: &Context, + keyless: bool, + reader_qos: &Qos, + type_info: &Option>, + publisher: &Arc>, +) -> Result<(), String> { + let topic_name: String = format!("rt{}", ros2_name); + let type_name = ros2_message_type_to_dds_type(ros2_type); + let read_period = get_read_period(&context.config, zenoh_key_expr); + + // create matching DDS Reader that forwards data coming from DDS to Zenoh + let reader = create_dds_reader( + context.participant, + topic_name, + type_name, + type_info, + keyless, + reader_qos.clone(), + read_period, + { + let route_id = route_id.to_string(); + let publisher = publisher.clone(); + move |sample: &DDSRawSample| { + do_route_message(sample, &publisher, &route_id); + } + }, + )?; + let old = dds_reader.deref().swap(reader, Ordering::Relaxed); + // add reader's GID in ros_discovery_info message + context.ros_discovery_mgr.add_dds_reader(get_guid(&reader)?); + + if old != DDS_ENTITY_NULL { + if let Err(e) = delete_dds_entity(old) { + log::warn!("{route_id}: failed to delete overwritten DDS Reader: {e}"); + } + } + + Ok(()) +} + +fn deactivate_dds_reader( + dds_reader: &Arc, + route_id: &str, + ros_discovery_mgr: &Arc, +) { + let reader = dds_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed); + if reader != DDS_ENTITY_NULL { + // remove reader's GID from ros_discovery_info message + match get_guid(&reader) { + Ok(gid) => ros_discovery_mgr.remove_dds_reader(gid), + Err(e) => log::warn!("{route_id}: {e}"), + } + if let Err(e) = delete_dds_entity(reader) { + log::warn!("{route_id}: error deleting DDS Reader: {e}"); + } + } +} + +fn do_route_message(sample: &DDSRawSample, publisher: &Arc, route_id: &str) { if *LOG_PAYLOAD { log::trace!("{route_id}: routing message - payload: {:02x?}", sample); } else { diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs.BAK b/zenoh-plugin-ros2dds/src/route_publisher.rs.BAK 
deleted file mode 100644 index 460b857..0000000 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs.BAK +++ /dev/null @@ -1,536 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use cyclors::qos::{HistoryKind, Qos}; -use cyclors::DDS_LENGTH_UNLIMITED; -use serde::{Serialize, Serializer}; -use std::ops::Deref; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::time::Duration; -use std::{collections::HashSet, fmt}; -use zenoh::liveliness::LivelinessToken; -use zenoh::prelude::r#async::AsyncResolve; -use zenoh::prelude::*; -use zenoh::publication::{Publisher, MatchingStatus}; -use zenoh_core::SyncResolve; -use zenoh_ext::{PublicationCache, SessionExt}; - -use crate::dds_types::{DDSRawSample, TypeInfo}; -use crate::dds_utils::{ - create_dds_reader, delete_dds_entity, get_guid, serialize_atomic_entity_guid, AtomicDDSEntity, - DDS_ENTITY_NULL, -}; -use crate::liveliness_mgt::new_ke_liveliness_pub; -use crate::ros2_utils::{is_message_for_action, ros2_message_type_to_dds_type}; -use crate::routes_mgr::Context; -use crate::{qos_helpers::*, Config}; -use crate::{KE_PREFIX_PUB_CACHE, LOG_PAYLOAD}; - -pub struct ZPublisher<'a> { - publisher: Publisher<'static>, - _cache: Option>, - cache_size: usize, -} - -impl<'a> Deref for ZPublisher<'a> { - type Target = Publisher<'static>; - - fn deref(&self) -> &Self::Target { - &self.publisher - } -} - -// a route from DDS to Zenoh -#[allow(clippy::upper_case_acronyms)] -#[derive(Serialize)] -pub struct RoutePublisher<'a> { - // the ROS2 Publisher name - ros2_name: String, - // the ROS2 type - ros2_type: String, - // the Zenoh key expression used for routing - zenoh_key_expr: OwnedKeyExpr, - // the context - #[serde(skip)] - context: Arc, - // the zenoh publisher used to re-publish to zenoh the data received by the DDS Reader - // `None` when route is created on a remote announcement and no local ROS2 Subscriber discovered yet - #[serde( - rename = "publication_cache_size", - serialize_with = "serialize_pub_cache" - )] - zenoh_publisher: ZPublisher<'a>, - // the local DDS Reader created to serve the route (i.e. re-publish to zenoh data coming from DDS) - #[serde(serialize_with = "serialize_atomic_entity_guid")] - dds_reader: Arc, - // the listener of matching subscriptions - when triggeres, create or remove dds_reader - #[serde(skip)] - matching_listener: zenoh::publication::MatchingListener<'a, ()> , - // TypeInfo for Reader creation (if available) - #[serde(skip)] - type_info: Option>, - // if the topic is keyless - #[serde(skip)] - keyless: bool, - // the QoS for the DDS Reader to be created. 
- // those are either the QoS announced by a remote bridge on a Reader discovery, - // either the QoS adapted from a local disovered Writer - #[serde(skip)] - reader_qos: Qos, - // a liveliness token associated to this route, for announcement to other plugins - #[serde(skip)] - liveliness_token: Option>, - // the list of remote routes served by this route (":"") - remote_routes: HashSet, - // the list of nodes served by this route - local_nodes: HashSet, -} - -impl Drop for RoutePublisher<'_> { - fn drop(&mut self) { - self.deactivate_dds_reader(); - } -} - -impl fmt::Display for RoutePublisher<'_> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "Route Publisher (ROS:{} -> Zenoh:{})", - self.ros2_name, self.zenoh_key_expr - ) - } -} - -impl RoutePublisher<'_> { - #[allow(clippy::too_many_arguments)] - pub async fn create<'a>( - ros2_name: String, - ros2_type: String, - zenoh_key_expr: OwnedKeyExpr, - type_info: &Option>, - keyless: bool, - reader_qos: Qos, - context: Arc, - ) -> Result, String> { - log::debug!( - "Route Publisher ({ros2_name} -> {zenoh_key_expr}): creation with type {ros2_type}" - ); - - let contextA = context.clone(); - - // create the zenoh Publisher - // if Reader shall be TRANSIENT_LOCAL, use a PublicationCache to store historical data - let transient_local = is_transient_local(&reader_qos); - let (cache, cache_size) = if transient_local { - #[allow(non_upper_case_globals)] - let history_qos = get_history_or_default(&reader_qos); - let durability_service_qos = get_durability_service_or_default(&reader_qos); - let mut history = match (history_qos.kind, history_qos.depth) { - (HistoryKind::KEEP_LAST, n) => { - if keyless { - // only 1 instance => history=n - n as usize - } else if durability_service_qos.max_instances == DDS_LENGTH_UNLIMITED { - // No limit! => history=MAX - usize::MAX - } else if durability_service_qos.max_instances > 0 { - // Compute cache size as history.depth * durability_service.max_instances - // This makes the assumption that the frequency of publication is the same for all instances... - // But as we have no way to have 1 cache per-instance, there is no other choice. 
- n.saturating_mul(durability_service_qos.max_instances) as usize - } else { - n as usize - } - } - (HistoryKind::KEEP_ALL, _) => usize::MAX, - }; - // In case there are several Writers served by this route, increase the cache size - history = history.saturating_mul(context.config.transient_local_cache_multiplier); - log::debug!( - "Route Publisher ({ros2_name} -> {zenoh_key_expr}): caching TRANSIENT_LOCAL publications via a PublicationCache with history={history} (computed from Reader's QoS: history=({:?},{}), durability_service.max_instances={})", - history_qos.kind, history_qos.depth, durability_service_qos.max_instances - ); - ( - Some( - context - .zsession - .declare_publication_cache(&zenoh_key_expr) - .history(history) - .queryable_prefix(*KE_PREFIX_PUB_CACHE / &context.plugin_id) - .queryable_allowed_origin(Locality::Remote) // Note: don't reply to queries from local QueryingSubscribers - .res_async() - .await - .map_err(|e| { - format!("Failed create PublicationCache for key {zenoh_key_expr}: {e}",) - })?, - ), - history, - ) - } else { - (None, 0) - }; - - // CongestionControl to be used when re-publishing over zenoh: Blocking if Writer is RELIABLE (since we don't know what is remote Reader's QoS) - let congestion_ctrl = match ( - context.config.reliable_routes_blocking, - is_reliable(&reader_qos), - ) { - (true, true) => CongestionControl::Block, - _ => CongestionControl::Drop, - }; - - let publisher: Publisher<'static> = context - .zsession - .declare_publisher(zenoh_key_expr.clone()) - .congestion_control(congestion_ctrl) - .res_async() - .await - .map_err(|e| format!("Failed create Publisher for key {zenoh_key_expr}: {e}",))?; - - // let route = RoutePublisher { - // ros2_name, - // ros2_type, - // zenoh_key_expr, - // context: context.clone(), - // zenoh_publisher: ZPublisher { - // publisher, - // _cache: cache, - // cache_size, - // }, - // dds_reader: Arc::new(DDS_ENTITY_NULL.into()), - // type_info: type_info.clone(), - // reader_qos, - // keyless, - // liveliness_token: None, - // remote_routes: HashSet::new(), - // local_nodes: HashSet::new(), - // }; - - // activate/deactivate DDS Reader on detection/undetection of matching Subscribers - // (copy/move all required args for the callback) - let dds_reader: Arc = Arc::new(DDS_ENTITY_NULL.into()); - - // let dds_reader = route.dds_reader.clone(); - // let ros2_name = route.ros2_name.clone(); - // let ros2_type = route.ros2_type.clone(); - // let zenoh_key_expr = route.zenoh_key_expr.clone(); - // let route_id = route.to_string(); - // let context = route.context.clone(); - // let reader_qos = route.reader_qos.clone(); - // let type_info = route.type_info.clone(); - // let publisher = route.zenoh_publisher.publisher.clone(); - // let publisher2 = route.zenoh_publisher.publisher.clone(); - - let matching_listener: zenoh::publication::MatchingListener<'_, ()> = { - let dds_reader = dds_reader.clone(); - let ros2_name = ros2_name.clone(); - let ros2_type = ros2_type.clone(); - let zenoh_key_expr = zenoh_key_expr.clone(); - let route_id = format!("Route Publisher (ROS:{ros2_name} -> Zenoh:{zenoh_key_expr})"); - let context2 = context.clone(); - let reader_qos = reader_qos.clone(); - let type_info = type_info.clone(); - let publisher2: Publisher<'static> = publisher.clone(); - let publisher3: Publisher<'static> = publisher2.clone(); - - publisher2.matching_listener().callback( - move |status| { - if status.is_matching() { - if let Err(e) = activate_dds_reader( - &dds_reader, - &ros2_name, - &ros2_type, - &zenoh_key_expr, - 
&route_id, - &context2, - keyless, - &reader_qos, - &type_info, - publisher3.clone()) - { - log::error!("{route_id}: failed to activate DDS Reader: {e}"); - } - } else { - deactivate_dds_reader(&dds_reader, &route_id, &context) - } - } - ).res_async().await - .map_err(|e| format!("Failed to lisetn of matchibng status changes: {e}",))? - } ; - - // Ok(route) - - Ok(RoutePublisher { - ros2_name, - ros2_type, - zenoh_key_expr, - context: context, - zenoh_publisher: ZPublisher { - publisher, - _cache: cache, - cache_size, - }, - dds_reader, - matching_listener, - type_info: type_info.clone(), - reader_qos, - keyless, - liveliness_token: None, - remote_routes: HashSet::new(), - local_nodes: HashSet::new(), - }) - } - - fn activate_dds_reader(&mut self) -> Result<(), String> { - let topic_name = format!("rt{}", self.ros2_name); - let type_name = ros2_message_type_to_dds_type(&self.ros2_type); - let read_period = get_read_period(&self.context.config, &self.zenoh_key_expr); - let route_id = self.to_string(); - let publisher = self.zenoh_publisher.deref().clone(); - - // create matching DDS Reader that forwards data coming from DDS to Zenoh - let dds_reader = create_dds_reader( - self.context.participant, - topic_name, - type_name, - &self.type_info, - self.keyless, - self.reader_qos.clone(), - read_period, - move |sample: &DDSRawSample| { - do_route_message( - sample, &publisher, // &ke, - &route_id, - ); - }, - )?; - let old = self.dds_reader.swap(dds_reader, Ordering::Relaxed); - if old != DDS_ENTITY_NULL { - if let Err(e) = delete_dds_entity(old) { - log::warn!("{self}: failed to delete overwritten DDS Reader: {e}"); - } - } - - // add reader's GID in ros_discovery_info message - self.context - .ros_discovery_mgr - .add_dds_reader(get_guid(&dds_reader)?); - - Ok(()) - } - - fn deactivate_dds_reader(&mut self) { - let dds_reader = self.dds_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed); - if dds_reader != DDS_ENTITY_NULL { - // remove reader's GID from ros_discovery_info message - match get_guid(&dds_reader) { - Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), - Err(e) => log::warn!("{self}: {e}"), - } - if let Err(e) = delete_dds_entity(dds_reader) { - log::warn!("{}: error deleting DDS Reader: {}", self, e); - } - } - } - - async fn announce_route(&mut self, discovered_writer_qos: &Qos) -> Result<(), String> { - // only if not for an Action (since actions declare their own liveliness) - if !is_message_for_action(&self.ros2_name) { - // create associated LivelinessToken - let liveliness_ke = new_ke_liveliness_pub( - &self.context.plugin_id, - &self.zenoh_key_expr, - &self.ros2_type, - self.keyless, - discovered_writer_qos, - )?; - let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.context.zsession - .liveliness() - .declare_token(liveliness_ke) - .res_async() - .await - .map_err(|e| { - format!( - "Failed create LivelinessToken associated to route for Publisher {ros2_name}: {e}" - ) - })? 
- ); - } - Ok(()) - } - - fn retire_route(&mut self) { - self.liveliness_token = None; - } - - #[inline] - pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) { - self.remote_routes - .insert(format!("{plugin_id}:{zenoh_key_expr}")); - log::debug!("{self} now serving remote routes {:?}", self.remote_routes); - // if 1st remote route added, activate the DDS Reader - if self.remote_routes.len() == 1 { - if let Err(e) = self.activate_dds_reader() { - log::error!("{self} activation of DDS Reader failed: {e}"); - } - } - } - - #[inline] - pub fn remove_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) { - self.remote_routes - .remove(&format!("{plugin_id}:{zenoh_key_expr}")); - log::debug!("{self} now serving remote routes {:?}", self.remote_routes); - // if last remote route removed, deactivate the DDS Reader - if self.remote_routes.is_empty() { - self.deactivate_dds_reader(); - } - } - - #[inline] - pub fn is_serving_remote_route(&self) -> bool { - !self.remote_routes.is_empty() - } - - #[inline] - pub async fn add_local_node(&mut self, node: String, discovered_writer_qos: &Qos) { - self.local_nodes.insert(node); - log::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if 1st local node added, announce the route - if self.local_nodes.len() == 1 { - if let Err(e) = self.announce_route(discovered_writer_qos).await { - log::error!("{self} announcement failed: {e}"); - } - } - } - - #[inline] - pub fn remove_local_node(&mut self, node: &str) { - self.local_nodes.remove(node); - log::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if last local node removed, retire the route - if self.local_nodes.is_empty() { - self.retire_route(); - } - } - - #[inline] - pub fn is_serving_local_node(&self) -> bool { - !self.local_nodes.is_empty() - } - - #[inline] - pub fn is_unused(&self) -> bool { - !self.is_serving_local_node() && !self.is_serving_remote_route() - } -} - -pub fn serialize_pub_cache(zpub: &ZPublisher, s: S) -> Result -where - S: Serializer, -{ - s.serialize_u64(zpub.cache_size as u64) -} - -// Return the read period if keyexpr matches one of the "pub_max_frequencies" option -fn get_read_period(config: &Config, ke: &keyexpr) -> Option { - for (re, freq) in &config.pub_max_frequencies { - if re.is_match(ke) { - return Some(Duration::from_secs_f32(1f32 / freq)); - } - } - None -} - -fn do_route_message(sample: &DDSRawSample, publisher: &Publisher, route_id: &str) { - if *LOG_PAYLOAD { - log::trace!("{route_id}: routing message - payload: {:02x?}", sample); - } else { - log::trace!("{route_id}: routing message - {} bytes", sample.len()); - } - if let Err(e) = publisher.put(sample).res_sync() { - log::error!("{route_id}: failed to route message: {e}"); - } -} - - -/////// -/// //// -/// //// - -fn activate_dds_reader( - dds_reader: &Arc, - ros2_name: &str, - ros2_type: &str, - zenoh_key_expr: &OwnedKeyExpr, - route_id: &str, - context: &Context, - keyless: bool, - reader_qos: &Qos, - type_info: &Option>, - zenoh_publisher: Publisher<'static> -) -> Result<(), String> { - let topic_name = format!("rt{}", ros2_name); - let type_name = ros2_message_type_to_dds_type(ros2_type); - let read_period = get_read_period(&context.config, zenoh_key_expr); - let route_id = route_id.to_string(); - let publisher: Publisher<'static> = zenoh_publisher.clone(); - - // create matching DDS Reader that forwards data coming from DDS to Zenoh - let reader = create_dds_reader( - context.participant, - topic_name, - type_name, - type_info, - 
keyless, - reader_qos.clone(), - read_period, - move |sample: &DDSRawSample| { - do_route_message( - sample, &publisher, // &ke, - &route_id, - ); - }, - )?; - let old = dds_reader.deref().swap(reader, Ordering::Relaxed); - // add reader's GID in ros_discovery_info message - context - .ros_discovery_mgr - .add_dds_reader(get_guid(&reader)?); - - if old != DDS_ENTITY_NULL { - if let Err(e) = delete_dds_entity(old) { - log::warn!("{route_id}: failed to delete overwritten DDS Reader: {e}"); - } - } - - Ok(()) -} - -fn deactivate_dds_reader(dds_reader: &Arc, route_id: &str, context: &Context) { - let reader = dds_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed); - if reader != DDS_ENTITY_NULL { - // remove reader's GID from ros_discovery_info message - match get_guid(&reader) { - Ok(gid) => context.ros_discovery_mgr.remove_dds_reader(gid), - Err(e) => log::warn!("{route_id}: {e}"), - } - if let Err(e) = delete_dds_entity(reader) { - log::warn!("{route_id}: error deleting DDS Reader: {e}"); - } - } -} diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs.BAK2 b/zenoh-plugin-ros2dds/src/route_publisher.rs.BAK2 deleted file mode 100644 index fe70dba..0000000 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs.BAK2 +++ /dev/null @@ -1,556 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use cyclors::qos::{HistoryKind, Qos}; -use cyclors::DDS_LENGTH_UNLIMITED; -use serde::{Serialize, Serializer}; -use std::ops::Deref; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::time::Duration; -use std::{collections::HashSet, fmt}; -use zenoh::liveliness::LivelinessToken; -use zenoh::prelude::r#async::AsyncResolve; -use zenoh::prelude::*; -use zenoh::publication::{Publisher, MatchingStatus, matching_listener_for}; -use zenoh_core::SyncResolve; -use zenoh_ext::{PublicationCache, SessionExt}; - -use crate::dds_types::{DDSRawSample, TypeInfo}; -use crate::dds_utils::{ - create_dds_reader, delete_dds_entity, get_guid, serialize_atomic_entity_guid, AtomicDDSEntity, - DDS_ENTITY_NULL, -}; -use crate::liveliness_mgt::new_ke_liveliness_pub; -use crate::ros2_utils::{is_message_for_action, ros2_message_type_to_dds_type}; -use crate::routes_mgr::Context; -use crate::{qos_helpers::*, Config}; -use crate::{KE_PREFIX_PUB_CACHE, LOG_PAYLOAD}; - -pub struct ZPublisher<'a> { - publisher: Publisher<'static>, - _cache: Option>, - cache_size: usize, -} - -impl<'a> Deref for ZPublisher<'a> { - type Target = Publisher<'static>; - - fn deref(&self) -> &Self::Target { - &self.publisher - } -} - -// a route from DDS to Zenoh -#[allow(clippy::upper_case_acronyms)] -#[derive(Serialize)] -pub struct RoutePublisher<'a> { - // the ROS2 Publisher name - ros2_name: String, - // the ROS2 type - ros2_type: String, - // the Zenoh key expression used for routing - zenoh_key_expr: OwnedKeyExpr, - // the context - #[serde(skip)] - context: Arc, - // the zenoh publisher used to re-publish to zenoh the data received by the DDS Reader - // `None` when route is created on a remote announcement and no local ROS2 Subscriber discovered yet - #[serde( - rename = "publication_cache_size", - serialize_with = "serialize_pub_cache" - )] 
- zenoh_publisher: ZPublisher<'a>, - // the local DDS Reader created to serve the route (i.e. re-publish to zenoh data coming from DDS) - #[serde(serialize_with = "serialize_atomic_entity_guid")] - dds_reader: Arc, - // the listener of matching subscriptions - when triggeres, create or remove dds_reader - #[serde(skip)] - matching_listener: zenoh::publication::MatchingListener<'a, ()> , - // TypeInfo for Reader creation (if available) - #[serde(skip)] - type_info: Option>, - // if the topic is keyless - #[serde(skip)] - keyless: bool, - // the QoS for the DDS Reader to be created. - // those are either the QoS announced by a remote bridge on a Reader discovery, - // either the QoS adapted from a local disovered Writer - #[serde(skip)] - reader_qos: Qos, - // a liveliness token associated to this route, for announcement to other plugins - #[serde(skip)] - liveliness_token: Option>, - // the list of remote routes served by this route (":"") - remote_routes: HashSet, - // the list of nodes served by this route - local_nodes: HashSet, -} - -impl Drop for RoutePublisher<'_> { - fn drop(&mut self) { - self.deactivate_dds_reader(); - } -} - -impl fmt::Display for RoutePublisher<'_> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "Route Publisher (ROS:{} -> Zenoh:{})", - self.ros2_name, self.zenoh_key_expr - ) - } -} - -impl RoutePublisher<'_> { - #[allow(clippy::too_many_arguments)] - pub async fn create<'a>( - ros2_name: String, - ros2_type: String, - zenoh_key_expr: OwnedKeyExpr, - type_info: &Option>, - keyless: bool, - reader_qos: Qos, - context: Arc, - ) -> Result, String> { - log::debug!( - "Route Publisher ({ros2_name} -> {zenoh_key_expr}): creation with type {ros2_type}" - ); - - // create the zenoh Publisher - // if Reader shall be TRANSIENT_LOCAL, use a PublicationCache to store historical data - let transient_local = is_transient_local(&reader_qos); - let (cache, cache_size) = if transient_local { - #[allow(non_upper_case_globals)] - let history_qos = get_history_or_default(&reader_qos); - let durability_service_qos = get_durability_service_or_default(&reader_qos); - let mut history = match (history_qos.kind, history_qos.depth) { - (HistoryKind::KEEP_LAST, n) => { - if keyless { - // only 1 instance => history=n - n as usize - } else if durability_service_qos.max_instances == DDS_LENGTH_UNLIMITED { - // No limit! => history=MAX - usize::MAX - } else if durability_service_qos.max_instances > 0 { - // Compute cache size as history.depth * durability_service.max_instances - // This makes the assumption that the frequency of publication is the same for all instances... - // But as we have no way to have 1 cache per-instance, there is no other choice. 
- n.saturating_mul(durability_service_qos.max_instances) as usize - } else { - n as usize - } - } - (HistoryKind::KEEP_ALL, _) => usize::MAX, - }; - // In case there are several Writers served by this route, increase the cache size - history = history.saturating_mul(context.config.transient_local_cache_multiplier); - log::debug!( - "Route Publisher ({ros2_name} -> {zenoh_key_expr}): caching TRANSIENT_LOCAL publications via a PublicationCache with history={history} (computed from Reader's QoS: history=({:?},{}), durability_service.max_instances={})", - history_qos.kind, history_qos.depth, durability_service_qos.max_instances - ); - ( - Some( - context - .zsession - .declare_publication_cache(&zenoh_key_expr) - .history(history) - .queryable_prefix(*KE_PREFIX_PUB_CACHE / &context.plugin_id) - .queryable_allowed_origin(Locality::Remote) // Note: don't reply to queries from local QueryingSubscribers - .res_async() - .await - .map_err(|e| { - format!("Failed create PublicationCache for key {zenoh_key_expr}: {e}",) - })?, - ), - history, - ) - } else { - (None, 0) - }; - - // CongestionControl to be used when re-publishing over zenoh: Blocking if Writer is RELIABLE (since we don't know what is remote Reader's QoS) - let congestion_ctrl = match ( - context.config.reliable_routes_blocking, - is_reliable(&reader_qos), - ) { - (true, true) => CongestionControl::Block, - _ => CongestionControl::Drop, - }; - - let publisher: Publisher<'static> = context - .zsession - .declare_publisher(zenoh_key_expr.clone()) - .congestion_control(congestion_ctrl) - .res_async() - .await - .map_err(|e| format!("Failed create Publisher for key {zenoh_key_expr}: {e}",))?; - - // let route = RoutePublisher { - // ros2_name, - // ros2_type, - // zenoh_key_expr, - // context: context.clone(), - // zenoh_publisher: ZPublisher { - // publisher, - // _cache: cache, - // cache_size, - // }, - // dds_reader: Arc::new(DDS_ENTITY_NULL.into()), - // type_info: type_info.clone(), - // reader_qos, - // keyless, - // liveliness_token: None, - // remote_routes: HashSet::new(), - // local_nodes: HashSet::new(), - // }; - - // activate/deactivate DDS Reader on detection/undetection of matching Subscribers - // (copy/move all required args for the callback) - let dds_reader: Arc = Arc::new(DDS_ENTITY_NULL.into()); - - // let dds_reader = route.dds_reader.clone(); - // let ros2_name = route.ros2_name.clone(); - // let ros2_type = route.ros2_type.clone(); - // let zenoh_key_expr = route.zenoh_key_expr.clone(); - // let route_id = route.to_string(); - // let context = route.context.clone(); - // let reader_qos = route.reader_qos.clone(); - // let type_info = route.type_info.clone(); - // let publisher = route.zenoh_publisher.publisher.clone(); - // let publisher2 = route.zenoh_publisher.publisher.clone(); - - // let zpublisher = ZPublisher { - // publisher, - // _cache: cache, - // cache_size, - // }; - - let matching_listener: zenoh::publication::MatchingListener<'_, ()> = { - let dds_reader = dds_reader.clone(); - let ros2_name = ros2_name.clone(); - let ros2_type = ros2_type.clone(); - let zenoh_key_expr = zenoh_key_expr.clone(); - let route_id = format!("Route Publisher (ROS:{ros2_name} -> Zenoh:{zenoh_key_expr})"); - let context2 = context.clone(); - let reader_qos = reader_qos.clone(); - let type_info = type_info.clone(); - // let publisher2: Publisher<'static> = publisher.clone(); - // let publisher3: Publisher<'static> = publisher2.clone(); - - matching_listener_for(publisher.clone()).callback( - move |status| { - if 
status.is_matching() { - if let Err(e) = activate_dds_reader( - &dds_reader, - &ros2_name, - &ros2_type, - &zenoh_key_expr, - &route_id, - context2.clone(), - keyless, - &reader_qos, - &type_info, - // publisher3.clone() - ) - { - log::error!("{route_id}: failed to activate DDS Reader: {e}"); - } - } else { - deactivate_dds_reader(&dds_reader, &route_id, context2.clone()) - } - } - ).res_async().await - .map_err(|e| format!("Failed to lisetn of matchibng status changes: {e}",))? - } ; - - // Ok(route) - - Ok(RoutePublisher { - ros2_name, - ros2_type, - zenoh_key_expr, - context, - zenoh_publisher: ZPublisher { - publisher, - _cache: cache, - cache_size, - }, - dds_reader, - matching_listener, - type_info: type_info.clone(), - reader_qos, - keyless, - liveliness_token: None, - remote_routes: HashSet::new(), - local_nodes: HashSet::new(), - }) - } - - fn activate_dds_reader(&mut self) -> Result<(), String> { - let topic_name = format!("rt{}", self.ros2_name); - let type_name = ros2_message_type_to_dds_type(&self.ros2_type); - let read_period = get_read_period(&self.context.config, &self.zenoh_key_expr); - let route_id = self.to_string(); - let publisher = self.zenoh_publisher.deref().clone(); - - // create matching DDS Reader that forwards data coming from DDS to Zenoh - let dds_reader = create_dds_reader( - self.context.participant, - topic_name, - type_name, - &self.type_info, - self.keyless, - self.reader_qos.clone(), - read_period, - move |sample: &DDSRawSample| { - do_route_message( - sample, - // &publisher, - &self.zenoh_key_expr, - &self.context.zsession, - &route_id, - ); - }, - )?; - let old = self.dds_reader.swap(dds_reader, Ordering::Relaxed); - if old != DDS_ENTITY_NULL { - if let Err(e) = delete_dds_entity(old) { - log::warn!("{self}: failed to delete overwritten DDS Reader: {e}"); - } - } - - // add reader's GID in ros_discovery_info message - self.context - .ros_discovery_mgr - .add_dds_reader(get_guid(&dds_reader)?); - - Ok(()) - } - - fn deactivate_dds_reader(&mut self) { - let dds_reader = self.dds_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed); - if dds_reader != DDS_ENTITY_NULL { - // remove reader's GID from ros_discovery_info message - match get_guid(&dds_reader) { - Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), - Err(e) => log::warn!("{self}: {e}"), - } - if let Err(e) = delete_dds_entity(dds_reader) { - log::warn!("{}: error deleting DDS Reader: {}", self, e); - } - } - } - - async fn announce_route(&mut self, discovered_writer_qos: &Qos) -> Result<(), String> { - // only if not for an Action (since actions declare their own liveliness) - if !is_message_for_action(&self.ros2_name) { - // create associated LivelinessToken - let liveliness_ke = new_ke_liveliness_pub( - &self.context.plugin_id, - &self.zenoh_key_expr, - &self.ros2_type, - self.keyless, - discovered_writer_qos, - )?; - let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.context.zsession - .liveliness() - .declare_token(liveliness_ke) - .res_async() - .await - .map_err(|e| { - format!( - "Failed create LivelinessToken associated to route for Publisher {ros2_name}: {e}" - ) - })? 
-            );
-        }
-        Ok(())
-    }
-
-    fn retire_route(&mut self) {
-        self.liveliness_token = None;
-    }
-
-    #[inline]
-    pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) {
-        self.remote_routes
-            .insert(format!("{plugin_id}:{zenoh_key_expr}"));
-        log::debug!("{self} now serving remote routes {:?}", self.remote_routes);
-        // if 1st remote route added, activate the DDS Reader
-        if self.remote_routes.len() == 1 {
-            if let Err(e) = self.activate_dds_reader() {
-                log::error!("{self} activation of DDS Reader failed: {e}");
-            }
-        }
-    }
-
-    #[inline]
-    pub fn remove_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) {
-        self.remote_routes
-            .remove(&format!("{plugin_id}:{zenoh_key_expr}"));
-        log::debug!("{self} now serving remote routes {:?}", self.remote_routes);
-        // if last remote route removed, deactivate the DDS Reader
-        if self.remote_routes.is_empty() {
-            self.deactivate_dds_reader();
-        }
-    }
-
-    #[inline]
-    pub fn is_serving_remote_route(&self) -> bool {
-        !self.remote_routes.is_empty()
-    }
-
-    #[inline]
-    pub async fn add_local_node(&mut self, node: String, discovered_writer_qos: &Qos) {
-        self.local_nodes.insert(node);
-        log::debug!("{self} now serving local nodes {:?}", self.local_nodes);
-        // if 1st local node added, announce the route
-        if self.local_nodes.len() == 1 {
-            if let Err(e) = self.announce_route(discovered_writer_qos).await {
-                log::error!("{self} announcement failed: {e}");
-            }
-        }
-    }
-
-    #[inline]
-    pub fn remove_local_node(&mut self, node: &str) {
-        self.local_nodes.remove(node);
-        log::debug!("{self} now serving local nodes {:?}", self.local_nodes);
-        // if last local node removed, retire the route
-        if self.local_nodes.is_empty() {
-            self.retire_route();
-        }
-    }
-
-    #[inline]
-    pub fn is_serving_local_node(&self) -> bool {
-        !self.local_nodes.is_empty()
-    }
-
-    #[inline]
-    pub fn is_unused(&self) -> bool {
-        !self.is_serving_local_node() && !self.is_serving_remote_route()
-    }
-}
-
-pub fn serialize_pub_cache<S>(zpub: &ZPublisher, s: S) -> Result<S::Ok, S::Error>
-where
-    S: Serializer,
-{
-    s.serialize_u64(zpub.cache_size as u64)
-}
-
-// Return the read period if keyexpr matches one of the "pub_max_frequencies" option
-fn get_read_period(config: &Config, ke: &keyexpr) -> Option<Duration> {
-    for (re, freq) in &config.pub_max_frequencies {
-        if re.is_match(ke) {
-            return Some(Duration::from_secs_f32(1f32 / freq));
-        }
-    }
-    None
-}
-
-fn do_route_message(
-    sample: &DDSRawSample,
-    // publisher: &Publisher,
-    zenoh_key_expr: &OwnedKeyExpr,
-    zsession: &Arc<Session>,
-    route_id: &str)
-{
-    if *LOG_PAYLOAD {
-        log::trace!("{route_id}: routing message - payload: {:02x?}", sample);
-    } else {
-        log::trace!("{route_id}: routing message - {} bytes", sample.len());
-    }
-    // if let Err(e) = publisher.put(sample).res_sync() {
-    //     log::error!("{route_id}: failed to route message: {e}");
-    // }
-    if let Err(e) = zsession.put(zenoh_key_expr, sample).res_sync() {
-        log::error!("{route_id}: failed to route message: {e}");
-    }
-}
-
-
-///////
-/// ////
-/// ////
-
-fn activate_dds_reader(
-    dds_reader: &Arc,
-    ros2_name: &str,
-    ros2_type: &str,
-    zenoh_key_expr: &OwnedKeyExpr,
-    route_id: &str,
-    context: Arc,
-    keyless: bool,
-    reader_qos: &Qos,
-    type_info: &Option<Arc<TypeInfo>>,
-    // zenoh_publisher: Publisher<'static>
-) -> Result<(), String> {
-    let topic_name = format!("rt{}", ros2_name);
-    let type_name = ros2_message_type_to_dds_type(ros2_type);
-    let read_period = get_read_period(&context.config, zenoh_key_expr);
-    let route_id = route_id.to_string();
-    // let publisher: Publisher<'static> = zenoh_publisher.clone();
-
-    // create matching DDS Reader that forwards data coming from DDS to Zenoh
-    let reader = create_dds_reader(
-        context.participant,
-        topic_name,
-        type_name,
-        type_info,
-        keyless,
-        reader_qos.clone(),
-        read_period,
-        move |sample: &DDSRawSample| {
-            do_route_message(
-                sample,
-                // &publisher,
-                zenoh_key_expr,
-                &context.zsession,
-                &route_id,
-            );
-        },
-    )?;
-    let old = dds_reader.deref().swap(reader, Ordering::Relaxed);
-    // add reader's GID in ros_discovery_info message
-    context
-        .ros_discovery_mgr
-        .add_dds_reader(get_guid(&reader)?);
-
-    if old != DDS_ENTITY_NULL {
-        if let Err(e) = delete_dds_entity(old) {
-            log::warn!("{route_id}: failed to delete overwritten DDS Reader: {e}");
-        }
-    }
-
-    Ok(())
-}
-
-fn deactivate_dds_reader(dds_reader: &Arc, route_id: &str, context: Arc) {
-    let reader = dds_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed);
-    if reader != DDS_ENTITY_NULL {
-        // remove reader's GID from ros_discovery_info message
-        match get_guid(&reader) {
-            Ok(gid) => context.ros_discovery_mgr.remove_dds_reader(gid),
-            Err(e) => log::warn!("{route_id}: {e}"),
-        }
-        if let Err(e) = delete_dds_entity(reader) {
-            log::warn!("{route_id}: error deleting DDS Reader: {e}");
-        }
-    }
-}
diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs
index 953e818..7edee6c 100644
--- a/zenoh-plugin-ros2dds/src/route_service_cli.rs
+++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs
@@ -13,7 +13,6 @@
 //
 
 use cyclors::dds_entity_t;
-use cyclors::qos::{History, HistoryKind, Qos, Reliability, ReliabilityKind, DDS_INFINITE_TIME};
 use serde::Serialize;
 use std::sync::Arc;
 use std::{collections::HashSet, fmt};
@@ -49,7 +48,7 @@ pub struct RouteServiceCli<'a> {
     zenoh_key_expr: OwnedKeyExpr,
     // the context
     #[serde(skip)]
-    context: Context<'a>,
+    context: Context,
     is_active: bool,
     // the local DDS Reader receiving client's requests and routing them to Zenoh
     #[serde(serialize_with = "serialize_entity_guid")]
@@ -105,7 +104,7 @@ impl RouteServiceCli<'_> {
         ros2_type: String,
         zenoh_key_expr: OwnedKeyExpr,
         type_info: &Option<Arc<TypeInfo>>,
-        context: &Context<'a>,
+        context: Context,
     ) -> Result<RouteServiceCli<'a>, String> {
         log::debug!(
             "Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type}"
         );
@@ -167,7 +166,7 @@ impl RouteServiceCli<'_> {
             ros2_name,
             ros2_type,
             zenoh_key_expr,
-            context: context.clone(),
+            context,
             is_active: false,
             rep_writer,
             req_reader,
@@ -177,7 +176,7 @@ impl RouteServiceCli<'_> {
         })
     }
 
-    async fn activate<'a>(&'a mut self) -> Result<(), String> {
+    async fn activate(&mut self) -> Result<(), String> {
         self.is_active = true;
 
         // if not for an Action (since actions declare their own liveliness)
diff --git a/zenoh-plugin-ros2dds/src/route_service_srv.rs b/zenoh-plugin-ros2dds/src/route_service_srv.rs
index fbddd9e..82ddf5f 100644
--- a/zenoh-plugin-ros2dds/src/route_service_srv.rs
+++ b/zenoh-plugin-ros2dds/src/route_service_srv.rs
@@ -13,7 +13,6 @@
 //
 
 use cyclors::dds_entity_t;
-use cyclors::qos::{History, HistoryKind, Qos, Reliability, ReliabilityKind, DDS_INFINITE_TIME};
 use serde::Serialize;
 use std::collections::HashMap;
 use std::sync::atomic::{AtomicU64, Ordering};
@@ -52,7 +51,7 @@ pub struct RouteServiceSrv<'a> {
     zenoh_key_expr: OwnedKeyExpr,
     // the context
     #[serde(skip)]
-    context: Context<'a>,
+    context: Context,
     // the zenoh queryable used to expose the service server in zenoh.
     // `None` when route is created on a remote announcement and no local ROS2 Service Server discovered yet
     #[serde(rename = "is_active", serialize_with = "serialize_option_as_bool")]
@@ -120,11 +119,10 @@ impl RouteServiceSrv<'_> {
         ros2_type: String,
         zenoh_key_expr: OwnedKeyExpr,
         type_info: &Option<Arc<TypeInfo>>,
-        context: &Context<'a>,
+        context: Context,
     ) -> Result<RouteServiceSrv<'a>, String> {
-        log::debug!(
-            "Route Service Server (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type}"
-        );
+        let route_id = format!("Route Service Server (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr})");
+        log::debug!("{route_id}: creation with type {ros2_type}");
 
         // Default Service QoS
         let mut qos = QOS_DEFAULT_SERVICE.clone();
@@ -135,7 +133,8 @@ impl RouteServiceSrv<'_> {
         let user_data = format!("clientid= {client_id_str};");
         qos.user_data = Some(user_data.into_bytes());
         log::debug!(
-            "Route Service Server (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): using id '{client_id_str}' => USER_DATA={:?}", qos.user_data.as_ref().unwrap()
+            "{route_id}: using id '{client_id_str}' => USER_DATA={:?}",
+            qos.user_data.as_ref().unwrap()
         );
 
         // create DDS Writer to send requests coming from Zenoh to the Service
@@ -164,8 +163,6 @@ impl RouteServiceSrv<'_> {
         // create DDS Reader to receive replies and route them to Zenoh
         let rep_topic_name = format!("rr{ros2_name}Reply");
         let rep_type_name = ros2_service_type_to_reply_dds_type(&ros2_type);
-        let queries_in_progress2 = queries_in_progress.clone();
-        let zenoh_key_expr2 = zenoh_key_expr.clone();
         let rep_reader = create_dds_reader(
             context.participant,
             rep_topic_name,
@@ -174,14 +171,18 @@ impl RouteServiceSrv<'_> {
             true,
             qos,
             None,
-            move |sample| {
-                do_route_reply(
-                    sample,
-                    zenoh_key_expr2.clone(),
-                    &mut zwrite!(queries_in_progress2),
-                    "",
-                    client_guid,
-                );
+            {
+                let queries_in_progress = queries_in_progress.clone();
+                let zenoh_key_expr = zenoh_key_expr.clone();
+                move |sample| {
+                    do_route_reply(
+                        sample,
+                        zenoh_key_expr.clone(),
+                        &mut zwrite!(queries_in_progress),
+                        &route_id,
+                        client_guid,
+                    );
+                }
             },
         )?;
         // add reader's GID in ros_discovery_info message
@@ -193,7 +194,7 @@ impl RouteServiceSrv<'_> {
             ros2_name,
             ros2_type,
             zenoh_key_expr,
-            context: context.clone(),
+            context,
             zenoh_queryable: None,
             req_writer,
             rep_reader,
@@ -206,7 +207,7 @@ impl RouteServiceSrv<'_> {
         })
     }
 
-    async fn activate<'a>(&'a mut self) -> Result<(), String> {
+    async fn activate(&mut self) -> Result<(), String> {
         // For lifetime issue, redeclare the zenoh key expression that can't be stored in Self
         let declared_ke = self
             .context
@@ -426,17 +427,18 @@ fn do_route_reply(
         u64::from_le_bytes(dds_rep_buf[4..12].try_into().unwrap())
     };
-    if guid != client_guid {
-        log::warn!(
-            "{route_id}: received response for another client: {guid:0x?} (me: {client_guid:0x?}"
-        );
-        return;
-    }
 
     let seq_num = if cdr_header[1] == 0 {
         u64::from_be_bytes(dds_rep_buf[12..20].try_into().unwrap())
     } else {
         u64::from_le_bytes(dds_rep_buf[12..20].try_into().unwrap())
     };
+
+    if guid != client_guid {
+        log::warn!(
+            "{route_id}: received response for another client: {guid:0x?} (me: {client_guid:0x?})"
+        );
+        return;
+    }
     match queries_in_progress.remove(&seq_num) {
         Some(query) => {
             use zenoh_core::SyncResolve;
diff --git a/zenoh-plugin-ros2dds/src/route_subscriber.rs b/zenoh-plugin-ros2dds/src/route_subscriber.rs
index dd24a8d..25eaf79 100644
--- a/zenoh-plugin-ros2dds/src/route_subscriber.rs
+++ b/zenoh-plugin-ros2dds/src/route_subscriber.rs
@@ -53,7 +53,7 @@ pub struct RouteSubscriber<'a> {
     zenoh_key_expr: OwnedKeyExpr,
     // the context
     #[serde(skip)]
-    context: Context<'a>,
+    context: Context,
     // the zenoh subscriber receiving data to be re-published by the DDS Writer
     // `None` when route is created on a remote announcement and no local ROS2 Subscriber discovered yet
     #[serde(rename = "is_active", serialize_with = "serialize_option_as_bool")]
@@ -107,7 +107,7 @@ impl RouteSubscriber<'_> {
         zenoh_key_expr: OwnedKeyExpr,
         keyless: bool,
         writer_qos: Qos,
-        context: &Context<'a>,
+        context: Context,
     ) -> Result<RouteSubscriber<'a>, String> {
         let transient_local = is_transient_local(&writer_qos);
         log::debug!("Route Subscriber ({zenoh_key_expr} -> {ros2_name}): creation with type {ros2_type} (transient_local:{transient_local})");
@@ -131,7 +131,7 @@ impl RouteSubscriber<'_> {
             ros2_name,
             ros2_type,
             zenoh_key_expr,
-            context: context.clone(),
+            context,
             zenoh_subscriber: None,
             dds_writer,
             transient_local,
@@ -233,7 +233,7 @@ impl RouteSubscriber<'_> {
         // query all PublicationCaches on "<KE_PREFIX_PUB_CACHE>/<plugin_id>/<zenoh_key_expr>"
         let query_selector: Selector =
             (*KE_PREFIX_PUB_CACHE / plugin_id / &self.zenoh_key_expr).into();
-        log::error!("Route Subscriber (Zenoh:{} -> ROS:{}): query historical data from {plugin_id} for TRANSIENT_LOCAL Reader on {query_selector}",
+        log::debug!("Route Subscriber (Zenoh:{} -> ROS:{}): query historical data from {plugin_id} for TRANSIENT_LOCAL Reader on {query_selector}",
             self.zenoh_key_expr, self.ros2_name
         );
diff --git a/zenoh-plugin-ros2dds/src/routes_mgr.rs b/zenoh-plugin-ros2dds/src/routes_mgr.rs
index 44a9871..7043eab 100644
--- a/zenoh-plugin-ros2dds/src/routes_mgr.rs
+++ b/zenoh-plugin-ros2dds/src/routes_mgr.rs
@@ -70,10 +70,10 @@ enum RouteRef {
 
 // A Context struct to be shared as an Arc amongst all the code
 #[derive(Clone)]
-pub struct Context<'a> {
+pub struct Context {
     pub(crate) plugin_id: Arc<OwnedKeyExpr>,
     pub(crate) config: Arc<Config>,
-    pub(crate) zsession: &'a Arc<Session>,
+    pub(crate) zsession: Arc<Session>,
     pub(crate) participant: dds_entity_t,
     // all discovered entities
     pub(crate) discovered_entities: Arc<RwLock<DiscoveredEntities>>,
@@ -82,7 +82,7 @@ pub struct Context {
 }
 
 pub struct RoutesMgr<'a> {
-    context: Arc<Context<'a>>,
+    context: Context,
     // maps of established routes - each map indexed by topic/service/action name
     routes_publishers: HashMap<String, RoutePublisher<'a>>,
     routes_subscribers: HashMap<String, RouteSubscriber<'a>>,
@@ -100,20 +100,20 @@ impl<'a> RoutesMgr<'a> {
     pub fn new(
         plugin_id: OwnedKeyExpr,
         config: Arc<Config>,
-        zsession: &'a Arc<Session>,
+        zsession: Arc<Session>,
         participant: dds_entity_t,
         discovered_entities: Arc<RwLock<DiscoveredEntities>>,
         ros_discovery_mgr: Arc<RosDiscoveryInfoMgr>,
         admin_prefix: OwnedKeyExpr,
     ) -> RoutesMgr<'a> {
-        let context = Arc::new(Context {
+        let context = Context {
             plugin_id: Arc::new(plugin_id),
             config,
             zsession,
             participant,
             discovered_entities,
             ros_discovery_mgr,
-        });
+        };
 
         RoutesMgr {
             context,
@@ -537,7 +537,7 @@ impl<'a> RoutesMgr<'a> {
                     &None,
                     keyless,
                     reader_qos,
-                    &self.context,
+                    self.context.clone(),
                 )
                 .await?;
                 log::info!("{route} created");
@@ -574,7 +574,7 @@ impl<'a> RoutesMgr<'a> {
                     zenoh_key_expr.to_owned(),
                     keyless,
                     writer_qos,
-                    &self.context,
+                    self.context.clone(),
                 )
                 .await?;
                 log::info!("{route} created");
@@ -608,7 +608,7 @@ impl<'a> RoutesMgr<'a> {
                     ros2_type,
                     zenoh_key_expr.to_owned(),
                     &None,
-                    &self.context,
+                    self.context.clone(),
                 )
                 .await?;
                 log::info!("{route} created");
@@ -642,7 +642,7 @@ impl<'a> RoutesMgr<'a> {
                     ros2_type,
                     zenoh_key_expr.to_owned(),
                     &None,
-                    &self.context,
+                    self.context.clone(),
                 )
                 .await?;
                 log::info!("{route} created");
@@ -674,7 +674,7 @@ impl<'a> RoutesMgr<'a> {
                     ros2_name.clone(),
                     ros2_type,
                     zenoh_key_expr.to_owned(),
-                    &self.context,
+                    self.context.clone(),
                 )
                 .await?;
                 log::info!("{route} created");
@@ -704,7 +704,7 @@ impl<'a> RoutesMgr<'a> {
                     ros2_name.clone(),
                     ros2_type,
                     zenoh_key_expr.to_owned(),
-                    &self.context,
+                    self.context.clone(),
                 )
                 .await?;
                 log::info!("{route} created");
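
The `serialize_pub_cache` function removed from route_publisher.rs above is a serde `serialize_with` helper: it serializes a `ZPublisher` field as just its cache size. A minimal, self-contained sketch of that pattern; `RouteAdmin` and its field are illustrative stand-ins, not the plugin's actual types:

    use serde::{Serialize, Serializer};

    // Stand-in for the plugin's ZPublisher; only the field used by the helper is kept.
    struct ZPublisher {
        cache_size: usize,
    }

    // Helper matching serde's `serialize_with` contract: the route is serialized with
    // just the publisher's cache size instead of the full, non-serializable publisher.
    fn serialize_pub_cache<S>(zpub: &ZPublisher, s: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        s.serialize_u64(zpub.cache_size as u64)
    }

    #[derive(Serialize)]
    struct RouteAdmin {
        #[serde(serialize_with = "serialize_pub_cache")]
        zenoh_publisher: ZPublisher,
    }

    fn main() {
        let route = RouteAdmin {
            zenoh_publisher: ZPublisher { cache_size: 10 },
        };
        // prints {"zenoh_publisher":10}
        println!("{}", serde_json::to_string(&route).unwrap());
    }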
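
The rep_reader hunk in route_service_srv.rs replaces the `queries_in_progress2` / `zenoh_key_expr2` temporaries with a block expression that clones the captures right before the `move` closure. A small standalone sketch of that idiom, with simplified stand-in types rather than the plugin's:

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    fn main() {
        let queries_in_progress: Arc<Mutex<HashMap<u64, String>>> =
            Arc::new(Mutex::new(HashMap::new()));
        let route_id = String::from("Route Service Server (example)");

        // The block expression scopes the clones to the closure: no `foo2`-style
        // temporaries, and the originals remain usable after the closure is built.
        let on_reply = {
            let queries_in_progress = queries_in_progress.clone();
            let route_id = route_id.clone();
            move |seq_num: u64| {
                if let Some(query) = queries_in_progress.lock().unwrap().remove(&seq_num) {
                    println!("{route_id}: routing reply for query {seq_num}: {query}");
                }
            }
        };

        queries_in_progress.lock().unwrap().insert(1, "add_two_ints".into());
        on_reply(1);
        println!("pending: {}", queries_in_progress.lock().unwrap().len());
    }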
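
The reordered hunk in `do_route_reply` now parses both the client GUID (bytes 4..12) and the request sequence number (bytes 12..20) that follow the 4-byte CDR encapsulation header before returning early on a GUID mismatch, and it adds the missing closing parenthesis in the warning. A self-contained sketch of that header parsing, assuming (as the hunk suggests) that byte 1 of the header selects big- vs little-endian; this is not the plugin's actual parser:

    // Reads the client GUID (bytes 4..12) and sequence number (bytes 12..20) that
    // follow a 4-byte CDR encapsulation header; header byte 1 == 0 means big-endian.
    fn parse_reply_header(dds_rep_buf: &[u8]) -> Option<(u64, u64)> {
        if dds_rep_buf.len() < 20 {
            return None;
        }
        let big_endian = dds_rep_buf[1] == 0;
        let read_u64 = |bytes: &[u8]| -> u64 {
            let arr: [u8; 8] = bytes.try_into().unwrap();
            if big_endian {
                u64::from_be_bytes(arr)
            } else {
                u64::from_le_bytes(arr)
            }
        };
        let guid = read_u64(&dds_rep_buf[4..12]);
        let seq_num = read_u64(&dds_rep_buf[12..20]);
        Some((guid, seq_num))
    }

    fn main() {
        let mut buf = vec![0u8; 20];
        buf[1] = 1; // CDR_LE: little-endian payload
        buf[4..12].copy_from_slice(&0x1122_3344_5566_7788u64.to_le_bytes());
        buf[12..20].copy_from_slice(&7u64.to_le_bytes());
        assert_eq!(parse_reply_header(&buf), Some((0x1122_3344_5566_7788, 7)));
    }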
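
The routes_mgr.rs hunks turn `Context<'a>`, which borrowed the session as `&'a Arc<Session>`, into an owned struct that each route receives by value (`self.context.clone()`). A minimal sketch of why owning the `Arc` removes the lifetime parameter; `Session` and `RosDiscoveryInfoMgr` are empty stand-ins here, not the real types:

    use std::sync::Arc;

    // Stand-ins for the shared resources; the real Context also carries the config,
    // the DDS participant and the discovered-entities structures.
    struct Session;
    struct RosDiscoveryInfoMgr;

    // Owning Arc handles instead of borrowing `&'a Arc<...>` removes the lifetime
    // parameter: Context is 'static and Clone, so every route can keep its own copy
    // and move it into callbacks or spawned tasks without borrow-checker issues.
    #[derive(Clone)]
    struct Context {
        zsession: Arc<Session>,
        ros_discovery_mgr: Arc<RosDiscoveryInfoMgr>,
    }

    struct RoutePublisher {
        context: Context,
    }

    impl RoutePublisher {
        fn create(context: Context) -> Self {
            // cloning a Context only bumps the Arc reference counts
            RoutePublisher { context }
        }
    }

    fn main() {
        let context = Context {
            zsession: Arc::new(Session),
            ros_discovery_mgr: Arc::new(RosDiscoveryInfoMgr),
        };
        let _route_a = RoutePublisher::create(context.clone());
        let _route_b = RoutePublisher::create(context);
    }

Because the clone is cheap, passing the Context by value to every route (as the `self.context.clone()` call sites above do) costs little while avoiding the `Arc<Context<'a>>` wrapper and its lifetime plumbing.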