Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port the "Add reset to batch optimizer" patch to ROS 2 Rolling #361

Open
wants to merge 2 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 43 additions & 12 deletions fuse_optimizers/include/fuse_optimizers/batch_optimizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <atomic>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
Expand All @@ -52,6 +53,9 @@
#include <fuse_optimizers/optimizer.hpp>
#include <fuse_graphs/hash_graph.hpp>

#include <rclcpp/rclcpp.hpp>
#include <std_srvs/srv/empty.hpp>


namespace fuse_optimizers
{
Expand Down Expand Up @@ -147,30 +151,49 @@ class BatchOptimizer : public Optimizer
*/
using TransactionQueue = std::multimap<rclcpp::Time, TransactionQueueElement>;

fuse_core::Transaction::SharedPtr combined_transaction_; //!< Transaction used aggregate
//!< constraints and variables from
//!< multiple sensors and motions models
//!< before being applied to the graph.
std::mutex combined_transaction_mutex_; //!< Synchronize access to the combined transaction
//!< across different threads
// Read-only after construction
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm doing a bit of variable rearranging. This is not strictly required for this patch, but I wanted to mirror the FixedLagSmoother pattern in the BatchOptimizer.

ParameterType params_; //!< Configuration settings for this optimizer
std::thread optimization_thread_; //!< Thread used to run the optimizer as a background process

// Only used inside a single thread
rclcpp::Time start_time_; //!< The timestamp of the first ignition sensor transaction
bool started_; //!< Flag indicating the optimizer is ready/has received a transaction from an
//!< ignition sensor

// Inherently thread-safe
std::atomic<bool> optimization_request_; //!< Flag to trigger a new optimization

// Guarded by optimization_mutex_
std::mutex optimization_mutex_; //!< Mutex held while the graph is begin optimized
// fuse_core::Graph* graph_ member from the base class

// Guarded by optimization_requested_mutex_
std::condition_variable optimization_requested_; //!< Condition variable used by the optimization
//!< thread to wait until a new optimization is
//!< requested by the main thread
std::mutex optimization_requested_mutex_; //!< Required condition variable mutex
std::thread optimization_thread_; //!< Thread used to run the optimizer as a background process
rclcpp::TimerBase::SharedPtr optimize_timer_; //!< Trigger an optimization operation at a fixed
//!< frequency

// Guarded by combined_transaction_mutex_
fuse_core::Transaction::SharedPtr combined_transaction_; //!< Transaction used aggregate
//!< constraints and variables from
//!< multiple sensors and motions models
//!< before being applied to the graph.
std::mutex combined_transaction_mutex_; //!< Synchronize access to the combined transaction
//!< across different threads

// Guarded by pending_transactions_mutex_
TransactionQueue pending_transactions_; //!< The set of received transactions that have not been
//!< added to the optimizer yet. Transactions are added
//!< by the main thread, and removed and processed by the
//!< optimization thread.
std::mutex pending_transactions_mutex_; //!< Synchronize modification of the
//!< pending_transactions_ container
rclcpp::Time start_time_; //!< The timestamp of the first ignition sensor transaction
bool started_; //!< Flag indicating the optimizer is ready/has received a transaction from an
//!< ignition sensor

// Ordering ROS objects with callbacks last
rclcpp::TimerBase::SharedPtr optimize_timer_; //!< Trigger an optimization operation at a fixed
//!< frequency
//!< Service that resets the optimizer to its initial state
rclcpp::Service<std_srvs::srv::Empty>::SharedPtr reset_service_server_;

/**
* @brief Generate motion model constraints for pending transactions
Expand Down Expand Up @@ -200,6 +223,14 @@ class BatchOptimizer : public Optimizer
*/
void optimizerTimerCallback();

/**
* @brief Service callback that resets the optimizer to its original state
*/
bool resetServiceCallback(
const std::shared_ptr<std_srvs::srv::Empty::Request>,
std::shared_ptr<std_srvs::srv::Empty::Response>
);

/**
* @brief Callback fired every time the SensorModel plugin creates a new transaction
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ struct BatchOptimizerParams
*/
rclcpp::Duration optimization_period {0, static_cast<uint32_t>(RCUTILS_S_TO_NS(0.1))};

/**
* @brief The topic name of the advertised reset service
*/
std::string reset_service {"~reset"};

/**
* @brief The maximum time to wait for motion models to be generated for a received transaction.
*
Expand Down Expand Up @@ -109,6 +114,8 @@ struct BatchOptimizerParams
rclcpp::Duration::from_seconds(1.0 / optimization_frequency);
}

reset_service = fuse_core::getParam(interfaces, "reset_service", reset_service);

fuse_core::getPositiveParam(interfaces, "transaction_timeout", transaction_timeout);

fuse_core::loadSolverOptionsFromROS(interfaces, solver_options, "solver_options");
Expand Down
100 changes: 82 additions & 18 deletions fuse_optimizers/src/batch_optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <utility>

#include <fuse_core/transaction.hpp>
#include <fuse_core/util.hpp>
#include <fuse_optimizers/batch_optimizer.hpp>
#include <fuse_optimizers/optimizer.hpp>
#include <rclcpp/rclcpp.hpp>
Expand All @@ -52,10 +53,10 @@ BatchOptimizer::BatchOptimizer(
fuse_core::Graph::UniquePtr graph
)
: fuse_optimizers::Optimizer(interfaces, std::move(graph)),
combined_transaction_(fuse_core::Transaction::make_shared()),
optimization_request_(false),
start_time_(rclcpp::Time::max()),
started_(false)
started_(false),
optimization_request_(false),
combined_transaction_(fuse_core::Transaction::make_shared())
{
params_.loadFromROS(interfaces_);

Expand All @@ -68,6 +69,23 @@ BatchOptimizer::BatchOptimizer(
interfaces_.get_node_base_interface()->get_default_callback_group()
);

// Advertise a service that resets the optimizer to its initial state
reset_service_server_ = rclcpp::create_service<std_srvs::srv::Empty>(
interfaces_.get_node_base_interface(),
interfaces_.get_node_services_interface(),
fuse_core::joinTopicName(
interfaces_.get_node_base_interface()->get_name(),
params_.reset_service),
std::bind(
&BatchOptimizer::resetServiceCallback,
this,
std::placeholders::_1,
std::placeholders::_2
),
rclcpp::ServicesQoS(),
interfaces_.get_node_base_interface()->get_default_callback_group()
);

// Start the optimization thread
optimization_thread_ = std::thread(&BatchOptimizer::optimizationLoop, this);
}
Expand Down Expand Up @@ -148,23 +166,27 @@ void BatchOptimizer::optimizationLoop()
if (!interfaces_.get_node_base_interface()->get_context()->is_valid()) {
break;
}
// Copy the combined transaction so it can be shared with all the plugins
fuse_core::Transaction::ConstSharedPtr const_transaction;

{
std::lock_guard<std::mutex> lock(combined_transaction_mutex_);
const_transaction = std::move(combined_transaction_);
combined_transaction_ = fuse_core::Transaction::make_shared();
std::lock_guard<std::mutex> lock(optimization_mutex_);
// Copy the combined transaction so it can be shared with all the plugins
fuse_core::Transaction::ConstSharedPtr const_transaction;
{
std::lock_guard<std::mutex> lock(combined_transaction_mutex_);
const_transaction = std::move(combined_transaction_);
combined_transaction_ = fuse_core::Transaction::make_shared();
}
// Update the graph
graph_->update(*const_transaction);
// Optimize the entire graph
graph_->optimize(params_.solver_options);
// Make a copy of the graph to share
fuse_core::Graph::ConstSharedPtr const_graph = graph_->clone();
// Optimization is complete. Notify all the things about the graph changes.
notify(const_transaction, const_graph);
// Clear the request flag now that this optimization cycle is complete
optimization_request_ = false;
}
// Update the graph
graph_->update(*const_transaction);
// Optimize the entire graph
graph_->optimize(params_.solver_options);
// Make a copy of the graph to share
fuse_core::Graph::ConstSharedPtr const_graph = graph_->clone();
// Optimization is complete. Notify all the things about the graph changes.
notify(const_transaction, const_graph);
// Clear the request flag now that this optimization cycle is complete
optimization_request_ = false;
}
}

Expand All @@ -189,6 +211,48 @@ void BatchOptimizer::optimizerTimerCallback()
}
}

bool BatchOptimizer::resetServiceCallback(
const std::shared_ptr<std_srvs::srv::Empty::Request>,
std::shared_ptr<std_srvs::srv::Empty::Response>
)
{
// Tell all the plugins to stop
stopPlugins();
// Reset the optimizer state
{
std::lock_guard<std::mutex> lock(optimization_requested_mutex_);
optimization_request_ = false;
}
started_ = false;
// DANGER: The optimizationLoop() function obtains the lock optimization_mutex_ lock and the
// combined_transaction_mutex_ lock at the same time. We perform a parallel locking scheme
// here to prevent the possibility of deadlocks.
{
std::lock_guard<std::mutex> lock(optimization_mutex_);
// Clear the combined transation
{
std::lock_guard<std::mutex> lock(combined_transaction_mutex_);
combined_transaction_ = fuse_core::Transaction::make_shared();
}
// Clear the graph and marginal tracking states
graph_->clear();
}
// Clear all pending transactions
// The transaction callback and the optimization timer callback are the only other locations
// where the pending_transactions_ variable is modified. As long as the BatchOptimizer node
// handle is single-threaded, then pending_transactions_ variable cannot be modified while the
// reset callback is running. Therefore, there are no timing or sequence issues with exactly
// where inside the reset service callback the pending_transactions_ are cleared.
{
std::lock_guard<std::mutex> lock(pending_transactions_mutex_);
pending_transactions_.clear();
}
// Tell all the plugins to start
startPlugins();

return true;
}

void BatchOptimizer::transactionCallback(
const std::string & sensor_name,
fuse_core::Transaction::SharedPtr transaction)
Expand Down
7 changes: 7 additions & 0 deletions fuse_optimizers/test/launch_tests/test_fixed_lag_ignition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ TEST_F(FixedLagIgnitionFixture, SetInitialState)
pose_msg1.pose.covariance[35] = 1.0;
relative_pose_publisher->publish(pose_msg1);

/// @todo(swilliams) There is a timing or message queuing issue. This sleep "solves"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unrelated unit test was failing on my machine. It appears to be a timing or message queue issue. The Pose2D sensor is only receiving the second published messages with a timestamp of 3.0s. The previous message with a timestamp of 2.0s is never processed. As a quick fix, I just added a delay. But this seems like a problem that should be fixable by changing a message queue size somewhere.

/// the problem on my local machine, but this isn't a very robust
/// solution.
rclcpp::sleep_for(std::chrono::milliseconds(10));

auto pose_msg2 = geometry_msgs::msg::PoseWithCovarianceStamped();
pose_msg2.header.stamp = rclcpp::Time(3, 0, RCL_ROS_TIME);
pose_msg2.header.frame_id = "base_link";
Expand All @@ -146,6 +151,8 @@ TEST_F(FixedLagIgnitionFixture, SetInitialState)
pose_msg2.pose.covariance[35] = 1.0;
relative_pose_publisher->publish(pose_msg2);

rclcpp::sleep_for(std::chrono::milliseconds(10));

// Wait for the optimizer to process all queued transactions
rclcpp::Time result_timeout = node->now() + rclcpp::Duration::from_seconds(1.0);
auto odom_msg = nav_msgs::msg::Odometry();
Expand Down