Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix routing loop of Service request when client and server are server by the same bridge #43

Merged
merged 1 commit into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions zenoh-plugin-ros2dds/src/route_service_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl RouteServiceCli<'_> {
qos,
None,
move |sample| {
do_route_request(
route_dds_request_to_zenoh(
&route_id,
sample,
&zenoh_key_expr2,
Expand Down Expand Up @@ -275,7 +275,7 @@ impl RouteServiceCli<'_> {
}
}

fn do_route_request(
fn route_dds_request_to_zenoh(
route_id: &str,
sample: &DDSRawSample,
zenoh_key_expr: &OwnedKeyExpr,
Expand Down Expand Up @@ -304,10 +304,10 @@ fn do_route_request(
zenoh_req_buf.push_zslice(slice.subslice(20, slice.len()).unwrap());

if *LOG_PAYLOAD {
log::debug!("{route_id}: routing request {request_id:02x?} to Zenoh - payload: {zenoh_req_buf:02x?}");
log::debug!("{route_id}: routing request {request_id:02x?} from DDS to Zenoh - payload: {zenoh_req_buf:02x?}");
} else {
log::trace!(
"{route_id}: routing request {request_id:02x?} to Zenoh - {} bytes",
"{route_id}: routing request {request_id:02x?} from DDS to Zenoh - {} bytes",
zenoh_req_buf.len()
);
}
Expand All @@ -316,21 +316,29 @@ fn do_route_request(
if let Err(e) = zsession
.get(zenoh_key_expr)
.with_value(zenoh_req_buf)
.allowed_destination(Locality::Remote)
.timeout(queries_timeout)
.callback(move |reply| do_route_reply(route_id2.clone(), reply, request_id, rep_writer))
.callback(move |reply| {
route_zenoh_reply_to_dds(route_id2.clone(), reply, request_id, rep_writer)
})
.res_sync()
{
log::warn!("{route_id}: routing request {request_id:02x?} to Zenoh failed: {e}");
log::warn!("{route_id}: routing request {request_id:02x?} from DDS to Zenoh failed: {e}");
}
}

fn do_route_reply(route_id: String, reply: Reply, request_id: [u8; 16], rep_writer: dds_entity_t) {
fn route_zenoh_reply_to_dds(
route_id: String,
reply: Reply,
request_id: [u8; 16],
rep_writer: dds_entity_t,
) {
match reply.sample {
Ok(sample) => {
let zenoh_rep_buf = sample.payload.contiguous();
if zenoh_rep_buf.len() < 4 || zenoh_rep_buf[1] > 1 {
log::warn!(
"{route_id}: received invalid reply for {request_id:02x?}: {zenoh_rep_buf:0x?}"
"{route_id}: received invalid reply from Zenoh for {request_id:02x?}: {zenoh_rep_buf:0x?}"
);
return;
}
Expand All @@ -344,16 +352,18 @@ fn do_route_reply(route_id: String, reply: Reply, request_id: [u8; 16], rep_writ
dds_rep_buf.extend_from_slice(&zenoh_rep_buf[4..]);

if *LOG_PAYLOAD {
log::debug!("{route_id}: routing reply for {request_id:02x?} to Client - payload: {dds_rep_buf:02x?}");
log::debug!("{route_id}: routing reply for {request_id:02x?} from Zenoh to DDS - payload: {dds_rep_buf:02x?}");
} else {
log::trace!(
"{route_id}: routing reply for {request_id:02x?} to Client - {} bytes",
"{route_id}: routing reply for {request_id:02x?} from Zenoh to DDS - {} bytes",
dds_rep_buf.len()
);
}

if let Err(e) = dds_write(rep_writer, dds_rep_buf) {
log::warn!("{route_id}: routing reply for {request_id:02x?} failed: {e}");
log::warn!(
"{route_id}: routing reply for {request_id:02x?} from Zenoh to DDS failed: {e}"
);
}
}
Err(val) => {
Expand Down
26 changes: 14 additions & 12 deletions zenoh-plugin-ros2dds/src/route_service_srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl RouteServiceSrv<'_> {
let queries_in_progress = queries_in_progress.clone();
let zenoh_key_expr = zenoh_key_expr.clone();
move |sample| {
do_route_reply(
route_dds_reply_to_zenoh(
sample,
zenoh_key_expr.clone(),
&mut zwrite!(queries_in_progress),
Expand Down Expand Up @@ -236,7 +236,7 @@ impl RouteServiceSrv<'_> {
.zsession
.declare_queryable(&self.zenoh_key_expr)
.callback(move |query| {
do_route_request(
route_zenoh_request_to_dds(
query,
&mut zwrite!(queries_in_progress),
&sequence_number,
Expand Down Expand Up @@ -341,7 +341,7 @@ impl RouteServiceSrv<'_> {

const CDR_HEADER_LE: [u8; 4] = [0, 1, 0, 0];

fn do_route_request(
fn route_zenoh_request_to_dds(
query: Query,
queries_in_progress: &mut HashMap<u64, Query>,
sequence_number: &AtomicU64,
Expand Down Expand Up @@ -389,22 +389,24 @@ fn do_route_request(
};

if *LOG_PAYLOAD {
log::debug!("{route_id}: routing request #{n} to Service - payload: {dds_req_buf:02x?}");
log::debug!(
"{route_id}: routing request #{n} from Zenoh to DDS - payload: {dds_req_buf:02x?}"
);
} else {
log::trace!(
"{route_id}: routing request #{n} to Service - {} bytes",
"{route_id}: routing request #{n} from Zenoh to DDS - {} bytes",
dds_req_buf.len()
);
}

queries_in_progress.insert(n, query);
if let Err(e) = dds_write(req_writer, dds_req_buf) {
log::warn!("{route_id}: routing request failed: {e}");
log::warn!("{route_id}: routing request from Zenoh to DDS failed: {e}");
queries_in_progress.remove(&n);
}
}

fn do_route_reply(
fn route_dds_reply_to_zenoh(
sample: &DDSRawSample,
zenoh_key_expr: OwnedKeyExpr,
queries_in_progress: &mut HashMap<u64, Query>,
Expand All @@ -415,7 +417,7 @@ fn do_route_reply(
// the client guid (8 bytes) and a sequence_number (8 bytes). As per rmw_cyclonedds here:
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73
if sample.len() < 20 {
log::warn!("{route_id}: received invalid response: {sample:0x?}");
log::warn!("{route_id}: received invalid response from DDS: {sample:0x?}");
return;
}

Expand Down Expand Up @@ -452,10 +454,10 @@ fn do_route_reply(
zenoh_rep_buf.push_zslice(slice.subslice(20, slice.len()).unwrap());

if *LOG_PAYLOAD {
log::debug!("{route_id}: routing reply #{seq_num} to Client - payload: {zenoh_rep_buf:02x?}");
log::debug!("{route_id}: routing reply #{seq_num} from DDS to Zenoh - payload: {zenoh_rep_buf:02x?}");
} else {
log::trace!(
"{route_id}: routing reply #{seq_num} to Client - {} bytes",
"{route_id}: routing reply #{seq_num} from DDS to Zenoh - {} bytes",
zenoh_rep_buf.len()
);
}
Expand All @@ -464,11 +466,11 @@ fn do_route_reply(
.reply(Ok(Sample::new(zenoh_key_expr, zenoh_rep_buf)))
.res_sync()
{
log::warn!("{route_id}: routing reply for request #{seq_num} failed: {e}");
log::warn!("{route_id}: routing reply for request #{seq_num} from DDS to Zenoh failed: {e}");
}
}
None => log::warn!(
"{route_id}: received response an unknown query (already dropped?): #{seq_num}"
"{route_id}: received response from DDS an unknown query (already timed out ?): #{seq_num}"
),
}
}