Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix WaitSet issue in tests (backport #1206) #1211

Merged
merged 3 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ TEST_F(AckermannSteeringControllerTest, receive_message_and_publish_updated_stat
EXPECT_EQ(msg.steering_angle_command[1], 4.4);

publish_commands();
ASSERT_TRUE(controller_->wait_for_commands(executor));
controller_->wait_for_commands(executor);

ASSERT_EQ(
controller_->update(rclcpp::Time(0, 0, RCL_ROS_TIME), rclcpp::Duration::from_seconds(0.01)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,7 @@ class TestableAckermannSteeringController
controller_interface::CallbackReturn on_configure(
const rclcpp_lifecycle::State & previous_state) override
{
auto ret =
ackermann_steering_controller::AckermannSteeringController::on_configure(previous_state);
// Only if on_configure is successful create subscription
if (ret == CallbackReturn::SUCCESS)
{
ref_subscriber_wait_set_.add_subscription(ref_subscriber_twist_);
}
return ret;
return ackermann_steering_controller::AckermannSteeringController::on_configure(previous_state);
}

controller_interface::CallbackReturn on_activate(
Expand All @@ -97,30 +90,25 @@ class TestableAckermannSteeringController
* @brief wait_for_command blocks until a new ControllerReferenceMsg is received.
* Requires that the executor is not spinned elsewhere between the
* message publication and the call to this function.
*
* @return true if new ControllerReferenceMsg msg was received, false if timeout.
*/
bool wait_for_command(
rclcpp::Executor & executor, rclcpp::WaitSet & subscriber_wait_set,
void wait_for_command(
rclcpp::Executor & executor,
const std::chrono::milliseconds & timeout = std::chrono::milliseconds{500})
{
bool success = subscriber_wait_set.wait(timeout).kind() == rclcpp::WaitResultKind::Ready;
if (success)
auto until = get_node()->get_clock()->now() + timeout;
while (get_node()->get_clock()->now() < until)
{
executor.spin_some();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
return success;
}

bool wait_for_commands(
void wait_for_commands(
rclcpp::Executor & executor,
const std::chrono::milliseconds & timeout = std::chrono::milliseconds{500})
{
return wait_for_command(executor, ref_subscriber_wait_set_, timeout);
wait_for_command(executor, timeout);
}

private:
rclcpp::WaitSet ref_subscriber_wait_set_;
};

// We are using template class here for easier reuse of Fixture in specializations of controllers
Expand Down Expand Up @@ -212,36 +200,43 @@ class AckermannSteeringControllerFixture : public ::testing::Test
void subscribe_and_get_messages(ControllerStateMsg & msg)
{
// create a new subscriber
ControllerStateMsg::SharedPtr received_msg;
rclcpp::Node test_subscription_node("test_subscription_node");
auto subs_callback = [&](const ControllerStateMsg::SharedPtr) {};
auto subs_callback = [&](const ControllerStateMsg::SharedPtr cb_msg) { received_msg = cb_msg; };
auto subscription = test_subscription_node.create_subscription<ControllerStateMsg>(
"/test_ackermann_steering_controller/controller_state", 10, subs_callback);
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(test_subscription_node.get_node_base_interface());

// call update to publish the test value
ASSERT_EQ(
controller_->update(rclcpp::Time(0, 0, RCL_ROS_TIME), rclcpp::Duration::from_seconds(0.01)),
controller_interface::return_type::OK);

// call update to publish the test value
// since update doesn't guarantee a published message, republish until received
int max_sub_check_loop_count = 5; // max number of tries for pub/sub loop
rclcpp::WaitSet wait_set; // block used to wait on message
wait_set.add_subscription(subscription);
while (max_sub_check_loop_count--)
{
controller_->update(rclcpp::Time(0, 0, RCL_ROS_TIME), rclcpp::Duration::from_seconds(0.01));
const auto timeout = std::chrono::milliseconds{1};
const auto until = test_subscription_node.get_clock()->now() + timeout;
while (!received_msg && test_subscription_node.get_clock()->now() < until)
{
executor.spin_some();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
// check if message has been received
if (wait_set.wait(std::chrono::milliseconds(2)).kind() == rclcpp::WaitResultKind::Ready)
if (received_msg.get())
{
break;
}
}
ASSERT_GE(max_sub_check_loop_count, 0) << "Test was unable to publish a message through "
"controller/broadcaster update loop";
ASSERT_TRUE(received_msg);

// take message from subscription
rclcpp::MessageInfo msg_info;
ASSERT_TRUE(subscription->take(msg, msg_info));
msg = *received_msg;
}

void publish_commands(const double linear = 0.1, const double angular = 0.2)
Expand Down
2 changes: 1 addition & 1 deletion admittance_controller/test/test_admittance_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ TEST_F(AdmittanceControllerTest, receive_message_and_publish_updated_status)
// ASSERT_EQ(msg.wrench_base.header.frame_id, ik_base_frame_);

publish_commands();
ASSERT_TRUE(controller_->wait_for_commands(executor));
controller_->wait_for_commands(executor);

broadcast_tfs();
ASSERT_EQ(
Expand Down
49 changes: 19 additions & 30 deletions admittance_controller/test/test_admittance_controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,6 @@ constexpr auto NODE_SUCCESS =
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS;
constexpr auto NODE_ERROR =
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::ERROR;

rclcpp::WaitResultKind wait_for(rclcpp::SubscriptionBase::SharedPtr subscription)
{
rclcpp::WaitSet wait_set;
wait_set.add_subscription(subscription);
const auto timeout = std::chrono::seconds(10);
return wait_set.wait(timeout).kind();
}

} // namespace

// subclassing and friending so we can access member variables
Expand All @@ -90,39 +81,27 @@ class TestableAdmittanceController : public admittance_controller::AdmittanceCon

CallbackReturn on_configure(const rclcpp_lifecycle::State & previous_state) override
{
auto ret = admittance_controller::AdmittanceController::on_configure(previous_state);
// Only if on_configure is successful create subscription
if (ret == CallbackReturn::SUCCESS)
{
input_pose_command_subscriber_wait_set_.add_subscription(input_joint_command_subscriber_);
}
return ret;
return admittance_controller::AdmittanceController::on_configure(previous_state);
}

/**
* @brief wait_for_commands blocks until a new ControllerCommandMsg is received.
* Requires that the executor is not spinned elsewhere between the
* message publication and the call to this function.
*
* @return true if new ControllerCommandMsg msg was received, false if timeout.
*/
bool wait_for_commands(
void wait_for_commands(
rclcpp::Executor & executor,
const std::chrono::milliseconds & timeout = std::chrono::milliseconds{500})
{
bool success =
input_pose_command_subscriber_wait_set_.wait(timeout).kind() == rclcpp::WaitResultKind::Ready;

if (success)
auto until = get_node()->get_clock()->now() + timeout;
while (get_node()->get_clock()->now() < until)
{
executor.spin_some();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
return success;
}

private:
rclcpp::WaitSet input_wrench_command_subscriber_wait_set_;
rclcpp::WaitSet input_pose_command_subscriber_wait_set_;
const std::string robot_description_ = ros2_control_test_assets::valid_6d_robot_urdf;
const std::string robot_description_semantic_ = ros2_control_test_assets::valid_6d_robot_srdf;
};
Expand Down Expand Up @@ -277,21 +256,31 @@ class AdmittanceControllerTest : public ::testing::Test
void subscribe_and_get_messages(ControllerStateMsg & msg)
{
// create a new subscriber
auto subs_callback = [&](const ControllerStateMsg::SharedPtr) {};
ControllerStateMsg::SharedPtr received_msg;
auto subs_callback = [&](const ControllerStateMsg::SharedPtr cb_msg) { received_msg = cb_msg; };
auto subscription = test_subscription_node_->create_subscription<ControllerStateMsg>(
"/test_admittance_controller/status", 10, subs_callback);
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(test_subscription_node_->get_node_base_interface());

// call update to publish the test value
ASSERT_EQ(
controller_->update(rclcpp::Time(0), rclcpp::Duration::from_seconds(0.01)),
controller_interface::return_type::OK);

// wait for message to be passed
ASSERT_EQ(wait_for(subscription), rclcpp::WaitResultKind::Ready);
const auto timeout = std::chrono::milliseconds{1};
const auto until = test_subscription_node_->get_clock()->now() + timeout;
while (!received_msg && test_subscription_node_->get_clock()->now() < until)
{
executor.spin_some();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}

ASSERT_TRUE(received_msg);

// take message from subscription
rclcpp::MessageInfo msg_info;
ASSERT_TRUE(subscription->take(msg, msg_info));
msg = *received_msg;
}

void publish_commands()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ TEST_F(BicycleSteeringControllerTest, receive_message_and_publish_updated_status
EXPECT_EQ(msg.steering_angle_command[0], 2.2);

publish_commands(0.1, 0.2);
ASSERT_TRUE(controller_->wait_for_commands(executor));
controller_->wait_for_commands(executor);

ASSERT_EQ(
controller_->update(rclcpp::Time(0, 0, RCL_ROS_TIME), rclcpp::Duration::from_seconds(0.01)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,7 @@ class TestableBicycleSteeringController
controller_interface::CallbackReturn on_configure(
const rclcpp_lifecycle::State & previous_state) override
{
auto ret = bicycle_steering_controller::BicycleSteeringController::on_configure(previous_state);
// Only if on_configure is successful create subscription
if (ret == CallbackReturn::SUCCESS)
{
ref_subscriber_wait_set_.add_subscription(ref_subscriber_twist_);
}
return ret;
return bicycle_steering_controller::BicycleSteeringController::on_configure(previous_state);
}

controller_interface::CallbackReturn on_activate(
Expand All @@ -94,30 +88,25 @@ class TestableBicycleSteeringController
* @brief wait_for_command blocks until a new ControllerReferenceMsg is received.
* Requires that the executor is not spinned elsewhere between the
* message publication and the call to this function.
*
* @return true if new ControllerReferenceMsg msg was received, false if timeout.
*/
bool wait_for_command(
rclcpp::Executor & executor, rclcpp::WaitSet & subscriber_wait_set,
void wait_for_command(
rclcpp::Executor & executor,
const std::chrono::milliseconds & timeout = std::chrono::milliseconds{500})
{
bool success = subscriber_wait_set.wait(timeout).kind() == rclcpp::WaitResultKind::Ready;
if (success)
auto until = get_node()->get_clock()->now() + timeout;
while (get_node()->get_clock()->now() < until)
{
executor.spin_some();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
return success;
}

bool wait_for_commands(
void wait_for_commands(
rclcpp::Executor & executor,
const std::chrono::milliseconds & timeout = std::chrono::milliseconds{500})
{
return wait_for_command(executor, ref_subscriber_wait_set_, timeout);
wait_for_command(executor, timeout);
}

private:
rclcpp::WaitSet ref_subscriber_wait_set_;
};

// We are using template class here for easier reuse of Fixture in specializations of controllers
Expand Down Expand Up @@ -185,34 +174,43 @@ class BicycleSteeringControllerFixture : public ::testing::Test
void subscribe_and_get_messages(ControllerStateMsg & msg)
{
// create a new subscriber
ControllerStateMsg::SharedPtr received_msg;
rclcpp::Node test_subscription_node("test_subscription_node");
auto subs_callback = [&](const ControllerStateMsg::SharedPtr) {};
auto subs_callback = [&](const ControllerStateMsg::SharedPtr cb_msg) { received_msg = cb_msg; };
auto subscription = test_subscription_node.create_subscription<ControllerStateMsg>(
"/test_bicycle_steering_controller/controller_state", 10, subs_callback);
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(test_subscription_node.get_node_base_interface());

// call update to publish the test value
ASSERT_EQ(
controller_->update(rclcpp::Time(0, 0, RCL_ROS_TIME), rclcpp::Duration::from_seconds(0.01)),
controller_interface::return_type::OK);
// call update to publish the test value
// since update doesn't guarantee a published message, republish until received
int max_sub_check_loop_count = 5; // max number of tries for pub/sub loop
rclcpp::WaitSet wait_set; // block used to wait on max_sub_check_loop_count
wait_set.add_subscription(subscription);
while (max_sub_check_loop_count--)
{
controller_->update(rclcpp::Time(0, 0, RCL_ROS_TIME), rclcpp::Duration::from_seconds(0.01));
const auto timeout = std::chrono::milliseconds{1};
const auto until = test_subscription_node.get_clock()->now() + timeout;
while (!received_msg && test_subscription_node.get_clock()->now() < until)
{
executor.spin_some();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
// check if message has been received
if (wait_set.wait(std::chrono::milliseconds(2)).kind() == rclcpp::WaitResultKind::Ready)
if (received_msg.get())
{
break;
}
}
ASSERT_GE(max_sub_check_loop_count, 0) << "Test was unable to publish a message through "
"controller/broadcaster update loop";
ASSERT_TRUE(received_msg);

// take message from subscription
rclcpp::MessageInfo msg_info;
ASSERT_TRUE(subscription->take(msg, msg_info));
msg = *received_msg;
}

void publish_commands(const double linear = 0.1, const double angular = 0.2)
Expand Down
15 changes: 5 additions & 10 deletions diff_drive_controller/test/test_diff_drive_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,17 @@ class TestableDiffDriveController : public diff_drive_controller::DiffDriveContr
* @brief wait_for_twist block until a new twist is received.
* Requires that the executor is not spinned elsewhere between the
* message publication and the call to this function
*
* @return true if new twist msg was received, false if timeout
*/
bool wait_for_twist(
void wait_for_twist(
rclcpp::Executor & executor,
const std::chrono::milliseconds & timeout = std::chrono::milliseconds(500))
{
rclcpp::WaitSet wait_set;
wait_set.add_subscription(velocity_command_subscriber_);

if (wait_set.wait(timeout).kind() == rclcpp::WaitResultKind::Ready)
auto until = get_node()->get_clock()->now() + timeout;
while (get_node()->get_clock()->now() < until)
{
executor.spin_some();
return true;
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
return false;
}

/**
Expand Down Expand Up @@ -547,7 +542,7 @@ TEST_F(TestDiffDriveController, correct_initialization_using_parameters)
const double angular = 0.0;
publish(linear, angular);
// wait for msg is be published to the system
ASSERT_TRUE(controller_->wait_for_twist(executor));
controller_->wait_for_twist(executor);

ASSERT_EQ(
controller_->update(rclcpp::Time(0, 0, RCL_ROS_TIME), rclcpp::Duration::from_seconds(0.01)),
Expand Down
Loading
Loading