From ab5abc8b38f8c9da3e070811126e4cacc71b604f Mon Sep 17 00:00:00 2001 From: Stephen Williams Date: Wed, 20 Mar 2024 19:15:56 -0700 Subject: [PATCH 1/2] Port the 'add reset to batch optimizer' patch from ROS 1 to ROS 2 --- .../fuse_optimizers/batch_optimizer.hpp | 55 +++++++--- .../batch_optimizer_params.hpp | 7 ++ fuse_optimizers/src/batch_optimizer.cpp | 100 ++++++++++++++---- 3 files changed, 132 insertions(+), 30 deletions(-) diff --git a/fuse_optimizers/include/fuse_optimizers/batch_optimizer.hpp b/fuse_optimizers/include/fuse_optimizers/batch_optimizer.hpp index 48e9dc254..31739688c 100644 --- a/fuse_optimizers/include/fuse_optimizers/batch_optimizer.hpp +++ b/fuse_optimizers/include/fuse_optimizers/batch_optimizer.hpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -52,6 +53,9 @@ #include #include +#include +#include + namespace fuse_optimizers { @@ -147,30 +151,49 @@ class BatchOptimizer : public Optimizer */ using TransactionQueue = std::multimap; - fuse_core::Transaction::SharedPtr combined_transaction_; //!< Transaction used aggregate - //!< constraints and variables from - //!< multiple sensors and motions models - //!< before being applied to the graph. - std::mutex combined_transaction_mutex_; //!< Synchronize access to the combined transaction - //!< across different threads + // Read-only after construction ParameterType params_; //!< Configuration settings for this optimizer + std::thread optimization_thread_; //!< Thread used to run the optimizer as a background process + + // Only used inside a single thread + rclcpp::Time start_time_; //!< The timestamp of the first ignition sensor transaction + bool started_; //!< Flag indicating the optimizer is ready/has received a transaction from an + //!< ignition sensor + + // Inherently thread-safe std::atomic optimization_request_; //!< Flag to trigger a new optimization + + // Guarded by optimization_mutex_ + std::mutex optimization_mutex_; //!< Mutex held while the graph is begin optimized + // fuse_core::Graph* graph_ member from the base class + + // Guarded by optimization_requested_mutex_ std::condition_variable optimization_requested_; //!< Condition variable used by the optimization //!< thread to wait until a new optimization is //!< requested by the main thread std::mutex optimization_requested_mutex_; //!< Required condition variable mutex - std::thread optimization_thread_; //!< Thread used to run the optimizer as a background process - rclcpp::TimerBase::SharedPtr optimize_timer_; //!< Trigger an optimization operation at a fixed - //!< frequency + + // Guarded by combined_transaction_mutex_ + fuse_core::Transaction::SharedPtr combined_transaction_; //!< Transaction used aggregate + //!< constraints and variables from + //!< multiple sensors and motions models + //!< before being applied to the graph. + std::mutex combined_transaction_mutex_; //!< Synchronize access to the combined transaction + //!< across different threads + + // Guarded by pending_transactions_mutex_ TransactionQueue pending_transactions_; //!< The set of received transactions that have not been //!< added to the optimizer yet. Transactions are added //!< by the main thread, and removed and processed by the //!< optimization thread. std::mutex pending_transactions_mutex_; //!< Synchronize modification of the //!< pending_transactions_ container - rclcpp::Time start_time_; //!< The timestamp of the first ignition sensor transaction - bool started_; //!< Flag indicating the optimizer is ready/has received a transaction from an - //!< ignition sensor + + // Ordering ROS objects with callbacks last + rclcpp::TimerBase::SharedPtr optimize_timer_; //!< Trigger an optimization operation at a fixed + //!< frequency + //!< Service that resets the optimizer to its initial state + rclcpp::Service::SharedPtr reset_service_server_; /** * @brief Generate motion model constraints for pending transactions @@ -200,6 +223,14 @@ class BatchOptimizer : public Optimizer */ void optimizerTimerCallback(); + /** + * @brief Service callback that resets the optimizer to its original state + */ + bool resetServiceCallback( + const std::shared_ptr, + std::shared_ptr + ); + /** * @brief Callback fired every time the SensorModel plugin creates a new transaction * diff --git a/fuse_optimizers/include/fuse_optimizers/batch_optimizer_params.hpp b/fuse_optimizers/include/fuse_optimizers/batch_optimizer_params.hpp index ba4011db3..85308a20f 100644 --- a/fuse_optimizers/include/fuse_optimizers/batch_optimizer_params.hpp +++ b/fuse_optimizers/include/fuse_optimizers/batch_optimizer_params.hpp @@ -65,6 +65,11 @@ struct BatchOptimizerParams */ rclcpp::Duration optimization_period {0, static_cast(RCUTILS_S_TO_NS(0.1))}; + /** + * @brief The topic name of the advertised reset service + */ + std::string reset_service {"~reset"}; + /** * @brief The maximum time to wait for motion models to be generated for a received transaction. * @@ -109,6 +114,8 @@ struct BatchOptimizerParams rclcpp::Duration::from_seconds(1.0 / optimization_frequency); } + reset_service = fuse_core::getParam(interfaces, "reset_service", reset_service); + fuse_core::getPositiveParam(interfaces, "transaction_timeout", transaction_timeout); fuse_core::loadSolverOptionsFromROS(interfaces, solver_options, "solver_options"); diff --git a/fuse_optimizers/src/batch_optimizer.cpp b/fuse_optimizers/src/batch_optimizer.cpp index a4b520706..643d9826d 100644 --- a/fuse_optimizers/src/batch_optimizer.cpp +++ b/fuse_optimizers/src/batch_optimizer.cpp @@ -39,6 +39,7 @@ #include #include +#include #include #include #include @@ -52,10 +53,10 @@ BatchOptimizer::BatchOptimizer( fuse_core::Graph::UniquePtr graph ) : fuse_optimizers::Optimizer(interfaces, std::move(graph)), - combined_transaction_(fuse_core::Transaction::make_shared()), - optimization_request_(false), start_time_(rclcpp::Time::max()), - started_(false) + started_(false), + optimization_request_(false), + combined_transaction_(fuse_core::Transaction::make_shared()) { params_.loadFromROS(interfaces_); @@ -68,6 +69,23 @@ BatchOptimizer::BatchOptimizer( interfaces_.get_node_base_interface()->get_default_callback_group() ); + // Advertise a service that resets the optimizer to its initial state + reset_service_server_ = rclcpp::create_service( + interfaces_.get_node_base_interface(), + interfaces_.get_node_services_interface(), + fuse_core::joinTopicName( + interfaces_.get_node_base_interface()->get_name(), + params_.reset_service), + std::bind( + &BatchOptimizer::resetServiceCallback, + this, + std::placeholders::_1, + std::placeholders::_2 + ), + rclcpp::ServicesQoS(), + interfaces_.get_node_base_interface()->get_default_callback_group() + ); + // Start the optimization thread optimization_thread_ = std::thread(&BatchOptimizer::optimizationLoop, this); } @@ -148,23 +166,27 @@ void BatchOptimizer::optimizationLoop() if (!interfaces_.get_node_base_interface()->get_context()->is_valid()) { break; } - // Copy the combined transaction so it can be shared with all the plugins - fuse_core::Transaction::ConstSharedPtr const_transaction; + { - std::lock_guard lock(combined_transaction_mutex_); - const_transaction = std::move(combined_transaction_); - combined_transaction_ = fuse_core::Transaction::make_shared(); + std::lock_guard lock(optimization_mutex_); + // Copy the combined transaction so it can be shared with all the plugins + fuse_core::Transaction::ConstSharedPtr const_transaction; + { + std::lock_guard lock(combined_transaction_mutex_); + const_transaction = std::move(combined_transaction_); + combined_transaction_ = fuse_core::Transaction::make_shared(); + } + // Update the graph + graph_->update(*const_transaction); + // Optimize the entire graph + graph_->optimize(params_.solver_options); + // Make a copy of the graph to share + fuse_core::Graph::ConstSharedPtr const_graph = graph_->clone(); + // Optimization is complete. Notify all the things about the graph changes. + notify(const_transaction, const_graph); + // Clear the request flag now that this optimization cycle is complete + optimization_request_ = false; } - // Update the graph - graph_->update(*const_transaction); - // Optimize the entire graph - graph_->optimize(params_.solver_options); - // Make a copy of the graph to share - fuse_core::Graph::ConstSharedPtr const_graph = graph_->clone(); - // Optimization is complete. Notify all the things about the graph changes. - notify(const_transaction, const_graph); - // Clear the request flag now that this optimization cycle is complete - optimization_request_ = false; } } @@ -189,6 +211,48 @@ void BatchOptimizer::optimizerTimerCallback() } } +bool BatchOptimizer::resetServiceCallback( + const std::shared_ptr, + std::shared_ptr +) +{ + // Tell all the plugins to stop + stopPlugins(); + // Reset the optimizer state + { + std::lock_guard lock(optimization_requested_mutex_); + optimization_request_ = false; + } + started_ = false; + // DANGER: The optimizationLoop() function obtains the lock optimization_mutex_ lock and the + // combined_transaction_mutex_ lock at the same time. We perform a parallel locking scheme + // here to prevent the possibility of deadlocks. + { + std::lock_guard lock(optimization_mutex_); + // Clear the combined transation + { + std::lock_guard lock(combined_transaction_mutex_); + combined_transaction_ = fuse_core::Transaction::make_shared(); + } + // Clear the graph and marginal tracking states + graph_->clear(); + } + // Clear all pending transactions + // The transaction callback and the optimization timer callback are the only other locations + // where the pending_transactions_ variable is modified. As long as the BatchOptimizer node + // handle is single-threaded, then pending_transactions_ variable cannot be modified while the + // reset callback is running. Therefore, there are no timing or sequence issues with exactly + // where inside the reset service callback the pending_transactions_ are cleared. + { + std::lock_guard lock(pending_transactions_mutex_); + pending_transactions_.clear(); + } + // Tell all the plugins to start + startPlugins(); + + return true; +} + void BatchOptimizer::transactionCallback( const std::string & sensor_name, fuse_core::Transaction::SharedPtr transaction) From 65d0f1471b55715833a96d43d92cf554f69bbcae Mon Sep 17 00:00:00 2001 From: Stephen Williams Date: Wed, 20 Mar 2024 19:16:37 -0700 Subject: [PATCH 2/2] A quick hack to get an unrelated unit test to pass --- .../test/launch_tests/test_fixed_lag_ignition.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/fuse_optimizers/test/launch_tests/test_fixed_lag_ignition.cpp b/fuse_optimizers/test/launch_tests/test_fixed_lag_ignition.cpp index 50ff48f0b..9983f8341 100644 --- a/fuse_optimizers/test/launch_tests/test_fixed_lag_ignition.cpp +++ b/fuse_optimizers/test/launch_tests/test_fixed_lag_ignition.cpp @@ -131,6 +131,11 @@ TEST_F(FixedLagIgnitionFixture, SetInitialState) pose_msg1.pose.covariance[35] = 1.0; relative_pose_publisher->publish(pose_msg1); + /// @todo(swilliams) There is a timing or message queuing issue. This sleep "solves" + /// the problem on my local machine, but this isn't a very robust + /// solution. + rclcpp::sleep_for(std::chrono::milliseconds(10)); + auto pose_msg2 = geometry_msgs::msg::PoseWithCovarianceStamped(); pose_msg2.header.stamp = rclcpp::Time(3, 0, RCL_ROS_TIME); pose_msg2.header.frame_id = "base_link"; @@ -146,6 +151,8 @@ TEST_F(FixedLagIgnitionFixture, SetInitialState) pose_msg2.pose.covariance[35] = 1.0; relative_pose_publisher->publish(pose_msg2); + rclcpp::sleep_for(std::chrono::milliseconds(10)); + // Wait for the optimizer to process all queued transactions rclcpp::Time result_timeout = node->now() + rclcpp::Duration::from_seconds(1.0); auto odom_msg = nav_msgs::msg::Odometry();