diff --git a/Cargo.lock b/Cargo.lock index 8207895..361b4c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4966,6 +4966,7 @@ dependencies = [ "regex", "rustc_version 0.4.1", "serde", + "serde_derive", "serde_json", "test-case", "tokio", diff --git a/Cargo.toml b/Cargo.toml index c88085c..e86b4fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ lazy_static = "1.4.0" regex = "1.7.1" rustc_version = "0.4" serde = "1.0.154" +serde_derive = "1.0.154" serde_json = "1.0.114" test-case = { version = "3.3.1" } tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements diff --git a/zenoh-plugin-ros2dds/Cargo.toml b/zenoh-plugin-ros2dds/Cargo.toml index 7d12da1..b71582a 100644 --- a/zenoh-plugin-ros2dds/Cargo.toml +++ b/zenoh-plugin-ros2dds/Cargo.toml @@ -46,6 +46,7 @@ hex = { workspace = true } lazy_static = { workspace = true } regex = { workspace = true } serde = { workspace = true } +serde_derive = { workspace = true } serde_json = { workspace = true } test-case = { workspace = true } tokio = { workspace = true } diff --git a/zenoh-plugin-ros2dds/tests/test.rs b/zenoh-plugin-ros2dds/tests/test.rs index 28edbd0..37bd321 100644 --- a/zenoh-plugin-ros2dds/tests/test.rs +++ b/zenoh-plugin-ros2dds/tests/test.rs @@ -18,9 +18,11 @@ mod ros_test { use futures::StreamExt; use r2r::{self, QosProfile}; + use serde_derive::{Deserialize, Serialize}; use zenoh::{ config::Config, internal::{plugins::PluginsManager, runtime::RuntimeBuilder}, + Wait, }; use zenoh_config::ModeDependentValue; @@ -28,6 +30,19 @@ mod ros_test { const TEST_TOPIC: &str = "test_topic"; // The test TEST_PAYLOAD const TEST_PAYLOAD: &str = "Hello World"; + // The test service + //const TEST_SERVICE: &str = "test_service"; + const TEST_SERVICE: &str = "add_two_ints"; + + #[derive(Serialize, Deserialize, PartialEq, Clone)] + struct AddTwoIntsRequest { + a: i64, + b: i64, + } + #[derive(Serialize, Deserialize, PartialEq, Clone)] + struct AddTwoIntsReply { + sum: i64, + } fn init_env() { std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp"); @@ -135,4 +150,114 @@ mod ros_test { .await .expect("Timeout: Zenoh subscriber didn't receive any ROS message."); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_ros_client_zenoh_service() { + init_env(); + // Create zenoh-bridge-ros2dds + tokio::spawn(create_bridge()); + + let a = 1; + let b = 2; + + // Zenoh service + let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let _queryable = session + .declare_queryable(TEST_SERVICE) + .callback(|query| { + let request: AddTwoIntsRequest = + cdr::deserialize(&query.payload().unwrap().to_bytes()).unwrap(); + let response = AddTwoIntsReply { + sum: request.a + request.b, + }; + let data = cdr::serialize::<_, _, cdr::CdrLe>(&response, cdr::Infinite).unwrap(); + query.reply(TEST_SERVICE, data).wait().unwrap(); + }) + .await + .unwrap(); + + // ROS client + let ctx = r2r::Context::create().unwrap(); + let mut node = r2r::Node::create(ctx, "ros_client", "").unwrap(); + let client = node + .create_client::( + &format!("/{}", TEST_SERVICE), + QosProfile::default(), + ) + .unwrap(); + + // Node spin + use tokio::sync::oneshot; + let (term_tx, mut term_rx) = oneshot::channel(); + let _handler = tokio::task::spawn_blocking(move || { + while term_rx.try_recv().is_err() { + node.spin_once(std::time::Duration::from_millis(100)); + } + }); + + // Wait for the environment to be ready + tokio::time::sleep(Duration::from_secs(1)).await; + + // Send the request and then process the response + let my_req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; + let resp = client.request(&my_req).unwrap().await.unwrap(); + + assert_eq!(resp.sum, a + b); + + term_tx.send(()).unwrap(); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_zenoh_client_ros_service() { + init_env(); + // Create zenoh-bridge-ros2dds + tokio::spawn(create_bridge()); + + let a = 1; + let b = 2; + + // ROS service + let ctx = r2r::Context::create().unwrap(); + let mut node = r2r::Node::create(ctx, "ros_service", "").unwrap(); + let mut service = node + .create_service::( + &format!("/{}", TEST_SERVICE), + QosProfile::default(), + ) + .unwrap(); + // Processing the requests and send back responses + tokio::spawn(async move { + loop { + match service.next().await { + Some(req) => { + let resp = r2r::example_interfaces::srv::AddTwoInts::Response { + sum: req.message.a + req.message.b, + }; + req.respond(resp).unwrap(); + } + None => break, + } + } + }); + + // Zenoh client + let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let client = session.declare_querier(TEST_SERVICE).await.unwrap(); + + // Wait for the environment to be ready + tokio::time::sleep(Duration::from_secs(1)).await; + + // Send request to ROS service + let req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; + let buf = cdr::serialize::<_, _, cdr::CdrLe>(&req, cdr::size::Infinite).unwrap(); + let recv_handler = client.get().payload(buf).await.unwrap(); + + // Process the response + node.spin_once(std::time::Duration::from_millis(100)); + let reply = recv_handler.recv().unwrap(); + let reader = reply.result().unwrap().payload().reader(); + let result: Result = cdr::deserialize_from(reader, cdr::size::Infinite); + + assert_eq!(result.unwrap(), a + b); + } }