Skip to content

Commit

Permalink
[humble] Bugfix for bag_split event callbacks called to early with fi…
Browse files Browse the repository at this point in the history
…le compression (backport #1643) (#1733)

* Bugfix for bag_split event callbacks called to early with file compression (#1643)

* Bugfix for bag_split event callbacks not called with file compression

Signed-off-by: Michael Orlov <[email protected]>

* Delete redundant "should_split_bagfile" in compression_writer

- It is a non-virtual method and doesn't call from the base class.

Signed-off-by: Michael Orlov <[email protected]>

* Adjust "split_event_calls_callback" for testing multiple splits

Signed-off-by: Michael Orlov <[email protected]>

* Use temp folder for "SequentialWriterTest" fixture instead of "uri"

Signed-off-by: Michael Orlov <[email protected]>

* Add tests for split event callbacks when using file and msg compression

 - Added "split_event_calls_callback_with_msg_compression" and
"split_event_calls_callback_with_file_compression" uit tests

Signed-off-by: Michael Orlov <[email protected]>

* Add debug info to the flaky "can_record_again_after_stop" test

Signed-off-by: Michael Orlov <[email protected]>

* Use `uint64_t` type for `fake_storage_size_` in tests

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
(cherry picked from commit 1877b53)

# Conflicts:
#	rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp
#	rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp
#	rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
#	rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
#	rosbag2_transport/test/rosbag2_transport/test_record.cpp

* Address merge conflicts

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
Co-authored-by: Michael Orlov <[email protected]>
  • Loading branch information
mergify[bot] and MichaelOrlov authored Jul 16, 2024
1 parent 70434fc commit f449d50
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
// Closes the current backed storage and opens the next bagfile.
void split_bagfile() override;

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time);

// Prepares the metadata by setting initial values.
void init_metadata() override;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

#include "logging.hpp"

namespace fs = rcpputils::fs;

namespace rosbag2_compression
{

Expand Down Expand Up @@ -76,7 +78,7 @@ void SequentialCompressionWriter::compression_thread_fn()

while (true) {
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message;
std::string file;
std::string closed_file_relative_to_bag;
{
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
compressor_condition_.wait(
Expand All @@ -92,7 +94,7 @@ void SequentialCompressionWriter::compression_thread_fn()
compressor_message_queue_.pop();
compressor_condition_.notify_all();
} else if (!compressor_file_queue_.empty()) {
file = compressor_file_queue_.front();
closed_file_relative_to_bag = compressor_file_queue_.front();
compressor_file_queue_.pop();
} else if (!compression_is_running_) {
// I woke up, all work queues are empty, and the main thread has stopped execution. Exit.
Expand All @@ -109,8 +111,35 @@ void SequentialCompressionWriter::compression_thread_fn()
std::lock_guard<std::recursive_mutex> storage_lock(storage_mutex_);
SequentialWriter::write(message);
}
} else if (!file.empty()) {
compress_file(*compressor, file);
} else if (!closed_file_relative_to_bag.empty()) {
compress_file(*compressor, closed_file_relative_to_bag);

// Execute callbacks from the base class
static const std::string compressor_ext = "." + compressor->get_compression_identifier();
auto closed_file =
(fs::path(base_folder_) / (closed_file_relative_to_bag + compressor_ext)).string();
std::string new_file;
// To determine, a new_file we can't rely on the metadata_.relative_file_paths.back(),
// because other compressor threads may have already pushed a new item above.
{
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
auto iter = std::find(
metadata_.relative_file_paths.begin(), metadata_.relative_file_paths.end(),
closed_file_relative_to_bag + compressor_ext);
if (iter != metadata_.relative_file_paths.end()) {
++iter;
if (iter != metadata_.relative_file_paths.end()) {
new_file = (fs::path(base_folder_) / *iter).string();
}
}
}
if (!new_file.empty()) {
// The new_file is empty when we compressed the last file after calling close().
// Note: We shall not call 'execute_bag_split_callbacks(closed_file, new_file)' for the
// last compressed file because it will be called inside base class
// SequentialWriter::close().
SequentialWriter::execute_bag_split_callbacks(closed_file, new_file);
}
}
}
}
Expand Down Expand Up @@ -292,14 +321,17 @@ void SequentialCompressionWriter::split_bagfile()
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);

