Skip to content

Commit

Permalink
Bugfix for bag_split event callbacks not called with file compression
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
MichaelOrlov committed May 9, 2024
1 parent 7d48046 commit 2baf20e
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void SequentialCompressionWriter::compression_thread_fn()

while (true) {
std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message;
std::string file;
std::string closed_file_relative_to_bag;
{
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
// *INDENT-OFF*
Expand All @@ -134,7 +134,7 @@ void SequentialCompressionWriter::compression_thread_fn()
compressor_message_queue_.pop();
compressor_condition_.notify_all();
} else if (!compressor_file_queue_.empty()) {
file = compressor_file_queue_.front();
closed_file_relative_to_bag = compressor_file_queue_.front();
compressor_file_queue_.pop();
} else if (!compression_is_running_) {
// I woke up, all work queues are empty, and the main thread has stopped execution. Exit.
Expand All @@ -151,8 +151,27 @@ void SequentialCompressionWriter::compression_thread_fn()
std::lock_guard<std::recursive_mutex> storage_lock(storage_mutex_);
SequentialWriter::write(compressed_message);
}
} else if (!file.empty()) {
compress_file(*compressor, file);
} else if (!closed_file_relative_to_bag.empty()) {
compress_file(*compressor, closed_file_relative_to_bag);

// Execute callbacks from the base class
auto closed_file = (fs::path(base_folder_) / closed_file_relative_to_bag).generic_string();
std::string new_file;
// To determine, a new_file we can't rely on the metadata_.relative_file_paths.back(),
// because other compressor threads may have already pushed a new item above.
{
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
auto iter = std::find(
metadata_.relative_file_paths.begin(), metadata_.relative_file_paths.end(),
closed_file_relative_to_bag);
if (iter != metadata_.relative_file_paths.end()) {
++iter;
if (iter != metadata_.relative_file_paths.end()) {
new_file = (fs::path(base_folder_) / *iter).generic_string();
}
}
}
SequentialWriter::execute_bag_split_callbacks(closed_file, new_file);
}
}
}
Expand Down Expand Up @@ -349,14 +368,17 @@ void SequentialCompressionWriter::split_bagfile()
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);

// Grab last file before calling common splitting logic, which pushes the new filename
const auto last_file = metadata_.relative_file_paths.back();
SequentialWriter::split_bagfile();
const auto last_file_relative_to_bag = metadata_.relative_file_paths.back();
const auto new_file = SequentialWriter::split_bagfile_local(false);

// If we're in FILE compression mode, push this file's name on to the queue so another
// thread will handle compressing it. If not, we can just carry on.
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) {
compressor_file_queue_.push(last_file);
compressor_file_queue_.push(last_file_relative_to_bag);
compressor_condition_.notify_one();
} else {
auto last_file = (fs::path(base_folder_) / last_file_relative_to_bag).generic_string();
SequentialWriter::execute_bag_split_callbacks(last_file, new_file);
}

if (!storage_) {
Expand Down Expand Up @@ -387,6 +409,7 @@ void SequentialCompressionWriter::write(
// If the compression mode is MESSAGE, push the message into a queue that will be handled
// by the compression threads.
if (compression_options_.compression_mode == CompressionMode::FILE) {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
SequentialWriter::write(message);
} else {
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
std::shared_ptr<rosbag2_cpp::cache::MessageCacheInterface> message_cache_;
std::unique_ptr<rosbag2_cpp::cache::CacheConsumer> cache_consumer_;

std::string split_bagfile_local(bool execute_callbacks = true);

void execute_bag_split_callbacks(
const std::string & closed_file, const std::string & opened_file);

void switch_to_next_storage();

std::string format_storage_uri(
Expand Down
29 changes: 22 additions & 7 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,11 @@ void SequentialWriter::close()
// bag file was closed before callback call.
}
if (!metadata_.relative_file_paths.empty()) {
auto info = std::make_shared<bag_events::BagSplitInfo>();
// Take the latest file name from metadata in case if it was updated after compression in
// derived class
info->closed_file =
auto closed_file =
(fs::path(base_folder_) / metadata_.relative_file_paths.back()).generic_string();
callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info);
execute_bag_split_callbacks(closed_file, "");
}

topics_names_to_info_.clear();
Expand Down Expand Up @@ -346,12 +345,11 @@ void SequentialWriter::switch_to_next_storage()
}
}

void SequentialWriter::split_bagfile()
std::string SequentialWriter::split_bagfile_local(bool execute_callbacks)
{
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = storage_->get_relative_file_path();
auto closed_file = storage_->get_relative_file_path();
switch_to_next_storage();
info->opened_file = storage_->get_relative_file_path();
auto opened_file = storage_->get_relative_file_path();

metadata_.relative_file_paths.push_back(strip_parent_path(storage_->get_relative_file_path()));

Expand All @@ -361,9 +359,26 @@ void SequentialWriter::split_bagfile()
file_info.path = strip_parent_path(storage_->get_relative_file_path());
metadata_.files.push_back(file_info);

if (execute_callbacks) {
execute_bag_split_callbacks(closed_file, opened_file);
}
return opened_file;
}

void SequentialWriter::execute_bag_split_callbacks(
const std::string & closed_file, const std::string & opened_file)
{
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = closed_file;
info->opened_file = opened_file;
callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info);
}

void SequentialWriter::split_bagfile()
{
(void)split_bagfile_local();
}

void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
{
if (!is_open_) {
Expand Down

0 comments on commit 2baf20e

Please sign in to comment.