Skip to content

Commit

Permalink
Fix for possible freeze in Recorder::stop() (#1381) (#1387)
Browse files Browse the repository at this point in the history
* Fix for possible freeze in Recorder::stop()

- It was possible a freeze in recorder due to the race condition when
calling Recorder::stop() while event publisher thread hasn't been fully
started yet.

Signed-off-by: Michael Orlov <[email protected]>

* Move event_publisher_thread_wake_cv_.notify_all() out of the mutex lock

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
(cherry picked from commit c6cc69a)

Co-authored-by: Michael Orlov <[email protected]>
  • Loading branch information
mergify[bot] and MichaelOrlov authored Jun 13, 2023
1 parent 33edccf commit 019a037
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class RecorderImpl
std::shared_ptr<rosbag2_cpp::Writer> writer_;
rosbag2_storage::StorageOptions storage_options_;
rosbag2_transport::RecordOptions record_options_;
std::atomic<bool> stop_discovery_;
std::atomic<bool> stop_discovery_ = false;
std::unordered_map<std::string, std::shared_ptr<rclcpp::SubscriptionBase>> subscriptions_;

private:
Expand Down Expand Up @@ -132,14 +132,15 @@ class RecorderImpl
rclcpp::Service<rosbag2_interfaces::srv::SplitBagfile>::SharedPtr srv_split_bagfile_;

std::atomic<bool> paused_ = false;
std::atomic<bool> in_recording_ = false;
std::shared_ptr<KeyboardHandler> keyboard_handler_;
KeyboardHandler::callback_handle_t toggle_paused_key_callback_handle_ =
KeyboardHandler::invalid_handle;

// Variables for event publishing
rclcpp::Publisher<rosbag2_interfaces::msg::WriteSplitEvent>::SharedPtr split_event_pub_;
bool event_publisher_thread_should_exit_ = false;
bool write_split_has_occurred_ = false;
std::atomic<bool> event_publisher_thread_should_exit_ = false;
std::atomic<bool> write_split_has_occurred_ = false;
rosbag2_cpp::bag_events::BagSplitInfo bag_split_info_;
std::mutex event_publisher_thread_mutex_;
std::condition_variable event_publisher_thread_wake_cv_;
Expand Down Expand Up @@ -191,12 +192,17 @@ RecorderImpl::~RecorderImpl()
stop();
}


void RecorderImpl::stop()
{
stop_discovery_ = true;
if (discovery_future_.valid()) {
discovery_future_.wait();
auto status = discovery_future_.wait_for(2 * record_options_.topic_polling_interval);
if (status != std::future_status::ready) {
RCLCPP_ERROR_STREAM(
node->get_logger(),
"discovery_future_.wait_for(" << record_options_.topic_polling_interval.count() <<
") return status: " << (status == std::future_status::timeout ? "timeout" : "deferred"));
}
}
paused_ = true;
subscriptions_.clear();
Expand All @@ -210,10 +216,18 @@ void RecorderImpl::stop()
if (event_publisher_thread_.joinable()) {
event_publisher_thread_.join();
}
in_recording_ = false;
RCLCPP_INFO(node->get_logger(), "Recording stopped");
}

void RecorderImpl::record()
{
if (in_recording_.exchange(true)) {
RCLCPP_WARN_STREAM(
node->get_logger(),
"Called Recorder::record() while already in recording, dismissing request.");
return;
}
paused_ = record_options_.start_paused;
stop_discovery_ = record_options_.is_discovery_disabled;
topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides;
Expand Down Expand Up @@ -281,9 +295,8 @@ void RecorderImpl::record()
// Start the thread that will publish events
event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this);

split_event_pub_ = node->create_publisher<rosbag2_interfaces::msg::WriteSplitEvent>(
"events/write_split",
1);
split_event_pub_ =
node->create_publisher<rosbag2_interfaces::msg::WriteSplitEvent>("events/write_split", 1);
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[this](rosbag2_cpp::bag_events::BagSplitInfo & info) {
Expand All @@ -305,15 +318,13 @@ void RecorderImpl::record()
discovery_future_ =
std::async(std::launch::async, std::bind(&RecorderImpl::topics_discovery, this));
}
RCLCPP_INFO(node->get_logger(), "Recording...");
}

void RecorderImpl::event_publisher_thread_main()
{
RCLCPP_INFO(node->get_logger(), "Event publisher thread: Starting");

bool should_exit = false;

while (!should_exit) {
while (!event_publisher_thread_should_exit_.load()) {
std::unique_lock<std::mutex> lock(event_publisher_thread_mutex_);
event_publisher_thread_wake_cv_.wait(
lock,
Expand All @@ -327,10 +338,7 @@ void RecorderImpl::event_publisher_thread_main()
message.opened_file = bag_split_info_.opened_file;
split_event_pub_->publish(message);
}

should_exit = event_publisher_thread_should_exit_;
}

RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting");
}

Expand Down

0 comments on commit 019a037

Please sign in to comment.