// Grab last file before calling common splitting logic, which pushes the new filename
const auto last_file = metadata_.relative_file_paths.back();
SequentialWriter::split_bagfile();
const auto last_file_relative_to_bag = metadata_.relative_file_paths.back();
const auto new_file = SequentialWriter::split_bagfile_local(false);

// If we're in FILE compression mode, push this file's name on to the queue so another
// thread will handle compressing it. If not, we can just carry on.
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) {
compressor_file_queue_.push(last_file);
compressor_file_queue_.push(last_file_relative_to_bag);
compressor_condition_.notify_one();
} else {
auto last_file = (fs::path(base_folder_) / last_file_relative_to_bag).string();
SequentialWriter::execute_bag_split_callbacks(last_file, new_file);
}

if (!storage_) {
Expand All @@ -324,6 +356,7 @@ void SequentialCompressionWriter::write(
// If the compression mode is MESSAGE, push the message into a queue that will be handled
// by the compression threads.
if (compression_options_.compression_mode == CompressionMode::FILE) {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
SequentialWriter::write(message);
} else {
// since the compression operation will manipulate memory inplace, thus
Expand Down Expand Up @@ -389,17 +422,4 @@ void SequentialCompressionWriter::write(
}
}

bool SequentialCompressionWriter::should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time)
{
if (storage_options_.max_bagfile_size ==
rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT)
{
return false;
} else {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
return SequentialWriter::should_split_bagfile(current_time);
}
}

} // namespace rosbag2_compression
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

using namespace testing; // NOLINT

namespace fs = rcpputils::fs;

static constexpr const char * DefaultTestCompressor = "fake_comp";

class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
Expand All @@ -54,7 +56,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
tmp_dir_storage_options_{},
serialization_format_{"rmw_format"}
{
tmp_dir_storage_options_.uri = tmp_dir_.string();
tmp_dir_storage_options_.uri = (tmp_dir_ / bag_base_dir_).string();
rcpputils::fs::remove_all(tmp_dir_);
ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault(Return(storage_));
EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(AtLeast(0));
Expand All @@ -77,8 +79,8 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
auto msg_length = msg_content.length();
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data = rosbag2_storage::make_serialized_message(
msg_content.c_str(), msg_length);
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_length);
return message;
}

Expand All @@ -90,7 +92,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
DoAll(
Invoke(
[this](const rosbag2_storage::StorageOptions & storage_options) {
fake_storage_size_ = 0;
fake_storage_size_.store(0);
fake_storage_uri_ = storage_options.uri;
// Touch the file
std::ofstream output(storage_options.uri);
Expand All @@ -104,11 +106,11 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
fake_storage_size_.fetch_add(1);
});
ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_;
return fake_storage_size_.load();
});
ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
Expand All @@ -133,7 +135,6 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));
}

const std::string bag_name_ = "SequentialCompressionWriterTest";
std::unique_ptr<StrictMock<MockStorageFactory>> storage_factory_;
std::shared_ptr<NiceMock<MockStorage>> storage_;
std::shared_ptr<StrictMock<MockConverterFactory>> converter_factory_;
Expand All @@ -143,8 +144,10 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
rosbag2_storage::BagMetadata intercepted_metadata_;
std::unique_ptr<rosbag2_cpp::Writer> writer_;
std::string serialization_format_;
uint64_t fake_storage_size_{};
std::atomic<uint64_t> fake_storage_size_{0}; // Need to be atomic for cache update since it
// uses in callback from cache_consumer thread
std::string fake_storage_uri_;
const std::string bag_base_dir_ = "test_bag";

