diff --git a/zenoh-plugin-dds/src/lib.rs b/zenoh-plugin-dds/src/lib.rs index 705786e8..57ef9db4 100644 --- a/zenoh-plugin-dds/src/lib.rs +++ b/zenoh-plugin-dds/src/lib.rs @@ -530,7 +530,6 @@ impl<'a> DdsPluginRuntime<'a> { } if let Some(route) = self.routes_to_dds.get(&ke) { - // TODO: check if there is no type or QoS conflict with existing route debug!( "Route from resource {} to DDS already exists -- ignoring", ke @@ -538,14 +537,38 @@ impl<'a> DdsPluginRuntime<'a> { // #102: in forwarding mode, it might happen that the route have been created but without DDS Writer // (just to declare the Zenoh Subscriber). Thus, try to set a DDS Writer to the route here. // If already set, nothing will happen. - if let Some(qos) = writer_qos { - if let Err(e) = route.set_dds_writer(self.dp, qos) { - error!( - "{}: failed to set a DDS Writer after creation: {}", - route, e + match (writer_qos.clone(), route.get_qos()) { + (Some(new_qos), Some(exiting_qos)) => { + info!("Route Zenoh->DDS ({} -> {}): Upgrading QoS {}->{}", + ke, topic_name, + exiting_qos.clone().reliability.unwrap().kind as isize, + new_qos.clone().reliability.unwrap().kind as isize ); - return RouteStatus::CreationFailure(e); - } + + // new reliability is higher than existing one. Destroy existing writer before creating new one + if new_qos.clone().reliability.unwrap().kind as isize > exiting_qos.reliability.unwrap().kind as isize { + route.delete_dds_writer(); + // route.set_qos(new_qos); + if let Err(e) = route.set_dds_writer(self.dp, new_qos) { + error!( + "{}: failed to set a DDS Writer after creation: {}", + route, e + ); + return RouteStatus::CreationFailure(e); + } + } + }, + (Some(new_qos), None) => { + // no writer existing yet. Only create new one + if let Err(e) = route.set_dds_writer(self.dp, new_qos) { + error!( + "{}: failed to set a DDS Writer after creation: {}", + route, e + ); + return RouteStatus::CreationFailure(e); + } + }, + _ => { } // no need to delete writer in any case if no QoS exists already } return RouteStatus::Routed(ke); } diff --git a/zenoh-plugin-dds/src/route_zenoh_dds.rs b/zenoh-plugin-dds/src/route_zenoh_dds.rs index 2f5641f5..97f4ea1b 100644 --- a/zenoh-plugin-dds/src/route_zenoh_dds.rs +++ b/zenoh-plugin-dds/src/route_zenoh_dds.rs @@ -13,7 +13,7 @@ // use cyclors::{ - dds_entity_t, dds_get_entity_sertype, dds_strretcode, dds_writecdr, ddsi_serdata_from_ser_iov, + dds_entity_t, dds_get_entity_sertype, dds_strretcode, dds_writecdr, ddsi_serdata_from_ser_iov, dds_get_qos, dds_create_qos, ddsi_serdata_kind_SDK_DATA, ddsi_sertype, ddsrt_iovec_t, }; use serde::{Serialize, Serializer}; @@ -352,6 +352,24 @@ impl RouteZenohDDS<'_> { pub(crate) fn has_local_routed_reader(&self) -> bool { !self.local_routed_readers.is_empty() } + + pub(crate) fn get_qos(&self) -> Option { + unsafe { + let qos = dds_create_qos(); + let ret = dds_get_qos(self.dds_writer.load(Ordering::Relaxed), qos); + if ret == 0 { + return Option::Some(Qos::from_qos_native(qos)); + } else { + log::warn!( + "Retrieving QoS failed: {}", + CStr::from_ptr(dds_strretcode(ret)) + .to_str() + .unwrap_or("unrecoverable DDS retcode") + ); + return Option::None; + } + } + } } fn do_route_data(s: Sample, topic_name: &str, data_writer: dds_entity_t) {