From eecb607363096a867c47423f4bf6b11672d5324d Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Thu, 6 Jun 2024 17:44:33 -0700 Subject: [PATCH] Add tests for split event callbacks when using file and msg compression - Added "split_event_calls_callback_with_msg_compression" and "split_event_calls_callback_with_file_compression" uit tests Signed-off-by: Michael Orlov --- .../test_sequential_compression_writer.cpp | 161 ++++++++++++++++-- 1 file changed, 151 insertions(+), 10 deletions(-) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index ba383e608..75fbf332a 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -63,7 +63,7 @@ class SequentialCompressionWriterTest : public TestWithParam tmp_dir_storage_options_{}, serialization_format_{"rmw_format"} { - tmp_dir_storage_options_.uri = tmp_dir_.string(); + tmp_dir_storage_options_.uri = (tmp_dir_ / bag_base_dir_).string(); fs::remove_all(tmp_dir_); ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault(Return(storage_)); EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(AtLeast(0)); @@ -92,7 +92,7 @@ class SequentialCompressionWriterTest : public TestWithParam DoAll( Invoke( [this](const rosbag2_storage::StorageOptions & storage_options) { - fake_storage_size_ = 0; + fake_storage_size_.store(0); fake_storage_uri_ = storage_options.uri; // Touch the file std::ofstream output(storage_options.uri); @@ -106,11 +106,11 @@ class SequentialCompressionWriterTest : public TestWithParam *storage_, write(An>())).WillByDefault( [this](std::shared_ptr) { - fake_storage_size_ += 1; + fake_storage_size_.fetch_add(1); }); ON_CALL(*storage_, get_bagfile_size).WillByDefault( [this]() { - return fake_storage_size_; + return fake_storage_size_.load(); }); ON_CALL(*storage_, get_relative_file_path).WillByDefault( [this]() { @@ -135,7 +135,6 @@ class SequentialCompressionWriterTest : public TestWithParam writer_ = std::make_unique(std::move(sequential_writer)); } - const std::string bag_name_ = "SequentialCompressionWriterTest"; std::unique_ptr> storage_factory_; std::shared_ptr> storage_; std::shared_ptr> converter_factory_; @@ -148,8 +147,10 @@ class SequentialCompressionWriterTest : public TestWithParam std::unique_ptr writer_; std::string serialization_format_; - uint64_t fake_storage_size_{}; + std::atomic fake_storage_size_{0}; // Need to be atomic for cache update since it + // uses in callback from cache_consumer thread std::string fake_storage_uri_; + const std::string bag_base_dir_ = "test_bag"; const uint64_t kDefaultCompressionQueueSize = 1; const uint64_t kDefaultCompressionQueueThreads = 4; @@ -299,11 +300,10 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), 3u); - const auto base_path = tmp_dir_storage_options_.uri; int counter = 0; for (const auto & path : intercepted_write_metadata_.relative_file_paths) { std::stringstream ss; - ss << bag_name_ << "_" << counter << "." << DefaultTestCompressor; + ss << bag_base_dir_ << "_" << counter << "." << DefaultTestCompressor; counter++; EXPECT_EQ(ss.str(), path); } @@ -426,7 +426,7 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz } writer_.reset(); // reset will call writer destructor - EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite); + EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite); } TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority) @@ -480,7 +480,148 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority) writer_.reset(); // reset will call writer destructor EXPECT_EQ(detected_thread_priority, *compression_options.thread_priority); - EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite); + EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite); +} + +TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_msg_compression) +{ + const uint64_t max_bagfile_size = 3; + const size_t num_splits = 2; + const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8 + + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::MESSAGE, + 0, + 1, + kDefaultCompressionQueueThreadsPriority + }; + + initializeFakeFileStorage(); + initializeWriter(compression_options); + + auto message = std::make_shared(); + message->topic_name = "test_topic"; + + tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size; + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (auto i = 0; i < message_count; ++i) { + writer_->write(message); + } + writer_->close(); + + EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1); + + EXPECT_THAT(closed_files.size(), num_splits + 1); + EXPECT_THAT(opened_files.size(), num_splits + 1); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) { + std::cout << "metadata file path [" << i << "] = '" << + intercepted_write_metadata_.relative_file_paths[i] << "'\n"; + } + } + + ASSERT_GE(opened_files.size(), num_splits + 1); + ASSERT_GE(closed_files.size(), num_splits + 1); + for (size_t i = 0; i < num_splits + 1; i++) { + auto expected_closed = + fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i)); + auto expected_opened = (i == num_splits) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1)); + EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i; + EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i; + } +} + +TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_compression) +{ + const uint64_t max_bagfile_size = 3; + const size_t num_splits = 2; + const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8 + + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::FILE, + 0, + 1, + kDefaultCompressionQueueThreadsPriority + }; + + initializeFakeFileStorage(); + initializeWriter(compression_options); + + auto message = std::make_shared(); + message->topic_name = "test_topic"; + + tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size; + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (auto i = 0; i < message_count; ++i) { + writer_->write(message); + } + writer_->close(); + + EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1); + + EXPECT_THAT(closed_files.size(), num_splits + 1); + EXPECT_THAT(opened_files.size(), num_splits + 1); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) { + std::cout << "metadata file path [" << i << "] = '" << + intercepted_write_metadata_.relative_file_paths[i] << "'\n"; + } + } + + ASSERT_GE(opened_files.size(), num_splits + 1); + ASSERT_GE(closed_files.size(), num_splits + 1); + for (size_t i = 0; i < num_splits + 1; i++) { + auto expected_closed = + fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i) + + "." + DefaultTestCompressor); + auto expected_opened = (i == num_splits) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1)); + EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i; + EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i; + } } INSTANTIATE_TEST_SUITE_P(