const uint64_t kDefaultCompressionQueueSize = 1;
const uint64_t kDefaultCompressionQueueThreads = 4;
Expand Down Expand Up @@ -297,11 +300,10 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative
EXPECT_EQ(
intercepted_metadata_.relative_file_paths.size(), 3u);

const auto base_path = tmp_dir_storage_options_.uri;
int counter = 0;
for (const auto & path : intercepted_metadata_.relative_file_paths) {
std::stringstream ss;
ss << bag_name_ << "_" << counter << "." << DefaultTestCompressor;
ss << bag_base_dir_ << "_" << counter << "." << DefaultTestCompressor;
counter++;
EXPECT_EQ(ss.str(), path);
}
Expand Down Expand Up @@ -339,6 +341,147 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_msg_compression)
{
const uint64_t max_bagfile_size = 3;
const size_t num_splits = 2;
const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
1
};

initializeFakeFileStorage();
initializeWriter(compression_options);

auto message = make_test_msg();
message->topic_name = "test_topic";

tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", {}});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

EXPECT_EQ(intercepted_metadata_.relative_file_paths.size(), num_splits + 1);

EXPECT_THAT(closed_files.size(), num_splits + 1);
EXPECT_THAT(opened_files.size(), num_splits + 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
for (size_t i = 0; i < intercepted_metadata_.relative_file_paths.size(); i++) {
std::cout << "metadata file path [" << i << "] = '" <<
intercepted_metadata_.relative_file_paths[i] << "'\n";
}
}

ASSERT_GE(opened_files.size(), num_splits + 1);
ASSERT_GE(closed_files.size(), num_splits + 1);
for (size_t i = 0; i < num_splits + 1; i++) {
auto expected_closed =
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i));
auto expected_opened = (i == num_splits) ?
// The last opened file shall be empty string when we do "writer->close();"
fs::path("") :
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1));
EXPECT_EQ(closed_files[i], expected_closed.string()) << "i = " << i;
EXPECT_EQ(opened_files[i], expected_opened.string()) << "i = " << i;
}
}

TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_compression)
{
const uint64_t max_bagfile_size = 3;
const size_t num_splits = 2;
const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
0,
1
};

initializeFakeFileStorage();
initializeWriter(compression_options);

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", {}});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

EXPECT_EQ(intercepted_metadata_.relative_file_paths.size(), num_splits + 1);

EXPECT_THAT(closed_files.size(), num_splits + 1);
EXPECT_THAT(opened_files.size(), num_splits + 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
for (size_t i = 0; i < intercepted_metadata_.relative_file_paths.size(); i++) {
std::cout << "metadata file path [" << i << "] = '" <<
intercepted_metadata_.relative_file_paths[i] << "'\n";
}
}

ASSERT_GE(opened_files.size(), num_splits + 1);
ASSERT_GE(closed_files.size(), num_splits + 1);
for (size_t i = 0; i < num_splits + 1; i++) {
auto expected_closed =
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i) +
"." + DefaultTestCompressor);
auto expected_opened = (i == num_splits) ?
// The last opened file shall be empty string when we do "writer->close();"
fs::path("") :
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1));
EXPECT_EQ(closed_files[i], expected_closed.string()) << "i = " << i;
EXPECT_EQ(opened_files[i], expected_opened.string()) << "i = " << i;
}
}

INSTANTIATE_TEST_SUITE_P(
SequentialCompressionWriterTestQueueSizes,
SequentialCompressionWriterTest,
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
std::shared_ptr<rosbag2_cpp::cache::MessageCacheInterface> message_cache_;
std::unique_ptr<rosbag2_cpp::cache::CacheConsumer> cache_consumer_;

std::string split_bagfile_local(bool execute_callbacks = true);

void execute_bag_split_callbacks(
const std::string & closed_file, const std::string & opened_file);

void switch_to_next_storage();

std::string format_storage_uri(
Expand Down
Loading

0 comments on commit f449d50

Please sign in to comment.