Skip to content

Commit

Permalink
Support ROS 2 service
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary committed Dec 11, 2024
1 parent 6c015fc commit 1ab5a83
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ lazy_static = "1.4.0"
regex = "1.7.1"
rustc_version = "0.4"
serde = "1.0.154"
serde_derive = "1.0.154"
serde_json = "1.0.114"
test-case = { version = "3.3.1" }
tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements
Expand Down
1 change: 1 addition & 0 deletions zenoh-plugin-ros2dds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ hex = { workspace = true }
lazy_static = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
test-case = { workspace = true }
tokio = { workspace = true }
Expand Down
125 changes: 125 additions & 0 deletions zenoh-plugin-ros2dds/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,31 @@ mod ros_test {

use futures::StreamExt;
use r2r::{self, QosProfile};
use serde_derive::{Deserialize, Serialize};
use zenoh::{
config::Config,
internal::{plugins::PluginsManager, runtime::RuntimeBuilder},
Wait,
};
use zenoh_config::ModeDependentValue;

// The test topic
const TEST_TOPIC: &str = "test_topic";
// The test TEST_PAYLOAD
const TEST_PAYLOAD: &str = "Hello World";
// The test service
//const TEST_SERVICE: &str = "test_service";
const TEST_SERVICE: &str = "add_two_ints";

#[derive(Serialize, Deserialize, PartialEq, Clone)]
struct AddTwoIntsRequest {
a: i64,
b: i64,
}
#[derive(Serialize, Deserialize, PartialEq, Clone)]
struct AddTwoIntsReply {
sum: i64,
}

fn init_env() {
std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp");
Expand Down Expand Up @@ -135,4 +150,114 @@ mod ros_test {
.await
.expect("Timeout: Zenoh subscriber didn't receive any ROS message.");
}

#[tokio::test(flavor = "multi_thread")]
async fn test_ros_client_zenoh_service() {
init_env();
// Create zenoh-bridge-ros2dds
tokio::spawn(create_bridge());

let a = 1;
let b = 2;

// Zenoh service
let session = zenoh::open(zenoh::Config::default()).await.unwrap();
let _queryable = session
.declare_queryable(TEST_SERVICE)
.callback(|query| {
let request: AddTwoIntsRequest =
cdr::deserialize(&query.payload().unwrap().to_bytes()).unwrap();
let response = AddTwoIntsReply {
sum: request.a + request.b,
};
let data = cdr::serialize::<_, _, cdr::CdrLe>(&response, cdr::Infinite).unwrap();
query.reply(TEST_SERVICE, data).wait().unwrap();
})
.await
.unwrap();

// ROS client
let ctx = r2r::Context::create().unwrap();
let mut node = r2r::Node::create(ctx, "ros_client", "").unwrap();
let client = node
.create_client::<r2r::example_interfaces::srv::AddTwoInts::Service>(
&format!("/{}", TEST_SERVICE),
QosProfile::default(),
)
.unwrap();

// Node spin
use tokio::sync::oneshot;
let (term_tx, mut term_rx) = oneshot::channel();
let _handler = tokio::task::spawn_blocking(move || {
while term_rx.try_recv().is_err() {
node.spin_once(std::time::Duration::from_millis(100));
}
});

// Wait for the environment to be ready
tokio::time::sleep(Duration::from_secs(1)).await;

// Send the request and then process the response
let my_req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b };
let resp = client.request(&my_req).unwrap().await.unwrap();

assert_eq!(resp.sum, a + b);

term_tx.send(()).unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn test_zenoh_client_ros_service() {
init_env();
// Create zenoh-bridge-ros2dds
tokio::spawn(create_bridge());

let a = 1;
let b = 2;

// ROS service
let ctx = r2r::Context::create().unwrap();
let mut node = r2r::Node::create(ctx, "ros_service", "").unwrap();
let mut service = node
.create_service::<r2r::example_interfaces::srv::AddTwoInts::Service>(
&format!("/{}", TEST_SERVICE),
QosProfile::default(),
)
.unwrap();
// Processing the requests and send back responses
tokio::spawn(async move {
loop {
match service.next().await {
Some(req) => {
let resp = r2r::example_interfaces::srv::AddTwoInts::Response {
sum: req.message.a + req.message.b,
};
req.respond(resp).unwrap();
}
None => break,
}
}
});

// Zenoh client
let session = zenoh::open(zenoh::Config::default()).await.unwrap();
let client = session.declare_querier(TEST_SERVICE).await.unwrap();

// Wait for the environment to be ready
tokio::time::sleep(Duration::from_secs(1)).await;

// Send request to ROS service
let req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b };
let buf = cdr::serialize::<_, _, cdr::CdrLe>(&req, cdr::size::Infinite).unwrap();
let recv_handler = client.get().payload(buf).await.unwrap();

// Process the response
node.spin_once(std::time::Duration::from_millis(100));
let reply = recv_handler.recv().unwrap();
let reader = reply.result().unwrap().payload().reader();
let result: Result<i64, _> = cdr::deserialize_from(reader, cdr::size::Infinite);

assert_eq!(result.unwrap(), a + b);
}
}

0 comments on commit 1ab5a83

Please sign in to comment.